为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka AvroConsumer 使用 offsets_for_times 从时间戳开始消费

Kafka AvroConsumer 使用 offsets_for_times 从时间戳开始消费

胡说叔叔 2021-09-02 14:48:45
尝试使用 confluent_kafka.AvroConsumer 来消费来自给定时间戳的消息。if flag:    # creating a list    topic_partitons_to_search = list(        map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))    print("Searching for offsets with %s" % topic_partitons_to_search)    offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)    print("offsets_for_times results: %s" % offsets)    for x in offsets:        c.seek(x)    flag=False 控制台返回这个Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]{'name': 'Hello'}{'name': 'Hello'}{'name': 'Hello1'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Hello3'}{'name': 'Offset 8'}{'name': 'Offset 9'}{'name': 'Offset 10'}{'name': 'Offset 11'}{'name': 'New'} 这些是我在 my_topic2 的分区 0 中的所有消息(分区 1 中没有任何消息),我们应该什么也得不到,因为我们没有从当前时间(time.time())生成的消息。然后我希望能够使用类似的东西time.time() - 60000来获取过去 60000 毫秒内的所有消息
查看完整描述

1 回答

?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

Pythons time.time() 返回自纪元以来的秒数,offsets_for_times 使用纪元的毫秒数,所以当我发送秒数时,它计算的日期比今天早得多,这意味着我们应该包括所有我的抵消。


查看完整回答
反对 回复 2021-09-02
  • 1 回答
  • 0 关注
  • 532 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信