How to decompress a Flux (and how to w

Posted 2020-02-28 19:45

I have a requirement to read and write compressed (GZIP) streams without intermediate storage. Currently, I'm using Spring RestTemplate to do the writing, and Apache HTTP client to do the reading (see my answer here for an explanation of why RestTemplate can't be used for reading large streams). The implementation is fairly straightforward, where I slap a GZIPInputStream on the response InputStream and move on.
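For reference, here is a minimal sketch of that blocking approach, using only java.util.zip. The class and method names are made up for illustration, and a ByteArrayInputStream stands in for the HTTP response stream; the real code would pass the response InputStream from the HTTP client instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipStreamSketch {

    // Streams decompressed bytes from `compressed` to `sink` through a fixed 8 KiB buffer,
    // so memory use does not grow with the size of the payload.
    static long decompress(InputStream compressed, OutputStream sink) throws IOException {
        long total = 0;
        try (GZIPInputStream gzip = new GZIPInputStream(compressed)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                sink.write(buffer, 0, n);
                total += n;
            }
        }
        return total;
    }

    // Demo helper (hypothetical): compresses `text` in memory, then runs it back through decompress().
    static String roundTrip(String text) {
        try {
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
                gz.write(text.getBytes(StandardCharsets.UTF_8));
            }
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            decompress(new ByteArrayInputStream(compressed.toByteArray()), sink);
            return new String(sink.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello, gzip"));
    }
}
```

The in-memory buffers in roundTrip() exist only to make the demo self-contained; decompress() itself never holds more than one buffer of data at a time.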

Now I'd like to switch to Spring 5 WebClient (just because I'm not a fan of the status quo). However, WebClient is reactive in nature and deals with Flux<Stuff>; I believe it's possible to get a Flux<DataBuffer>, where DataBuffer is an abstraction over ByteBuffer. The question is: how do I decompress it on the fly without storing the full stream in memory (OutOfMemoryError, I'm looking at you) or writing to local disk? It's worth mentioning that WebClient uses Netty under the hood.

I'll admit to not knowing much about (de)compression. I did my research, but none of the material available online seemed particularly helpful:

compression on java nio direct buffers

Writing GZIP file with nio

Reading a GZIP file from a FileChannel (Java NIO)

(de)compressing files using NIO

Iterable gzip deflate/inflate in Java

2 Answers
不美不萌又怎样
#2 · 2020-02-28 19:57

Noting this here because it confused me a bit: the API for registering handlers has changed as of Spring 5.1.

I have a similar setup to the accepted answer for the ChannelInboundHandler:

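import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import org.springframework.http.HttpStatus;

// Rewrites the response headers so that a later HttpContentDecompressor treats the body as GZIP.
public class GzipJsonHeadersHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Skip 1xx informational responses. Note that HttpStatus.resolve() returns null
        // for status codes Spring does not know, which would throw here.
        if (msg instanceof HttpResponse
                && !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();
            headers.clear();
            headers.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            headers.set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
        }
        // Always pass the message on to the next handler in the pipeline.
        ctx.fireChannelRead(msg);
    }
}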

(The header values I needed are just hard-coded there for simplicity, otherwise it's identical.)

Registering it, however, is different:

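WebClient.builder()
    .clientConnector(
            new ReactorClientHttpConnector(
                    HttpClient.from(
                            TcpClient.create()
                                    .doOnConnected(c -> {
                                        // Each addHandlerFirst() call inserts at the front of the user
                                        // handlers, so the decompressor is added first and the headers
                                        // handler ends up ahead of it.
                                        c.addHandlerFirst(new HttpContentDecompressor());
                                        c.addHandlerFirst(new GzipJsonHeadersHandler());
                                    })
                    ).compress(true)
            )
    )
    .build();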

It seems Reactor Netty now maintains a user list of handlers separate from (and after) the system list, and addHandlerFirst() only puts your handler at the front of the user list. The built-in decompression therefore runs before your handler has had a chance to fix the headers, so you have to add an HttpContentDecompressor explicitly to make sure decompression happens after the handler that inserts the correct headers.
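The ordering can be illustrated with a toy model. This is not Reactor Netty code: the "user list" is modelled as a plain Deque, the handler names are just strings, and the class and method names are invented for the example. It only shows why the decompressor has to be added before the headers handler when every call inserts at the front.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class HandlerOrderSketch {

    // Toy stand-in for the user handler list: each name is inserted at the front,
    // in the order the (hypothetical) addHandlerFirst calls are made.
    static List<String> userHandlersAfter(String... addHandlerFirstCalls) {
        Deque<String> userHandlers = new ArrayDeque<>();
        for (String handler : addHandlerFirstCalls) {
            userHandlers.addFirst(handler);
        }
        // Iteration runs from the front of the deque to the back,
        // i.e. in the order inbound messages would visit the handlers.
        return new ArrayList<>(userHandlers);
    }

    public static void main(String[] args) {
        // Same call order as in the registration snippet: decompressor first, headers handler second.
        System.out.println(userHandlersAfter("HttpContentDecompressor", "GzipJsonHeadersHandler"));
    }
}
```

With the calls made in that order, the headers handler sits in front of the decompressor; reversing the calls would let the decompressor see the unmodified headers.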

劫难
#3 · 2020-02-28 20:16
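// HttpHeaders and HttpResponse are Netty's (io.netty.handler.codec.http); HttpStatus is Spring's.
public class HttpResponseHeadersHandler extends ChannelInboundHandlerAdapter {
    // Assumed to be an SLF4J logger (org.slf4j.Logger / LoggerFactory).
    private static final Logger log = LoggerFactory.getLogger(HttpResponseHeadersHandler.class);

    private final HttpHeaders httpHeaders;

    // Takes the headers that should overwrite those of every non-1xx response.
    public HttpResponseHeadersHandler(HttpHeaders httpHeaders) {
        this.httpHeaders = httpHeaders;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse &&
                !HttpStatus.resolve(((HttpResponse) msg).status().code()).is1xxInformational()) {
            HttpHeaders headers = ((HttpResponse) msg).headers();

            // Overwrite each configured header and log the old and new values.
            httpHeaders.forEach(e -> {
                log.warn("Modifying {} from: {} to: {}.", e.getKey(), headers.get(e.getKey()), e.getValue());
                headers.set(e.getKey(), e.getValue());
            });
        }
        // Pass the (possibly modified) message on to the next handler.
        ctx.fireChannelRead(msg);
    }
}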

Then I create a ClientHttpConnector to use with WebClient and in afterNettyContextInit add the handler:

ctx.addHandlerLast(new ReadTimeoutHandler(readTimeoutMillis, TimeUnit.MILLISECONDS));
ctx.addHandlerLast(new Slf4JLoggingHandler());
if (forceDecompression) {
    io.netty.handler.codec.http.HttpHeaders httpHeaders = new ReadOnlyHttpHeaders(
            true,
            CONTENT_ENCODING, GZIP,
            CONTENT_TYPE, APPLICATION_JSON
    );
    HttpResponseHeadersHandler headersModifier = new HttpResponseHeadersHandler(httpHeaders);
    ctx.addHandlerFirst(headersModifier);
}
ctx.addHandlerLast(new HttpContentDecompressor());

This, of course, would fail for responses that are not GZIP compressed, so I use this instance of WebClient for a particular use case only, where I know for sure that the response is compressed.
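As an aside, a GZIP stream is recognisable by its first two bytes, 0x1f and 0x8b (RFC 1952). The sketch below is stdlib-only and not part of the setup above; the class and method names are invented. It shows the kind of check that could tell compressed from uncompressed payloads if a dedicated WebClient instance were not an option.

```java
import java.nio.charset.StandardCharsets;

public class GzipMagicSketch {

    // Returns true if `head` starts with the two GZIP magic bytes 0x1f 0x8b.
    static boolean looksLikeGzip(byte[] head) {
        return head != null
                && head.length >= 2
                && (head[0] & 0xff) == 0x1f
                && (head[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) {
        byte[] gzipHeader = {(byte) 0x1f, (byte) 0x8b, 0x08};
        byte[] json = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(looksLikeGzip(gzipHeader));
        System.out.println(looksLikeGzip(json));
    }
}
```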

Writing is easy: Spring has a ResourceEncoder, so an InputStream can simply be wrapped in an InputStreamResource, and voilà!
