为了账号安全,请及时绑定邮箱和手机立即绑定

当我使用 akka 流在现有消费者组中创建新消费者时,如何寻求结束 kafka 主题?

当我使用 akka 流在现有消费者组中创建新消费者时,如何寻求结束 kafka 主题?

浮云间 2023-02-16 16:10:18
我有一个 akka 应用程序(在 JAVA 中),用于commitablePartitionedSource使用来自 kafka 主题的消息。我有几个消费者群体,可以为多个主题吸引消费者。这是由动态配置驱动的,我可以在其中暂时关闭消费者,并可能在稍后重新启动它们。当这个消费者重新启动时,我只想阅读新消息,而不是从我离开的地方开始。有没有办法从 akka-alpakka 消费者那里获取 kafkaConsumer 对象,以便我可以在处理之前使用 seekToEnd()?请让我知道是否还有其他方法可以实现这一目标?也许使用 akka 配置或不同类型的消费者?我不想维护自己的偏移量(希望不是唯一的选择)我的配置设置为latest在我启动消费者组时获取偏移量,但由于我正在关闭并重新启动单个消费者,它总是从我停止的地方开始消费。我尝试为一个主题创建一个消费者组,但我有很多主题,而且结果非常耗费资源。我还寻找一种方法来清除存储在 kafka 中的该主题的偏移量,但没有成功。
查看完整描述

2 回答

?
叮当猫咪

TA贡献1776条经验 获得超12个赞

最简单的方法就是在每次启动重新启动消费者时创建一个新的消费者组。Kafka 将在可配置的时间量 ( retention.ms ) 后负责删除陈旧的消费者组

如果您很少重启消费者并且总是希望它处理新数据而不是赶上所有丢失的消息,则此策略很好。

编辑

据我所知,访问底层 KafkaConsumer 的唯一方法是使用committableExternalSource. 这样您就可以访问该seekToEnd方法,但是您还需要注意订阅提供每个分区的起始偏移量的主题(类似于您现在在AkkacommittablePartitionedSource之外设置的方式)。


查看完整回答
反对 回复 2023-02-16
?
明月笑刀无情

TA贡献1828条经验 获得超4个赞

commitablePartitionedSource将AutoSubscription其作为输入,您不能指定偏移量。


您需要的是一种采用ManualSubscription或更高级别的方法Subscription,例如


> plainExternalSource

> committableExternalSource

> plainSource

...


查看完整回答
反对 回复 2023-02-16
  • 2 回答
  • 0 关注
  • 79 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信