我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁,查询远程文件服务器以获取某些文件,并为找到的每个文件向另一个队列发送新消息,并在发送所有文件后释放锁文件。IntegrationFlows.from(Amqp.inboundGateway(container) .messageConverter(messageConverter)) .filter(m -> lockService.acquire()) .transform(m -> remoteFileTemplate.list(inputDirectory)) .split() .handle(Amqp.outboundAdapter(amqpTemplate) .exchangeName("") .routingKey(routingKey) .aggregate() .handle(m -> { log.info("Releasing lock"); lock.release(); }) .get();问题是流在第.handle一种方法之后停止(老实说,正如预期的那样),我无法弄清楚如何配置它来做我想做的事?我尝试使用.wireTapand.publishSubscribeChannel但这使得 2 个流相互不依赖,并且我的锁在文件实际发送之前被释放。如果有人可以帮助我解释如何使用 DSL 修复它会很棒,因为我正在动态创建这些流......
1 回答
米脂
TA贡献1836条经验 获得超3个赞
拆分后的 pub/sub,在一个子流上使用 AMQP 处理程序,而在另一个子流上使用聚合器应该可以正常工作。
每个都将在同一个线程上连续调用,最后一条消息导致从聚合器释放,再次在同一个线程上。
话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以在发生错误时释放锁。
编辑
一个不太复杂的解决方案是ChannelInterceptor在转换之前而不是过滤器之前在通道上进行自定义,以锁定preSend()并释放它afterSendCompleted()(成功和失败都调用它)。
添加回答
举报
0/150
提交
取消
