为了账号安全,请及时绑定邮箱和手机立即绑定

如何在反序列化过程中不使用无限循环的情况下编写kafka消费者?

如何在反序列化过程中不使用无限循环的情况下编写kafka消费者?

慕桂英4014372 2023-07-28 09:49:58
如何在java中编写kafka消费者而不使用无限循环进行轮询?在处理传入记录函数中编写了 while(true) 循环,其中轮询新事件。如果我在我的项目中使用它,除了这个我无法做任何其他事情。有没有办法避免使用这种无限循环来获取新事件? public static void main(String[] str) throws InterruptedException {    System.out.println("Starting  AtMostOnceConsumer ...");    execute();}private static void execute() throws InterruptedException {    KafkaConsumer<String, Event> consumer = createConsumer();    // Subscribe to all partition in that topic. 'assign' could be used here    // instead of 'subscribe' to subscribe to specific partition.    consumer.subscribe(Arrays.asList("topic"));    processRecords(consumer);}private static KafkaConsumer<String, Event> createConsumer() {    Properties props = new Properties();    String consumeGroup = "group_id";    props.put("group.id", consumeGroup);    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");    props.put("client.id", "clientId");    props.put("security.protocol", "SASL_SSL");    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + "username" + " password="" + "password";");    props.put("enable.auto.commit", "true");    // Auto commit interval, kafka would commit offset at this interval.    props.put("auto.commit.interval.ms", "101");    // This is how to control number of records being read in each poll    props.put("max.partition.fetch.bytes", "135");    // Set this if you want to always read from beginning.    // props.put("auto.offset.reset", "earliest");    props.put("heartbeat.interval.ms", "3000");    props.put("session.timeout.ms", "6001");}有人可以帮我修改这个,以便我可以避免while(true)循环并且可以只监听我传入的事件吗?
查看完整描述

3 回答

?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

你可以尝试这样的事情:


public class ConsumerDemoWithThread {

private Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());

private String bootstrapServers = "127.0.0.1:9092";

private String groupId = "my-first-application";

private String topic = "first-topic";


KafkaConsumer consumer = createConsumer(bootstrapServers, groupId, topic);


private void pollForRecords() {

    ExecutorService executor = Executors.newSingleThreadExecutor();

    executor.submit(() -> processRecords());

}



private KafkaConsumer createConsumer(String bootstrapServers, String groupId, String topic) {

    Properties properties = new Properties();

    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // create consumer

    KafkaConsumer consumer = new KafkaConsumer<String, String>(properties);

    // subscribe consumer to our topic(s)

    consumer.subscribe(Arrays.asList(topic));

    return consumer;

}



private void processRecords() {

    try {

        while (true) {

            ConsumerRecords<String, String> records =

                    consumer.poll(Duration.ofMillis(100));


            for (ConsumerRecord<String, String> record : records) {

                logger.info("Key: " + record.key() + ", Value: " + record.value());

                logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());

            }

        }

    } catch (WakeupException e) {

        logger.info("Received shutdown signal!");

    } finally {

        consumer.close();

    }

}


public static void main(String[] args) {

    ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();

    consumerDemoWithThread.pollForRecords();

}

}

查看完整回答
反对 回复 2023-07-28
?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

您可以使用@KafkaListener,然而,它也会以无限循环进行轮询,因为这就是 Kafka 的设计方式——它不是一个队列,而是一个存储一段时间记录的事件总线。没有通知其消费者的机制。

轮询不同的线程并以优雅的方式退出循环。


查看完整回答
反对 回复 2023-07-28
?
慕村225694

TA贡献1880条经验 获得超4个赞

如果您希望能够在代码中同时执行多项操作,则需要后台线程。

为了更轻松地做到这一点,您可以使用更高级别的 Kafka 库,例如 Spring(已回答)、Vert.x或Smallrye

这是一个 Vert.x 示例,首先创建一个KafkaConsumer,然后分配处理程序并订阅您的主题

consumer.handler(record -> {

  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +

    ",partition=" + record.partition() + ",offset=" + record.offset());

});


// subscribe to a single topic

consumer.subscribe("a-single-topic");


查看完整回答
反对 回复 2023-07-28
  • 3 回答
  • 0 关注
  • 86 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信