1 回答
TA贡献1998条经验 获得超6个赞
如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。您需要自己捕获异常。重试可以通过以下两种方式实现:1)使用 SpringRetryTemplate调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2)将失败的消息推送到另一个主题以供以后重新处理(如您所建议的)
更新时间kafka-streams 2.8.0
因为,您可以使用方法自动替换kafka-streams 2.8.0失败的流线程(由未捕获的异常引起)。有关详细信息,请查看Kafka Streams 特定的未捕获异常处理程序KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
添加回答
举报
