为了账号安全,请及时绑定邮箱和手机立即绑定

Reactor:扩展一个 ParallelFlux

Reactor:扩展一个 ParallelFlux

摇曳的蔷薇 2022-06-04 10:58:34
我有一组需要扩展的项目,所以我选择 reactor 是因为它的反应能力,因为扩展需要 IO 操作。这是一段工作代码:public Flux<Item> expand(List<Item> unprocessedItems) {  return Flux.fromIterable(unprocessedItems)    .expandDeep(this::expandItem);}请注意,这this::expandItem是一个阻塞操作(多个数据库查询,一些计算,...)。现在我希望这个扩展是平行的,但据我所知.expand(),.expandDeep()并且只是班级的成员,Flux而不是ParallelFlux班级的成员。我尝试在通话之前添加.publishOn()and ,但没有运气。.subscribeOn().expand()这是我第一次使用反应器,但我没有看到任何阻止并行扩展的技术问题,有什么办法吗?API是否丢失或我错过了什么?
查看完整描述

2 回答

?
狐的传说

TA贡献1804条经验 获得超3个赞

是的,你是对的ParallelFluxhas not .expand()and .expandDeep()methods,但我可以使用其他方式,创建具有扩展方法的附加 Publisher 并将其传递给你的ParallelFlux,如下所示:


public static void main(String[] args) {      


    Function<Node, Flux<Node>> expander =

        node -> Flux.fromIterable(node.children);


    List<Node> roots = createTestNodes();


    Flux.fromIterable(roots)

        .parallel(4)

        .runOn(Schedulers.parallel())

        .flatMap(node -> Flux.just(node).expandDeep(expander))

        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))

        .sequential()

        .subscribe();


    try {

        Thread.sleep(500);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    System.out.println("finished");


}

我的测试数据:


static final class Node {

    final String name;

    final List<Node> children;


    Node(String name, Node... nodes) {

        this.name = name;

        this.children = new ArrayList<>();

        children.addAll(Arrays.asList(nodes));

    }


    @Override

    public String toString() {

        return name;

    }

}


static List<Node> createTestNodes() {

    return new Node("root",

        new Node("1",

            new Node("11")

        ),

        new Node("2",

            new Node("21"),

            new Node("22",

                new Node("221")

            )

        ),

        new Node("3",

            new Node("31"),

            new Node("32",

                new Node("321")

            ),

            new Node("33",

                new Node("331"),

                new Node("332",

                    new Node("3321")

                )

            )

        ),

        new Node("4",

            new Node("41"),

            new Node("42",

                new Node("421")

            ),

            new Node("43",

                new Node("431"),

                new Node("432",

                    new Node("4321")

                )

            ),

            new Node("44",

                new Node("441"),

                new Node("442",

                    new Node("4421")

                ),

                new Node("443",

                    new Node("4431"),

                    new Node("4432")

                )

            )

        )

    ).children;

}

结果:


Time: 1549296674522 thread: parallel-4 value: 4

Time: 1549296674523 thread: parallel-4 value: 41

Time: 1549296674523 thread: parallel-2 value: 2

Time: 1549296674523 thread: parallel-2 value: 21

Time: 1549296674523 thread: parallel-3 value: 3

Time: 1549296674523 thread: parallel-3 value: 31

Time: 1549296674523 thread: parallel-1 value: 1

Time: 1549296674523 thread: parallel-1 value: 11

Time: 1549296674525 thread: parallel-2 value: 22

Time: 1549296674525 thread: parallel-2 value: 221

Time: 1549296674526 thread: parallel-3 value: 32

Time: 1549296674526 thread: parallel-3 value: 321

Time: 1549296674526 thread: parallel-3 value: 33

Time: 1549296674526 thread: parallel-3 value: 331

Time: 1549296674526 thread: parallel-3 value: 332

Time: 1549296674526 thread: parallel-3 value: 3321

Time: 1549296674526 thread: parallel-4 value: 42

Time: 1549296674526 thread: parallel-4 value: 421

Time: 1549296674526 thread: parallel-4 value: 43

Time: 1549296674526 thread: parallel-4 value: 431

Time: 1549296674526 thread: parallel-4 value: 432

