为了账号安全,请及时绑定邮箱和手机立即绑定

Reactive Streams:如何按键等待所有发布者?

Reactive Streams:如何按键等待所有发布者?

慕雪6442864 2023-02-16 15:45:35
假设我有 3 个发布者和 1 个处理者。发布者以 形式发布项目{key: <integer>, value: <object>, publisher_id: <string>}。发布者进行 IO 操作,因此:一方面,我希望出版商N在特定时刻处理(大致)项目。另一方面,我希望消费者将项目合并{key: <integer>, values: <list>}到一个记录中(即)我实际上已经实现了一个FluxProcessor具有内部存储空间 ( ConcurrentHashMap) 来保存所有项目的。request()只要未达到 CAPACITY,它就会手动添加新项目。我想知道是否有内置功能可以使用 RxJava(2)/ Spring Reactor API 来做到这一点?
查看完整描述

1 回答

?
慕姐8265434

TA贡献1813条经验 获得超2个赞

在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:


Flowable<KeyValuePublisher> source1 = ...

Flowable<KeyValuePublisher> source2 = ...

Flowable<KeyValuePublisher> source3 = ...


Flowable.merge(

    source1.rebatchRequests(N),

    source2.rebatchRequests(N),

    source3.rebatchRequests(N)

)

.toMultimap(kvp -> kvp.key, kvp -> kvp.value)

subscribe(map -> System.out.println(map));


查看完整回答
反对 回复 2023-02-16
  • 1 回答
  • 0 关注
  • 100 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号