假设我有 3 个发布者和 1 个处理者。发布者以 形式发布项目{key: <integer>, value: <object>, publisher_id: <string>}。发布者进行 IO 操作,因此:一方面,我希望出版商N在特定时刻处理(大致)项目。另一方面,我希望消费者将项目合并{key: <integer>, values: <list>}到一个记录中(即)我实际上已经实现了一个FluxProcessor具有内部存储空间 ( ConcurrentHashMap) 来保存所有项目的。request()只要未达到 CAPACITY,它就会手动添加新项目。我想知道是否有内置功能可以使用 RxJava(2)/ Spring Reactor API 来做到这一点?
1 回答

慕姐8265434
TA贡献1813条经验 获得超2个赞
在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:
Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...
Flowable.merge(
source1.rebatchRequests(N),
source2.rebatchRequests(N),
source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));
添加回答
举报
0/150
提交
取消