为了账号安全,请及时绑定邮箱和手机立即绑定

在 Kafka Stream 中处理消息时发生错误时重新处理消息

在 Kafka Stream 中处理消息时发生错误时重新处理消息

holdtom 2022-12-21 13:06:21
我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行map转换并打印此消息。KStream像这样配置@Beanpublic KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,         PrintAction printAction, String topicName) {    KStream<String, JsonNode> source = builder.stream(topicName,                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));    // @formatter:off    source        .map(myTransformer)        .foreach(printAction);    // @formatter:on    return source;}在内部MyTransformer,我正在调用此时可能会关闭的外部微服务。如果调用失败(通常是抛出错误RuntimeException),我就无法进行转换。这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次重新处理 Streams 应用程序中的消息?根据我目前的研究,这里没有办法这样做,我唯一的可能是将消息推送到死信主题中,并在将来再次失败时尝试处理它我再次将其推送到 DLT 中并以这种方式重试.
查看完整描述

1 回答

?
米琪卡哇伊

TA贡献1998条经验 获得超6个赞

如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。您需要自己捕获异常。重试可以通过以下两种方式实现:1)使用 SpringRetryTemplate调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2)将失败的消息推送到另一个主题以供以后重新处理(如您所建议的)


更新时间kafka-streams 2.8.0


因为,您可以使用方法自动替换kafka-streams 2.8.0失败的流线程(由未捕获的异常引起)。有关详细信息,请查看Kafka Streams 特定的未捕获异常处理程序KafkaStreamsvoid setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD


kafkaStreams.setUncaughtExceptionHandler(ex -> {

    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);

    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;

});


查看完整回答
反对 回复 2022-12-21
  • 1 回答
  • 0 关注
  • 161 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号