为了账号安全,请及时绑定邮箱和手机立即绑定

如何将消息保存到数据库并将响应发送到主题最终一致?

如何将消息保存到数据库并将响应发送到主题最终一致?

侃侃无极 2023-03-02 16:17:21
我有以下 rabbitMq 消费者:Consumer consumer = new DefaultConsumer(channel) {    @Override      public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {            String message = new String(body, "UTF-8");             sendNotificationIntoTopic(message);             saveIntoDatabase(message);      } };可能会出现以下情况:消息已成功发送到主题与数据库的连接丢失,因此数据库插入失败。结果我们有数据不一致。预期结果要么两个操作都成功执行,要么根本没有执行。任何解决方案我怎样才能实现它?聚苯乙烯目前我有以下想法(请评论)我们可以假设代理不会丢失任何消息。我们必须订阅要发送的主题。将条目保存到数据库中并设置status值为“pending”的 字段尝试向主题发送数据。如果发送成功 - 更新status值为“成功”的字段我们必须有一个计划作业,它必须检查具有挂起状态的行。目前可能有两种情况:3.1 根本没有发送通知3.2 发送了通知但存入数据库失败(概率很低但有可能)所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为“成功”。否则我们必须从数据库中删除条目。我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)
查看完整描述

3 回答

?
慕哥9229398

TA贡献1877条经验 获得超6个赞

这是尝试取消确认模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它应该能够处理您的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:

  1. 定义抽象操作并为操作分配 ID 和时间戳。

  2. 将状态Pending写入数据库(可以和1一样的步骤)

  3. 编写一个侦听器,轮询数据库中所有状态为挂起且早于“超时”的操作

  4. 对于每个挂起的操作,通过具有分配 ID 的队列发送数据。

  5. 接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。

6A。如果您需要 100% 确认操作已完成,您需要第二个队列,接收方将在其中发布消息 ID - DONE。如果不需要这种一致性,请跳过此步骤。或者,它可以发布 ID -Failed 失败原因。

6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。

  • 一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。

  • 您可以通过 ID 回滚将消息发送到接收方操作。

请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。

我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。


查看完整回答
反对 回复 2023-03-02
?
MM们

TA贡献1886条经验 获得超2个赞

如果有足够的时间来修改设计,建议使用类似 JTA 的 API 来管理 2phase 提交。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。

如果时间线较少,建议执行以下操作以减少失败间隔。

  • 发送数据主题(不提交)(incase topic down, retry to be perform with a interval)

  • 将数据写入数据库

  • 提交数据库

  • 提交主题

只有当第 4 步失败时才会发生这里失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点直截了当(但这将在接收系统中处理)

这两种情况也适用于集群环境。


严格针对您的情况,认为以下步骤可能有助于解决您的问题

为您的主题订阅一个侦听器 listener-1。

过程-1

  • 为消息 msg-1 添加状态为“待发送”的数据库条目

  • 向主题发送消息 msg-1。在任何主题失败的情况下重试发送如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或回滚步骤 1

听众-1

  • 使用订阅的侦听器,从主题读取参考(meesageID/correlationID),并将数据库状态更新为已发送,并从主题读取/删除消息。如果参考读取成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。

Incase listener 本身宕机,topic 将有消息,直到 listener 阅读消息。在此之前,SENT 消息将处于“待发送”状态。


查看完整回答
反对 回复 2023-03-02
?
慕侠2389804

TA贡献1719条经验 获得超6个赞

  1. 在侦听器中保存数据库行,其中包含字段 staus='pending'

  2. 另一个作业(独立线程)将从数据库中获取所有待处理的行,并对每一行进行以下操作:
    2.1 将数据发送到主题
    2.2 保存到数据库中

如果我们在第 1 步失败- 一切正常 - 数据处于一致状态,因为作业不会知道该数据的任何信息

如果我们在步骤 2.1 上失败了——没问题,下一个作业调用将尝试处理它

如果我们在步骤 2.2 上失败了——如果我们在这里失败了——这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。


查看完整回答
反对 回复 2023-03-02
  • 3 回答
  • 0 关注
  • 178 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号