为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka consumer.poll 不返回任何记录

Kafka consumer.poll 不返回任何记录

蝴蝶刀刀 2022-11-02 16:52:04
当我使用新的组 ID 注册消费者时,前 N 次轮询调用不返回任何内容。我想测试当我调用服务时,会发布一个 Kafka 事件。问题是每当我更改 groupId 时,前 N 个民意调查都不会返回任何内容。我了解 Kafka 在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)过于随机。消费者配置:Properties props = new Properties();props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));脚步:在每次测试之前,我consumer.poll(Duration.ofSeconds(5))只是为了确保消费者已注册并设置了偏移量。我调用服务并断言响应。如果我使用 UI 检查 Kafka,则会发布事件。我打电话consumer.poll(Duration.ofSeconds(5)),希望能收到一些记录。这是失败的一步。有没有办法确保第二次投票总是返回记录?我试图让第一次投票持续 1 分钟(我已经认为 5 秒对于等待每次测试来说太长了),它有时仍然有效,有时无效。谢谢。
查看完整描述

1 回答

?
一只萌萌小番薯

TA贡献1795条经验 获得超7个赞

它不适用于您的“新 groupId”的原因是您处于“最新”模式。

默认值为“最新”,您需要处于“最早”模式或使用您的“新 groupId”首次轮询或为此主题的此“新 groupId”提交偏移量。

您需要将“groupId”注册到主题,而不是消费者。


查看完整回答
反对 回复 2022-11-02
  • 1 回答
  • 0 关注
  • 421 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号