为了账号安全,请及时绑定邮箱和手机立即绑定

为什么使用 Sink.as 创建的发布服务器在被广播中心使用时无法正常工作?

为什么使用 Sink.as 创建的发布服务器在被广播中心使用时无法正常工作?

HUX布斯 2022-09-22 15:51:42

我们有一个多组件应用程序,它在组件之间提供反应式流API。一些组件使用Akka流实现,其他组件使用例如反应器。

在一个组件中,我们注意到 Streams 不处理任何消息,尽管提供的发布者提供记录。我把问题归结为以下情况:

Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source<String, NotUsed> allMessages = Source
    .fromPublisher(stringPublisher)
    .toMat(BroadcastHub.of(String.class, 256), Keep.right())
    .run(materializer);

allMessages
    .runForeach(System.out::println, materializer)
    .toCompletableFuture()
    .get();

一个组件提供发布服务器(它需要是发布者,因为 API 使用反应式流 API,而不是 Akka 流 API)。此发布服务器是从另一个 Akka 流源创建的,并使用 转换为发布服务器。Sink.asPublisher

现在,当我们使用 BroadcastHub 从发布者开始实现流时,根本不会处理任何记录。

我对反应堆出版商也试过同样的方法:

Publisher<String> stringPublisher = Flux.fromIterable(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

这按预期工作。不幸的是,我不能排除另一个组件从 Akka 流源创建其发布服务器的情况。

有没有人知道出了什么问题?


查看完整描述

1 回答

?
浮云间

TA贡献1504条经验 获得超3个赞

我现在知道如何解决它,如果我开始在map材料化值中使用广播中心的结果源,它就会起作用:

Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source
    .fromPublisher(stringPublisher)
    .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right())
    .mapMaterializedValue(source -> source
         .runWith(Sink.foreach(System.out::println, materializer))
    .run(materializer)
    .toCompletableFuture()
    .get();

编辑:TL;DR:在光弯论坛上有这样的解释:

此处发生的情况是,当您附加另一个流时,主流流已经完成。有时,在完成之前查看一些元素可能足够快。

---

因此,看起来广播中心实际上在消费者附加到广播中心创建的源之前删除了元素。

文档说它不会掉落:

如果没有订阅者连接到此集线器,则它不会丢弃任何元素,而是对上游生产者进行背压,直到订阅者到达。

https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

实际上,在大多数情况下都是如此,但是我发现有些情况下它的行为不正确:

public void testBH3() throws ExecutionException, InterruptedException {

    Publisher<String> stringPublisher = Source

        .from(Lists.newArrayList("Hello", "World", "!"))

        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);


    Source<String, NotUsed> allMessages = Source

        .fromPublisher(stringPublisher)

        .toMat(BroadcastHub.of(String.class, 256), Keep.right())

        .run(materializer);


    allMessages

        .runForeach(System.out::println, materializer)

        .toCompletableFuture()

        .get();

}


public void repeat() throws ExecutionException, InterruptedException {

    for (int i = 0; i < 100; i++) {

        testBH3();

        System.out.println("------");

    }

}

这适用于 100 个案例中的大约 3 个。但以下方法适用于所有情况(我只是添加了一个节流阀来产生较慢的元素):


public void testBH3() throws ExecutionException, InterruptedException {

    Publisher<String> stringPublisher = Source

        .from(Lists.newArrayList("Hello", "World", "!"))

        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);


    Source<String, NotUsed> allMessages = Source

        .fromPublisher(stringPublisher)

        .throttle(1, Duration.ofSeconds(1))

        .toMat(BroadcastHub.of(String.class, 256), Keep.right())

        .run(materializer);


    allMessages

        .runForeach(System.out::println, materializer)

        .toCompletableFuture()

        .get();

}

因此,在我看来,当没有已经连接Sink时,广播中心有时会丢弃元素。


查看完整回答
反对 回复 6天前

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信