Time: 1549296674526 thread: parallel-4 value: 4321

Time: 1549296674527 thread: parallel-4 value: 44

Time: 1549296674527 thread: parallel-4 value: 441

Time: 1549296674527 thread: parallel-4 value: 442

Time: 1549296674527 thread: parallel-4 value: 4421

Time: 1549296674528 thread: parallel-4 value: 443

Time: 1549296674528 thread: parallel-4 value: 4431

Time: 1549296674528 thread: parallel-4 value: 4432

如您所见expander,在并行线程中工作。


查看完整回答
反对 回复 2022-06-04
?
ibeautiful

TA贡献1993条经验 获得超6个赞

这是一个示例,基于YauhenBalykin给出的示例:


public static void main(String[] args) {


    Function<Node, Flux<Node>> expander =

            node -> Flux.fromIterable(node.children)

            .subscribeOn(Schedulers.parallel());


    List<Node> roots = createTestNodes();


    Flux.fromIterable(roots)

            .expand(expander)

            .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))

            .subscribe();


    try {

        Thread.sleep(500);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    System.out.println("finished");


}

测试数据:


static final class Node {

    final String name;

    final List<Node> children;


    Node(String name, Node... nodes) {

        this.name = name;

        this.children = new ArrayList<>();

        children.addAll(Arrays.asList(nodes));

    }


    @Override

    public String toString() {

        return name;

    }

}


static List<Node> createTestNodes() {

    return new Node("root",

            new Node("1",

                    new Node("11")

            ),

            new Node("2",

                    new Node("21"),

                    new Node("22",

                            new Node("221")

                    )

            ),

            new Node("3",

                    new Node("31"),

                    new Node("32",

                            new Node("321")

                    ),

                    new Node("33",

                            new Node("331"),

                            new Node("332",

                                    new Node("3321")

                            )

                    )

            ),

            new Node("4",

                    new Node("41"),

                    new Node("42",

                            new Node("421")

                    ),

                    new Node("43",

                            new Node("431"),

                            new Node("432",

                                    new Node("4321")

                            )

                    ),

                    new Node("44",

                            new Node("441"),

                            new Node("442",

                                    new Node("4421")

                            ),

                            new Node("443",

                                    new Node("4431"),

                                    new Node("4432")

                            )

                    )

            )

    ).children;

}

结果:


Time: 1636182895717 thread: main value: 1

Time: 1636182895754 thread: main value: 2

Time: 1636182895754 thread: main value: 3

Time: 1636182895754 thread: main value: 4

Time: 1636182895761 thread: parallel-1 value: 11

Time: 1636182895761 thread: parallel-2 value: 21

Time: 1636182895761 thread: parallel-2 value: 22

Time: 1636182895762 thread: parallel-3 value: 31

Time: 1636182895762 thread: parallel-3 value: 32

Time: 1636182895762 thread: parallel-3 value: 33

Time: 1636182895762 thread: parallel-4 value: 41

Time: 1636182895762 thread: parallel-4 value: 42

Time: 1636182895762 thread: parallel-4 value: 43

Time: 1636182895762 thread: parallel-4 value: 44

Time: 1636182895764 thread: parallel-7 value: 221

Time: 1636182895764 thread: parallel-9 value: 321

Time: 1636182895764 thread: parallel-10 value: 331

Time: 1636182895765 thread: parallel-10 value: 332

Time: 1636182895765 thread: parallel-12 value: 421

Time: 1636182895765 thread: parallel-1 value: 431

Time: 1636182895765 thread: parallel-1 value: 432

Time: 1636182895766 thread: parallel-2 value: 441

Time: 1636182895766 thread: parallel-2 value: 442

Time: 1636182895766 thread: parallel-2 value: 443

Time: 1636182895766 thread: parallel-6 value: 3321

Time: 1636182895767 thread: parallel-9 value: 4321

Time: 1636182895767 thread: parallel-11 value: 4421

Time: 1636182895767 thread: parallel-12 value: 4431

Time: 1636182895767 thread: parallel-12 value: 4432

finished


查看完整回答
反对 回复 2022-06-04
  • 2 回答
  • 0 关注
  • 205 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号