为了账号安全,请及时绑定邮箱和手机立即绑定

Akka Streams onFailuresWithBackoff 未重新启动流程

Akka Streams onFailuresWithBackoff 未重新启动流程

眼眸繁星 2022-05-12 16:19:34
如果在阶段期间发生任何故障,我正在尝试RestartFlow在 Akka Streams javadsl 中使用来重新启动我的流程阶段,但它似乎并没有重新启动流程,而只是丢弃了消息。我已经看到了:Akka Streams 中的 RestartFlow not working as expected,但我使用的是 2.5.19 版本,所以应该修复它?我都试过了RestartFlow.onFailuresWithBackoff,RestartFlow.withBackoff但都没有奏效。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截了异常,因此它不会从流程中抛出,而且似乎没有提供我想要的退避和最大重试策略。流:public Consumer.DrainingControl<Done> stream() {    return Consumer.committableSource(consumerSettings,        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +            ConfigKeys.CONSUMER_TOPIC)))        .via(RestartFlow.onFailuresWithBackoff(                Duration.ofSeconds(1), // min backoff                Duration.ofSeconds(2), // max backoff,                0.2, // adds 20% "noise" to vary the intervals slightly                10, // limits the amount of restarts to 10                this::dispatchMessageFlow))        .via(Committer.flow(CommitterSettings.create(system)))        .toMat(Sink.ignore(), Keep.both())        .mapMaterializedValue(Consumer::createDrainingControl)        .run(mat);}我看到过一次异常,akka 表示它将由于失败而重新启动图表,但在那之后就没有别的了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。如果有人可以帮助我指出正确的方向,我将不胜感激。
查看完整描述

1 回答

?
萧十郎

TA贡献1815条经验 获得超13个赞

它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但源/流将重新启动,而不会杀死整个流。它在RestartFlow.onFailuresWithBackoff 文档中有所描述:

重新启动过程本质上是有损的,因为取消和发送消息之间没有协调。来自包装流任一端的终止信号将导致另一端终止,并且任何传输中的消息都将丢失。在退避期间,此 Flow 将背压。


查看完整回答
反对 回复 2022-05-12
  • 1 回答
  • 0 关注
  • 144 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号