I have problem with correctly saving a file after its download is complete in Project Reactor.
class HttpImageClientDownloader implements ImageClientDownloader {
private final ExchangeFunction exchangeFunction;
HttpImageClientDownloader() {
this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
}
@Override
public Mono<File> downloadImage(String url, Path destination) {
ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
return exchangeFunction.exchange(clientRequest)
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
//.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
.flatMap(dataBuffer -> {
AsynchronousFileChannel fileChannel = createFile(destination);
return DataBufferUtils
.write(dataBuffer, fileChannel, 0)
.publishOn(Schedulers.elastic())
.doOnNext(DataBufferUtils::release)
.then(Mono.just(destination.toFile()));
});
}
private AsynchronousFileChannel createFile(Path path) {
try {
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
} catch (Exception e) {
throw new ImageDownloadException("Error while creating file: " + path, e);
}
}
}
So my question is: Is DataBufferUtils.write(dataBuffer, fileChannel, 0) blocking?
What about when the disk is slow?
And second question about what happens when ImageDownloadException occurs , In doOnNext I want to release the given data buffer, is that a good place for this kind operation?
I think also this line:
.map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
could be blocking...
Here's another (shorter) way to achieve that:
Now
DataBufferUtils::write
operations are not blocking because they use non-blocking IO with channels. Writing to such channels means it'll write whatever it can to the output buffer (i.e. may write all theDataBuffer
or just part of it).Using
Flux::map
orFlux::doOnNext
is the right place to do that. But you're right, if an error occurs, you're still responsible for releasing the current buffer (and all the remaining ones). There might be something we can improve here in Spring Framework, please keep an eye on SPR-16782.I don't see how your last sample shows anything blocking: all methods return reactive types and none are doing blocking I/O.