为了账号安全,请及时绑定邮箱和手机立即绑定

RxJava 相当于简单的 ThreadPoolExecutor 示例

RxJava 相当于简单的 ThreadPoolExecutor 示例

千万里不及你 2023-09-27 16:54:17
我已经退出 Java 游戏大约 8 年了,从那时起发生了很多变化。对我来说最大的挑战是 RxJava / 反应式。我正在寻找有关如何以完全反应式方式执行以下等效操作的粗略指导。Stuff下面使用 ThreadPoolExecutor 实现的基本要求是通过调用远程 Web 服务来处理大量数据,该服务的记录速率限制为 100 个请求/分钟。我的目标是尽可能快地处理尽可能多的数据,不丢失任何数据,Stuff但仍遵守下游速率限制。该代码已被简化,以避免错误、隔板、断路器、重试逻辑等。这段代码目前工作正常,但在所有非阻塞反应选项的情况下,它会导致感觉浪费了很多线程。甚至我用来调用服务的 HTTP 客户端也会返回 a Flowable,我只是在执行程序的 20 个线程中的每个线程中阻塞它。我很想了解反应性等价物应该是什么。我一直在努力的地方是我发现的几乎所有文档都展示了使用 Observable 的静态源(例如Observable.fromArray(1,2,3,4,5):)。我知道解决方案可能涉及IoScheduler和groupBy,但我还没有弄清楚如何Flowable将来自我的 HTTP 客户端的 s 合并到某个完整的链中,该链可以进行并行化(最多限制,例如 20)和速率限制。public class Example {    private static final int THREADS = 20;    // using https://docs.micronaut.io/latest/guide/index.html#httpClient    @Client("http://stuff-processor.internal:8080")    @Inject    RxHttpClient httpClient;    private ThreadPoolExecutor executor;    private final RateLimiter rateLimiter;    public Example() {        // up to 20 threads to process the unbounded queue        // incoming Stuff is very bursty...        // ...we could go hours without anything and then hundreds could come in        this.executor = new ThreadPoolExecutor(THREADS, THREADS,                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());        this.executor.allowCoreThreadTimeOut(true);        // using https://resilience4j.readme.io/docs/ratelimiter        RateLimiterConfig config = RateLimiterConfig.custom()                .limitRefreshPeriod(Duration.ofSeconds(60))                .limitForPeriod(100)                .timeoutDuration(Duration.ofSeconds(90))                .build();        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);    }    /**     * Called when the user takes an action that can cause 1 or 1000s of new     * Stuff to be entered into the system. Each instance of Stuff results in     * a separate call to this method. Ex: 100 Stuffs = 100 calls.     */
查看完整描述

1 回答

?
Cats萌萌

TA贡献1805条经验 获得超9个赞

首先,要以完全非阻塞的方式构建它,您需要使用像 Netty 这样的非阻塞、异步 HTTP 客户端库。我不确定如何RxHttpClient运作。

假设你有一个 list stuff。我就是这样做的:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap合并响应。

为了限制速率,您flatMap有第二个参数,它限制它并行订阅的内部流的数量。假设您想同时拨打不超过 10 个电话。做这个:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();



查看完整回答
反对 回复 2023-09-27
  • 1 回答
  • 0 关注
  • 55 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信