下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:通道 1 和通道 2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。Goroutine-1 从 kafka 主题读取消息,在验证消息后将其消息负载抛出到通道-1 上。Goroutine-2 从通道 1 读取并处理有效负载,并将处理后的有效负载抛出通道 2。Goroutine-3 从通道 2 读取数据,并将处理后的有效负载封装到 http 数据包中,并向另一个服务执行 http 请求(使用 http 客户端)。上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务未准备好接受来自Go-routine3的http请求(http客户端超时),处理失败,因此,上述服务会丢失该消息(已从Kafka主题中读取)。戈鲁廷-1目前订阅来自卡夫卡的消息,而没有向卡夫卡发送确认(通知戈鲁廷-3已成功处理特定消息)正确性优先于性能。如何确保每条消息都得到成功处理?
2 回答
白猪掌柜的
TA贡献1893条经验 获得超10个赞
例如,通过新的通道-3将戈鲁廷-3的反馈添加到戈鲁廷-1。戈鲁廷-1将阻塞,直到它得到频道-3的确认。
// in gorouting 1
channel1 <- data
select {
case <-channel3:
case <-ctx.Done(): // or smth else to prevent deadlock
}
...
// in gorouting 3
data := <-channel2
for {
if err := sendData(data); err == nil {
break
}
}
channel3<-struct{}{}
撒科打诨
TA贡献1934条经验 获得超2个赞
为了确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理未成功完成的情况 - 通常,您需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息抛回专用的Kafka重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次后处理失败 - 则将消息抛出到 DLQ(=死信队列)。
你可以在这里阅读更多:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
- 2 回答
- 0 关注
- 156 浏览
添加回答
举报
0/150
提交
取消
