I am new to the Reactor framework and am trying to use it in one of our existing implementations. LocationProfileService and InventoryService each return a Mono; MainService should execute them in parallel, since they have no dependency on each other. Within LocationProfileService, four queries are issued, and the last two depend on the result of the first.
I see the calls being executed sequentially, although some of them should run in parallel. What is the right way to write this?
public class LocationProfileService {

    static final Cache<String, String> customerIdCache; // cache definition omitted

    @Override
    public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
        // These 2 are not interdependent and can be executed immediately
        Mono<String> customerAccountMono = getCustomerAccount(customerId, location)
                .subscribeOn(Schedulers.parallel())
                .switchIfEmpty(Mono.error(new CustomerNotFoundException(location, customerId)))
                .log();
        Mono<LocationProfile> locationProfileMono = Mono.fromFuture(/* location query */).subscribeOn(Schedulers.parallel()).log();

        // Should block() be called here, or is there a better way?
        // This must execute first: its value is needed for the next 2 calls
        String custAccount = customerAccountMono.block();

        Mono<Customer> customerMono = Mono.fromFuture(/* query uses custAccount from earlier step */).subscribeOn(Schedulers.parallel()).log();
        Mono<Result<LocationPricing>> locationPricingMono = Mono.fromFuture(/* query uses custAccount from earlier step */).subscribeOn(Schedulers.parallel()).log();

        return Mono.zip(locationProfileMono, customerMono, locationPricingMono).flatMap(tuple -> {
            LocationProfileInfo locationProfileInfo = new LocationProfileInfo();
            // populate values from tuple
            return Mono.just(locationProfileInfo);
        });
    }

    private Mono<String> getCustomerAccount(String customerId, String location) {
        return CacheMono.lookup((Map) customerIdCache.asMap(), customerId)
                .onCacheMissResume(Mono.fromFuture(/* query */).subscribeOn(Schedulers.parallel()).map(x -> x.getAccountNumber()));
    }
}
public class InventoryService {

    @Override
    public Mono<InventoryInfo> getInventoryInfo(String inventoryId) {
        Mono<Inventory> inventoryMono = Mono.fromFuture(/* inventory query */).subscribeOn(Schedulers.parallel()).log();
        Mono<List<InventorySale>> isMono = Mono.fromFuture(/* inventory sale query */).subscribeOn(Schedulers.parallel()).log();

        return Mono.zip(inventoryMono, isMono).flatMap(tuple -> {
            InventoryInfo inventoryInfo = new InventoryInfo();
            // populate values from tuple
            return Mono.just(inventoryInfo);
        });
    }
}
public class MainService {

    @Autowired
    LocationProfileService locationProfileService;

    @Autowired
    InventoryService inventoryService;

    public void mainService(String customerId, String location, String inventoryId) {
        Mono<LocationProfileInfo> locationProfileMono = locationProfileService.getProfileInfoByLocationAndCustomer(....);
        Mono<InventoryInfo> inventoryMono = inventoryService.getInventoryInfo(....);
        // Is using block() fine here, or is there a better way?
        Mono.zip(locationProfileMono, inventoryMono).subscribeOn(Schedulers.parallel()).block();
    }
}
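
For reference, the dependency graph described above (one independent query, plus two queries that need the account number from a first query) might be composed without the intermediate block() roughly as follows. This is only a minimal sketch under stated assumptions, not a drop-in fix: the class name NonBlockingProfileSketch, the query* helper methods, and the plain String results are hypothetical stand-ins for the real asynchronous queries and DTOs, and the cache lookup is left out. It relies only on Mono.defer, Mono.fromFuture, flatMap, and Mono.zip from reactor-core.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

class NonBlockingProfileSketch {

    // Hypothetical stand-ins for the real asynchronous queries.
    static CompletableFuture<String> queryAccount(String customerId, String location) {
        return CompletableFuture.supplyAsync(() -> "ACC-" + customerId);
    }
    static CompletableFuture<String> queryLocationProfile(String location) {
        return CompletableFuture.supplyAsync(() -> "profile:" + location);
    }
    static CompletableFuture<String> queryCustomer(String account) {
        return CompletableFuture.supplyAsync(() -> "customer:" + account);
    }
    static CompletableFuture<String> queryPricing(String account, String location) {
        return CompletableFuture.supplyAsync(() -> "pricing:" + account + "@" + location);
    }
    static CompletableFuture<String> queryInventory(String inventoryId) {
        return CompletableFuture.supplyAsync(() -> "inventory:" + inventoryId);
    }

    static Mono<String> getProfileInfo(String customerId, String location) {
        // Mono.defer delays creating (and therefore starting) the future until subscription.
        Mono<String> accountMono = Mono.defer(() -> Mono.fromFuture(queryAccount(customerId, location)))
                .switchIfEmpty(Mono.error(new IllegalStateException("customer not found: " + customerId)));
        Mono<String> locationProfileMono = Mono.defer(() -> Mono.fromFuture(queryLocationProfile(location)));

        // The two dependent queries are chained with flatMap instead of block():
        // they start once the account number arrives and then run concurrently.
        Mono<Tuple2<String, String>> dependentMono = accountMono.flatMap(account ->
                Mono.zip(Mono.fromFuture(queryCustomer(account)),
                         Mono.fromFuture(queryPricing(account, location))));

        // zip subscribes to both branches; the independent location query does not
        // wait for the account lookup because each future completes asynchronously.
        return Mono.zip(locationProfileMono, dependentMono)
                .map(t -> t.getT1() + "|" + t.getT2().getT1() + "|" + t.getT2().getT2());
    }

    // Returning the Mono leaves the decision to subscribe (or block) to the caller.
    static Mono<String> mainService(String customerId, String location, String inventoryId) {
        Mono<String> inventoryMono = Mono.defer(() -> Mono.fromFuture(queryInventory(inventoryId)));
        return Mono.zip(getProfileInfo(customerId, location), inventoryMono,
                (profile, inventory) -> profile + " & " + inventory);
    }

    public static void main(String[] args) {
        // block() only at the outermost edge, e.g. in a test or a non-reactive entry point.
        System.out.println(mainService("c1", "loc9", "inv7").block());
    }
}
```

In this sketch the only block() call sits in main, at the boundary of the reactive code. If the caller is itself reactive (for example a WebFlux controller), the Mono returned by mainService could presumably be returned as-is and no block() would be needed at all.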