3 回答

TA贡献1877条经验 获得超6个赞
这是尝试取消确认模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它应该能够处理您的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:
定义抽象操作并为操作分配 ID 和时间戳。
将状态Pending写入数据库(可以和1一样的步骤)
编写一个侦听器,轮询数据库中所有状态为挂起且早于“超时”的操作
对于每个挂起的操作,通过具有分配 ID 的队列发送数据。
接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。
6A。如果您需要 100% 确认操作已完成,您需要第二个队列,接收方将在其中发布消息 ID - DONE。如果不需要这种一致性,请跳过此步骤。或者,它可以发布 ID -Failed 失败原因。
6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。
一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。
您可以通过 ID 回滚将消息发送到接收方操作。
请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。
我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。

TA贡献1886条经验 获得超2个赞
如果有足够的时间来修改设计,建议使用类似 JTA 的 API 来管理 2phase 提交。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。
如果时间线较少,建议执行以下操作以减少失败间隔。
发送数据主题(不提交)(incase topic down, retry to be perform with a interval)
将数据写入数据库
提交数据库
提交主题
只有当第 4 步失败时才会发生这里失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点直截了当(但这将在接收系统中处理)
这两种情况也适用于集群环境。
严格针对您的情况,认为以下步骤可能有助于解决您的问题
为您的主题订阅一个侦听器 listener-1。
过程-1
为消息 msg-1 添加状态为“待发送”的数据库条目
向主题发送消息 msg-1。在任何主题失败的情况下重试发送如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或回滚步骤 1
听众-1
使用订阅的侦听器,从主题读取参考(meesageID/correlationID),并将数据库状态更新为已发送,并从主题读取/删除消息。如果参考读取成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。
Incase listener 本身宕机,topic 将有消息,直到 listener 阅读消息。在此之前,SENT 消息将处于“待发送”状态。

TA贡献1719条经验 获得超6个赞
在侦听器中保存数据库行,其中包含字段 staus='pending'
另一个作业(独立线程)将从数据库中获取所有待处理的行,并对每一行进行以下操作:
2.1 将数据发送到主题
2.2 保存到数据库中
如果我们在第 1 步失败- 一切正常 - 数据处于一致状态,因为作业不会知道该数据的任何信息
如果我们在步骤 2.1 上失败了——没问题,下一个作业调用将尝试处理它
如果我们在步骤 2.2 上失败了——如果我们在这里失败了——这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。
添加回答
举报