为了账号安全,请及时绑定邮箱和手机立即绑定

在阅读主题后异步提交消息

在阅读主题后异步提交消息

慕哥6287543 2022-12-21 13:02:12
我试图在从主题中阅读消息后立即提交消息。我已点击此链接 ( https://www.confluent.io/blog/apache-kafka-spring-boot-application ) 使用 spring 创建一个 Kafka 消费者。通常它工作完美,消费者收到消息并等待另一个人进入队列。但问题是,当我处理这条消息时,它会花费很多时间(大约 10 分钟)kafka 队列认为消息没有被消费(提交)并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续时间更长时,它不会提交消息。主要课程在这里:@SpringBootApplication@EnableAsyncpublic class SpringBootKafkaApp {    public static void main(String[] args) {        SpringApplication.run(SpringBootKafkaApp .class, args);    }消费者类(我需要提交消息的地方)@Servicepublic class Consumer {@Autowired    AppPropert prop;   Consumer cons;@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")    public void consume(String message) throws IOException {        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */        Properties  props=prope.startProp();//just getting my properties from my config-file        ControllerPRO pro = new ControllerPRO();        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me        try {            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method             /*This works fine when the processLaunch method takes less than 5 minutes,             if it takes longer the consumer will get the same message from the topic and start again with this operation             */        } catch (Exception e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.println("End of consumer method ");    }    }如何在从队列中读取消息后立即提交消息。我想确保当我收到消息时我会立即提交消息。现在,当我在 (System.out.println) 之后完成执行该方法时,消息已提交。那么有人可以告诉我该怎么做吗?
查看完整描述

4 回答

?
不负相思意

TA贡献1777条经验 获得超10个赞

在 oder 中包含了 confumer 属性的声明,以在 consume 方法中容纳“Acknowledgment”对象。我也关注了这个链接(https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main /java/org/s1p/CommonConfiguration.java)获取我用来声明和设置所有属性(consumerProperties、consumerFactory、kafkaListenerContainerFactory)的所有方法。我发现的唯一问题是“new SeekToCurrentErrorHandler()”声明,因为我遇到了一个错误,目前我无法解决它(如果有人向我解释它会很棒)。



@Service

public class Consumer {


@Autowired

    AppPropert prop;


   Consumer cons;



   @Bean

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();


        factory.setConsumerFactory(consumerFactory());

        factory.getContainerProperties().setAckOnError(false);

        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library

        return factory;

    }


    @Bean

    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerProperties());

    }


     @Bean

    public Map<String, Object> consumerProperties() {

        Map<String, Object> props = new HashMap<>();

        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)

        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));

        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");

        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));

        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 

        return props;

    }





    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")

    public void consume(String message) throws IOException {

        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        acknowledgment.acknowledge();// commit immediately

        Properties  props=prop.startProp();//just getting my properties from my config-file

        ControllerPRO pro = new ControllerPRO();


        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me

        try {


            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 

            /*This works fine when the processLaunch method takes less than 5 minutes, 

            if it takes longer the consumer will get the same message from the topic and start again with this operation 

            */


        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

        System.out.println("End of consumer method ");


    }


    }


``````````````````````````````````````````````````````````


查看完整回答
反对 回复 2022-12-21
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

enable.auto.commit您必须将属性设置为 false来修改您的消费者配置:


properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为MANUAL_IMMEDIATE. 这是一个示例ConcurrentKafkaListenerContainerFactory:


@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

    factory.setConsumerFactory(consumerFactory());

    factory.getContainerProperties().setAckOnError(false);

    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

    factory.setErrorHandler(new SeekToCurrentErrorHandler());

    return factory;

}

如文档中所述,MANUAL_IMMEDIATE意味着:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。


您可以在此处找到所有提交方法。


然后,在您的侦听器代码中,您可以通过添加一个对象来手动提交偏移量Acknowledgment,例如:


@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")

public void consume(String message, Acknowledgment acknowledgment) {

   // commit immediately

    acknowledgment.acknowledge();

}


查看完整回答
反对 回复 2022-12-21
?
元芳怎么了

TA贡献1798条经验 获得超7个赞

您可以java.util.concurrent.BlockingQueue在使用和提交 Kafka 偏移量时使用 a 来推送消息。然后使用另一个线程从 blockingQueue 中获取消息并进行处理。这样您就不必等到处理完成。



查看完整回答
反对 回复 2022-12-21
?
Smart猫小萌

TA贡献1911条经验 获得超7个赞

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


设置完上面的属性后,如果你想批量处理,那么按照下面的配置即可。


     factory.getContainerProperties().setAckMode(AckMode.MANUAL);

// 您可以设置 Manual 或 MANUAL_IMMEDIATE 因为 //KafkaMessageListenerContainer 调用 //ConsumerBatchAcknowledgment 用于任何类型的手动确认模式


    factory.getContainerProperties().setAckOnError(true);

    //specifying batch error handler because i have enabled to listen records in batch

    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());

    factory.setBatchListener(true);

    factory.getContainerProperties().setSyncCommits(false);


查看完整回答
反对 回复 2022-12-21
  • 4 回答
  • 0 关注
  • 173 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号