4 回答
TA贡献1777条经验 获得超10个赞
在 oder 中包含了 confumer 属性的声明,以在 consume 方法中容纳“Acknowledgment”对象。我也关注了这个链接(https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main /java/org/s1p/CommonConfiguration.java)获取我用来声明和设置所有属性(consumerProperties、consumerFactory、kafkaListenerContainerFactory)的所有方法。我发现的唯一问题是“new SeekToCurrentErrorHandler()”声明,因为我遇到了一个错误,目前我无法解决它(如果有人向我解释它会很棒)。
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````
TA贡献1820条经验 获得超10个赞
enable.auto.commit您必须将属性设置为 false来修改您的消费者配置:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为MANUAL_IMMEDIATE. 这是一个示例ConcurrentKafkaListenerContainerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
如文档中所述,MANUAL_IMMEDIATE意味着:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。
您可以在此处找到所有提交方法。
然后,在您的侦听器代码中,您可以通过添加一个对象来手动提交偏移量Acknowledgment,例如:
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
// commit immediately
acknowledgment.acknowledge();
}
TA贡献1798条经验 获得超7个赞
您可以java.util.concurrent.BlockingQueue在使用和提交 Kafka 偏移量时使用 a 来推送消息。然后使用另一个线程从 blockingQueue 中获取消息并进行处理。这样您就不必等到处理完成。
TA贡献1911条经验 获得超7个赞
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
设置完上面的属性后,如果你想批量处理,那么按照下面的配置即可。
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// 您可以设置 Manual 或 MANUAL_IMMEDIATE 因为 //KafkaMessageListenerContainer 调用 //ConsumerBatchAcknowledgment 用于任何类型的手动确认模式
factory.getContainerProperties().setAckOnError(true);
//specifying batch error handler because i have enabled to listen records in batch
factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setBatchListener(true);
factory.getContainerProperties().setSyncCommits(false);
添加回答
举报
