如果在阶段期间发生任何故障,我正在尝试RestartFlow在 Akka Streams javadsl 中使用来重新启动我的流程阶段,但它似乎并没有重新启动流程,而只是丢弃了消息。我已经看到了:Akka Streams 中的 RestartFlow not working as expected,但我使用的是 2.5.19 版本,所以应该修复它?我都试过了RestartFlow.onFailuresWithBackoff,RestartFlow.withBackoff但都没有奏效。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截了异常,因此它不会从流程中抛出,而且似乎没有提供我想要的退避和最大重试策略。流:public Consumer.DrainingControl<Done> stream() { return Consumer.committableSource(consumerSettings, Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX + ConfigKeys.CONSUMER_TOPIC))) .via(RestartFlow.onFailuresWithBackoff( Duration.ofSeconds(1), // min backoff Duration.ofSeconds(2), // max backoff, 0.2, // adds 20% "noise" to vary the intervals slightly 10, // limits the amount of restarts to 10 this::dispatchMessageFlow)) .via(Committer.flow(CommitterSettings.create(system))) .toMat(Sink.ignore(), Keep.both()) .mapMaterializedValue(Consumer::createDrainingControl) .run(mat);}我看到过一次异常,akka 表示它将由于失败而重新启动图表,但在那之后就没有别的了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。如果有人可以帮助我指出正确的方向,我将不胜感激。
1 回答

萧十郎
TA贡献1815条经验 获得超13个赞
它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但源/流将重新启动,而不会杀死整个流。它在RestartFlow.onFailuresWithBackoff 文档中有所描述:
重新启动过程本质上是有损的,因为取消和发送消息之间没有协调。来自包装流任一端的终止信号将导致另一端终止,并且任何传输中的消息都将丢失。在退避期间,此 Flow 将背压。
添加回答
举报
0/150
提交
取消