Download and save file from ClientRequest using E

Posted 2020-04-10 04:01

I have a problem correctly saving a file once its download completes in Project Reactor.

class HttpImageClientDownloader implements ImageClientDownloader {

    private final ExchangeFunction exchangeFunction;

    HttpImageClientDownloader() {
        this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
    }

    @Override
    public Mono<File> downloadImage(String url, Path destination) {

        ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
        return exchangeFunction.exchange(clientRequest)
                .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                //.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                .flatMap(dataBuffer -> {

                    AsynchronousFileChannel fileChannel = createFile(destination);
                    return DataBufferUtils
                            .write(dataBuffer, fileChannel, 0)
                            .publishOn(Schedulers.elastic())
                            .doOnNext(DataBufferUtils::release)
                            .then(Mono.just(destination.toFile()));


                });

    }

    private AsynchronousFileChannel createFile(Path path) {
        try {
            return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
        } catch (Exception e) {
            throw new ImageDownloadException("Error while creating file: " + path, e);
        }
    }
}

So my question is: Is DataBufferUtils.write(dataBuffer, fileChannel, 0) blocking?

What about when the disk is slow?

My second question is about what happens when ImageDownloadException occurs. I want to release the given data buffer in doOnNext; is that a good place for this kind of operation?

I think also this line:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))

could be blocking...

1 answer
beautiful°
Reply #2 · 2020-04-10 04:21

Here's another (shorter) way to achieve that:

Flux<DataBuffer> data = this.webClient.get()
        .uri("/greeting")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
Mono<File> result = DataBufferUtils.write(data, channel)
        .map(DataBufferUtils::release)
        // file is a Path, so convert it to match the declared Mono<File>
        .then(Mono.just(file.toFile()));

Now DataBufferUtils::write operations are not blocking because they use non-blocking IO with channels. Writing to such channels means it'll write whatever it can to the output buffer (i.e. may write all the DataBuffer or just part of it).
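The "all of the buffer or just part of it" behaviour comes from the contract of WritableByteChannel.write, and it can be seen with the plain JDK, without Spring. Here is a minimal sketch, assuming a small in-memory payload; PartialWriteDemo and writeFully are hypothetical names, and the loop only illustrates the channel contract, not what DataBufferUtils does internally:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PartialWriteDemo {

    // Hypothetical helper: writes every byte of payload to an existing file
    // and returns the total number of bytes written.
    public static long writeFully(Path path, byte[] payload) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        long written = 0;
        try (WritableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.WRITE)) {
            // write() may consume only part of the buffer, so loop until it is drained
            while (buffer.hasRemaining()) {
                written += channel.write(buffer);
            }
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spring", null);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        long written = writeFully(file, payload);
        System.out.println(written + " bytes written to " + file);
    }
}
```

The try-with-resources block also closes the channel, which the shorter snippet above leaves to the caller.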

Flux::map or Flux::doOnNext is the right place to release the data buffers. But you're right: if an error occurs, you're still responsible for releasing the current buffer (and all the remaining ones). There might be something we can improve here in Spring Framework; please keep an eye on SPR-16782.

I don't see how your last sample shows anything blocking: all methods return reactive types and none are doing blocking I/O.
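One more note on the createFile helper in the question: as far as I know, AsynchronousFileChannel.open opens the file for reading unless WRITE is requested, so passing only StandardOpenOption.CREATE is probably not enough to write to it. A minimal JDK-only sketch, assuming a small payload written at position 0; AsyncChannelDemo and writeAsync are hypothetical names, and the Future.get() call is there only so the demo can report a result (reactive code would not block like this):

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AsyncChannelDemo {

    // Hypothetical helper: starts one asynchronous write at position 0 and
    // returns the number of bytes that write reported.
    public static int writeAsync(Path path, byte[] payload) throws Exception {
        // CREATE alone does not request write access, so WRITE is added here
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // write() returns immediately with a Future; the I/O completes later
            Future<Integer> pending = channel.write(ByteBuffer.wrap(payload), 0);
            // Demo only: wait for the write so the result can be returned
            return pending.get();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempDirectory("download").resolve("image.bin");
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        int written = writeAsync(file, payload);
        System.out.println(written + " bytes written to " + file);
    }
}
```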
