为了账号安全,请及时绑定邮箱和手机立即绑定

Project Reactor:处理快速和缓慢的发布者

Project Reactor:处理快速和缓慢的发布者

米琪卡哇伊 2023-04-26 16:44:10
考虑以下代码:AtomicInteger counter1 = new AtomicInteger();AtomicInteger counter2 = new AtomicInteger();Flux<Object> source = Flux.generate(emitter -> {    emitter.next("item");});Executor executor1 = Executors.newFixedThreadPool(32);Executor executor2 = Executors.newFixedThreadPool(32);Flux<String> flux1 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {        Thread.sleep(1);        return "1_" + counter1.incrementAndGet();}).subscribeOn(Schedulers.fromExecutor(executor1)));Flux<String> flux2 = Flux.merge(source).concatMap(item -> Mono.fromCallable(() -> {    Thread.sleep(100);    return "2_" + counter2.incrementAndGet();}).subscribeOn(Schedulers.fromExecutor(executor2)));Flux.merge(flux1, flux2).subscribe(System.out::println);您可以看到一个发布者比另一个发布者快 100 倍。不过,在运行代码时,似乎所有数据都已打印,但两个发布者之间存在巨大差距,这会增加加班时间。有趣的是,当更改数字时executer2会有1024线程,并且executer1只有1线程,然后我们仍然会看到随着时间的推移越来越大的差距。我的期望是,在相应地调整线程池之后,发布者将得到平衡。我想在发布者之间取得平衡(相对于线程池大小和处理时间)如果我等的时间足够长会发生什么?换句话说,是否会发生背压?(默认情况下,我猜这是一个运行时异常,对吧?)我不想丢弃物品,也不想出现运行时异常。相反,正如我提到的,我希望系统在其拥有的资源和处理时间方面取得平衡——上面的代码是否承诺了这一点?
查看完整描述

1 回答

?
暮色呼如

TA贡献1853条经验 获得超9个赞

本例中的对象Flux不是ParallelFlux对象,因此它们只会使用一个线程。


如果您创建一个能够处理数千个线程的调度程序,并将其传递给其中一个对象,这并不重要Flux——它们只会坐在那里不被使用,这正是本示例中发生的情况。没有背压,也不会导致异常——它会尽可能快地使用一个线程。


如果要确保Flux充分利用可用的 1024 个线程,则需要调用.parallel(1024):


ParallelFlux<String> flux1 = Flux.merge(source).parallel(1).concatMap(item -> Mono.fromCallable(() -> {

    Thread.sleep(1);

    return "1_" + counter1.incrementAndGet();

}).subscribeOn(Schedulers.fromExecutor(executor1)));


ParallelFlux<String> flux2 = Flux.merge(source).parallel(1024).concatMap(item -> Mono.fromCallable(() -> {

    Thread.sleep(100);

    return "2_" + counter2.incrementAndGet();

}).subscribeOn(Schedulers.fromExecutor(executor1)));

如果你对你的代码这样做,那么你会开始看到更接近你似乎期望的结果,尽管它的休眠时间是原来的 100 倍,但还是会过去2_:1_


...

2_17075

2_17076

1_863

1_864

2_17077

1_865

2_17078

2_17079

...

但是,请注意:


我想在发布者之间取得平衡(相对于线程池大小和处理时间)


你不能在这里选择数字来平衡输出,至少不能可靠地或以任何有意义的方式——线程调度将是完全任意的。如果你想这样做,那么你可以使用subscribe 方法的这个变体,允许你显式地调用request()订阅消费者。这样一来,您就可以通过仅请求您准备处理的尽可能多的元素来提供背压。


查看完整回答
反对 回复 2023-04-26
  • 1 回答
  • 0 关注
  • 76 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信