我正在使用 Spring Webflux 和 Spring data jpa,使用 PostgreSql 作为后端数据库。我不想在进行 db 调用时阻塞主线程findand save。为了达到同样的目的,我在Controller课堂上有一个主调度程序和一个jdbcScheduler服务类。我定义它们的方式是:@Configuration@EnableJpaAuditingpublic class CommonConfig { @Value("${spring.datasource.hikari.maximum-pool-size}") int connectionPoolSize; @Bean public Scheduler scheduler() { return Schedulers.parallel(); } @Bean public Scheduler jdbcScheduler() { return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); } @Bean public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { return new TransactionTemplate(transactionManager); }}现在,在我的服务层中进行获取/保存调用时,我会: @Override public Mono<Config> getConfigByKey(String key) { return Mono.defer( () -> Mono.justOrEmpty(configRepository.findByKey(key))) .subscribeOn(jdbcScheduler) .publishOn(scheduler); } @Override public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) { return Flux .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion)) .subscribeOn(jdbcScheduler) .publishOn(scheduler); } @Override public Flux<Config> addConfigs(List<Config> configList) { return Flux.fromIterable(configRepository.saveAll(configList)) .subscribeOn(jdbcScheduler) .publishOn(scheduler); }在控制器中,我这样做: @PostMapping @ResponseStatus(HttpStatus.CREATED) Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) { return configService.addConfigs(configs).collectList() .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null)) .subscribeOn(scheduler); }这个对吗?和/或有更好的方法吗?我的理解是:.subscribeOn(jdbcScheduler).publishOn(scheduler);是该任务将在jdbcScheduler线程上运行,稍后的结果将在我的主并行上发布scheduler。这种理解正确吗?
2 回答

人到中年有点甜
TA贡献1895条经验 获得超7个赞
您对publishOn
and的理解是正确的subscribeOn
(请参阅 reactor 项目中有关这些操作员的参考文档)。
如果你调用阻塞库而不调度特定调度器的工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),你的应用程序将只能同时处理几个请求。
现在我不确定你这样做是为了达到什么目的。
首先,并行调度程序是为 CPU 密集型任务设计的,这意味着您将拥有很少的任务,与 CPU 内核一样多(或更多)。在这种情况下,这就像将您的线程池大小设置为常规 Servlet 容器上的内核数。您的应用将无法处理大量并发请求。
即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是在 Spring WebFlux 中本地调度请求处理的地方。
如果您的最终目标是性能和可扩展性,那么在响应式应用程序中封装阻塞调用可能比您的常规 Servlet 容器执行得更差。
您可以改为使用 Spring MVC 并且:
在处理阻塞库时使用通常的阻塞返回类型,比如 JPA
当您不依赖于此类库时使用
Mono
和返回类型Flux
这不会是非阻塞的,但这仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。
添加回答
举报
0/150
提交
取消