在Spring Boot 2 with Reactor中,我试图合并两个热源。但是,似乎唯一一个报告了 中的两个参数中的第一个。我如何识别第二个.FluxmergeFluxmergemergeFlux在下面的示例中,in 甚至不会打印 when 是第一个参数。如果我做第一个,那么不打印。System.errB-2outgoing1aoutgoing2A-2以下是完整的示例;package com.example.demo;import java.time.Duration;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import reactor.core.publisher.Flux;import reactor.core.scheduler.Schedulers;public class Weather {String city;Integer temperature;public Weather(String city, Integer temperature) { this.city = city; this.temperature = temperature;}@Overridepublic String toString() { return "Weather [city=" + city + ", temperature=" + temperature + "]";}public static void main(String[] args) { BlockingQueue<Weather> queue = new LinkedBlockingQueue<>(); BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>(); // Assume Spring @Repository "A-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) { queue.add(new Weather(s, d)); try { Thread.sleep(250); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Repository "B-1" new Thread(() -> { for (int d = 1; d < 1000; d += 1) { for (String s: new String[] {"MOS", "TLV"}) { queue2.add(new Weather(s, d)); try { Thread.sleep(1000); } catch (InterruptedException e) {} } } }).start(); // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR Flux<Weather> outgoing1 = Flux.<Weather>create( sink -> { for (int i = 0; i < 1000; i++) { try { sink.next(queue.take()); System.err.println("1 " + queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } sink.complete(); } )
1 回答
阿晨1998
TA贡献2037条经验 获得超6个赞
这里有一些事情在起作用。
请注意
.merge运算符的以下建议...
请注意,合并是为使用异步源或有限源而定制的。当处理尚未在专用计划程序上发布的无限源时,您必须将该源隔离在其自己的计划程序中,否则合并会尝试在订阅另一个源之前将其排出。
您的出站助焊剂使用 ,但这只影响在运算符之后链接的运算符。即,它不会影响之前的任何内容。具体来说,它不会影响 lambda 中的代码传递到执行的线程。如果您在每个出站通量之前添加,您可以看到这一点。
.publishOn.publishOn.publishOnFlux.create.log().publishOn您的 lambda 已传递给调用阻塞方法 ()。
Flux.createqueue.take
由于您在线程中调用合并的 Flux,因此您的 lambda 将传递给线程中的执行,并阻止它。subscribe(...)mainFlux.createmain
最简单的解决方法是使用而不是使 lambda 中的代码传递到不同的线程(不是 )上运行。这将防止线程阻塞,并允许来自两个出站流的合并输出交错。.subscribeOn.publishOnFlux.createmainmain
添加回答
举报
0/150
提交
取消
