I have a Spring WebFlux API whose service layer needs to read from an existing repository that uses JDBC.
Having done some reading on the subject, I would like to keep the execution of the blocking database call separate from the rest of my non-blocking async code.
I have defined a dedicated jdbcScheduler:
@Bean
public Scheduler jdbcScheduler() {
    return Schedulers.fromExecutor(Executors.newFixedThreadPool(maxPoolSize));
}
And an AsyncJdbcWrapper utility to use it:
@Component
public class AsyncJdbcWrapper {

    private final Scheduler jdbcScheduler;

    @Autowired
    public AsyncJdbcWrapper(Scheduler jdbcScheduler) {
        this.jdbcScheduler = jdbcScheduler;
    }

    public <T> Mono<T> async(Callable<T> callable) {
        return Mono.fromCallable(callable)
                .subscribeOn(jdbcScheduler)
                .publishOn(Schedulers.parallel());
    }
}
This is then used to wrap JDBC calls like so:
Mono<Integer> userIdMono = asyncWrapper.async(() -> userDao.getUserByUUID(request.getUserId()))
        .map(userOption -> userOption.map(u -> u.getId())
                .orElseThrow(() -> new IllegalArgumentException(
                        "Unable to find user with ID " + request.getUserId())));
I've got two questions:
1) Am I correctly pushing the execution of the blocking calls to another set of threads? Being fairly new to this, I'm struggling with the intricacies of subscribeOn()/publishOn().
2) Say I want to make use of the resulting Mono, e.g. call an API with the result of userIdMono. On which scheduler will that be executed? The one specifically created for the JDBC calls, or the main(?) thread that Reactor usually operates within? For example:
userIdMono.map(id -> someApiClient.call(id));
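To check this empirically, here is a minimal sketch that records which thread runs the blocking callable and which thread runs a downstream map(), using the same subscribeOn()/publishOn() chain as the wrapper above. It assumes reactor-core is on the classpath; the class name ThreadHopDemo, the "jdbc-worker" thread name, the pool size of 2, and the constant 42 standing in for the JDBC result are all hypothetical, chosen only to make the threads easy to tell apart.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ThreadHopDemo {

    // Returns { thread that ran the callable, thread that ran the downstream map }.
    static String[] observeThreads() {
        // Hypothetical stand-in for the jdbcScheduler bean, with named threads.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "jdbc-worker");
            t.setDaemon(true);
            return t;
        });
        Scheduler jdbcScheduler = Schedulers.fromExecutor(pool);

        String[] seen = new String[2];
        try {
            Mono.fromCallable(() -> {
                        // Stand-in for the blocking JDBC call.
                        seen[0] = Thread.currentThread().getName();
                        return 42;
                    })
                    .subscribeOn(jdbcScheduler)
                    .publishOn(Schedulers.parallel())
                    .map(id -> {
                        // Stand-in for the follow-up work on the result.
                        seen[1] = Thread.currentThread().getName();
                        return id;
                    })
                    .block(); // blocking here is only for the demo
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observeThreads();
        System.out.println("callable ran on: " + seen[0]);
        System.out.println("map ran on:      " + seen[1]);
    }
}
```

If my understanding is right, the first line should name the jdbc-worker thread and the second a parallel-N thread, which would suggest that operators chained after the wrapper run on Schedulers.parallel() rather than on the JDBC pool.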