为了账号安全,请及时绑定邮箱和手机立即绑定

在 Spring Webflux 中执行阻塞 JDBC 调用

在 Spring Webflux 中执行阻塞 JDBC 调用

婷婷同学_ 2022-07-06 17:26:00
我正在使用 Spring Webflux 和 Spring data jpa,使用 PostgreSql 作为后端数据库。我不想在进行 db 调用时阻塞主线程findand save。为了达到同样的目的,我在Controller课堂上有一个主调度程序和一个jdbcScheduler服务类。我定义它们的方式是:@Configuration@EnableJpaAuditingpublic class CommonConfig {    @Value("${spring.datasource.hikari.maximum-pool-size}")    int connectionPoolSize;    @Bean    public Scheduler scheduler() {        return Schedulers.parallel();    }    @Bean    public Scheduler jdbcScheduler() {        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));    }    @Bean    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {        return new TransactionTemplate(transactionManager);    }}现在,在我的服务层中进行获取/保存调用时,我会:    @Override    public Mono<Config> getConfigByKey(String key) {        return Mono.defer(            () -> Mono.justOrEmpty(configRepository.findByKey(key)))            .subscribeOn(jdbcScheduler)            .publishOn(scheduler);    }    @Override    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {        return Flux            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))            .subscribeOn(jdbcScheduler)            .publishOn(scheduler);    }    @Override    public Flux<Config> addConfigs(List<Config> configList) {        return Flux.fromIterable(configRepository.saveAll(configList))            .subscribeOn(jdbcScheduler)            .publishOn(scheduler);    }在控制器中,我这样做:    @PostMapping    @ResponseStatus(HttpStatus.CREATED)    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {        return configService.addConfigs(configs).collectList()            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))            .subscribeOn(scheduler);    }这个对吗?和/或有更好的方法吗?我的理解是:.subscribeOn(jdbcScheduler).publishOn(scheduler);是该任务将在jdbcScheduler线程上运行,稍后的结果将在我的主并行上发布scheduler。这种理解正确吗?
查看完整描述

2 回答

?
人到中年有点甜

TA贡献1895条经验 获得超7个赞

您对publishOnand的理解是正确的subscribeOn请参阅 reactor 项目中有关这些操作员的参考文档)。

如果你调用阻塞库而不调度特定调度器的工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),你的应用程序将只能同时处理几个请求。

现在我不确定你这样做是为了达到什么目的。

首先,并行调度程序是为 CPU 密集型任务设计的,这意味着您将拥有很少的任务,与 CPU 内核一样多(或更多)。在这种情况下,这就像将您的线程池大小设置为常规 Servlet 容器上的内核数。您的应用将无法处理大量并发请求。

即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是在 Spring WebFlux 中本地调度请求处理的地方。

如果您的最终目标是性能和可扩展性,那么在响应式应用程序中封装阻塞调用可能比您的常规 Servlet 容器执行得更差。

您可以改为使用 Spring MVC 并且:

  • 在处理阻塞库时使用通常的阻塞返回类型,比如 JPA

  • 当您不依赖于此类库时使用Mono和返回类型Flux

这不会是非阻塞的,但这仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。


查看完整回答
反对 回复 2022-07-06
?
繁花如伊

TA贡献2012条经验 获得超12个赞

恕我直言,有一种方法可以执行此操作,从而更好地利用机器资源。按照文档,您可以将调用包装在其他线程中,这样您就可以继续执行。



查看完整回答
反对 回复 2022-07-06
  • 2 回答
  • 0 关注
  • 500 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号