为了账号安全,请及时绑定邮箱和手机立即绑定

是否可以明确要求 JetStream发送它在主题 foo.* 中收到的最后几条消息?

是否可以明确要求 JetStream发送它在主题 foo.* 中收到的最后几条消息?

Go
largeQ 2022-10-24 16:57:28
基本上是主题所说的。我想知道 JetStream 是否可以通过允许我们重新获取主题“foo.*”的最后 15 条消息或 JetStream 在过去 1.5 秒内收到的关于主题“foo.*”的消息的方式进行查询。如果可能的话,任何代码示例或代码示例的链接都将受到赞赏。
查看完整描述

2 回答

?
qq_遁去的一_1

TA贡献1725条经验 获得超7个赞

根据官方文档

  • 可以从某个时间开始抓取消息:在最后 1.5 秒内。

DeliverByStartTime
第一次使用消息时,从该时间或之后的消息开始。消费者需要指定 OptStartTime,即流中开始的时间。它将在该时间或之后收到最接近的可用消息。

  • 另一个要求,最后15条消息,我认为不可能


查看完整回答
反对 回复 2022-10-24
?
森栏

TA贡献1810条经验 获得超5个赞

在 JetStream 中有一种方法可以实现与时间相关的检索。


now := time.Now()

oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)


js, _ := nc.JetStream()

sub, err := js.SubscribeSync(

     "foo.*",

     nats.OrderedConsumer(),

     nats.StartTime(oneAndHalfSecondAgo),

)


for {

    msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones

    if err != nil {

        log.Fatal(err)

    }


    // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here


    // 2. if the message is before now we can push it in an array here

}

请注意,这种技术虽然有用,但效率非常低,因为我们一个接一个地抓取消息。


我们可以使用 .Subscribe() (这是异步的)来修改它,但是我们会遇到一个不同的问题:


我们将在当前时刻从 JetStream 过度拉动,然后我们必须确保我们抓取的缓冲消息确实会返回给 JetStream。据我所知,没有配置选项可以告诉 JetStream 有关“MaxTime”的信息。


至于如何“获取最新的 N 消息”,可以修改上面的代码示例,以便在获得所有消息后,他将获得相当高的消息块(pe 所有消息在最后 5 秒或 10 秒或 30 秒内)到现在为止,他可以抓取最新的“N”条消息。


这种技术当然并不理想,但似乎没有其他方法可以做到这一点——至少在撰写本文时还没有。


查看完整回答
反对 回复 2022-10-24
  • 2 回答
  • 0 关注
  • 88 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信