为了账号安全,请及时绑定邮箱和手机立即绑定

流后的Spring DSL句柄?

流后的Spring DSL句柄?

翻过高山走不出你 2022-05-21 16:45:23
我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁,查询远程文件服务器以获取某些文件,并为找到的每个文件向另一个队列发送新消息,并在发送所有文件后释放锁文件。IntegrationFlows.from(Amqp.inboundGateway(container)    .messageConverter(messageConverter))    .filter(m -> lockService.acquire())    .transform(m -> remoteFileTemplate.list(inputDirectory))    .split()    .handle(Amqp.outboundAdapter(amqpTemplate)        .exchangeName("")        .routingKey(routingKey)    .aggregate()    .handle(m -> {      log.info("Releasing lock");      lock.release();    })    .get();问题是流在第.handle一种方法之后停止(老实说,正如预期的那样),我无法弄清楚如何配置它来做我想做的事?我尝试使用.wireTapand.publishSubscribeChannel但这使得 2 个流相互不依赖,并且我的锁在文件实际发送之前被释放。如果有人可以帮助我解释如何使用 DSL 修复它会很棒,因为我正在动态创建这些流......
查看完整描述

1 回答

?
米脂

TA贡献1836条经验 获得超3个赞

拆分后的 pub/sub,在一个子流上使用 AMQP 处理程序,而在另一个子流上使用聚合器应该可以正常工作。

每个都将在同一个线程上连续调用,最后一条消息导致从聚合器释放,再次在同一个线程上。

话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以在发生错误时释放锁。

编辑

一个不太复杂的解决方案是ChannelInterceptor在转换之前而不是过滤器之前在通道上进行自定义,以锁定preSend()并释放它afterSendCompleted()(成功和失败都调用它)。


查看完整回答
反对 回复 2022-05-21
  • 1 回答
  • 0 关注
  • 120 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号