LEAK: ByteBuf.release() was not called in before i

Posted 2019-07-20 12:24

Question:

I am using reactor-core [1.1.0.RELEASE], reactor-net [1.1.0.RELEASE] (which uses netty-all [4.0.18.Final]), reactor-spring-context [1.1.0.RELEASE], and the Spring Reactor TcpServer [Spring 4.0.3.RELEASE].

I have created a simple REST API in Netty for a health check: /health. I followed the gs-reactor-thumbnailer code.

The code is as follows:

import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

import org.springframework.stereotype.Service;

import reactor.function.Consumer;
import reactor.net.NetChannel;

@Service
public class HealthCheckNettyRestApi{

    public Consumer<FullHttpRequest> getResponse(NetChannel<FullHttpRequest, FullHttpResponse> channel,int portNumber){
        return req -> {
            if (req.getMethod() != HttpMethod.GET) {
                channel.send(badRequest(req.getMethod()
                        + " not supported for this URI"));
            } else {
                DefaultFullHttpResponse resp = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                resp.content().writeBytes("Hello World".getBytes());
                resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
                resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH,resp.content().readableBytes());
                //resp.release();
                channel.send((resp));
            }
        };
    }
}

In the Spring Boot application I wire it up as follows:

    @Bean
    public ServerSocketOptions serverSocketOptions() {
        return new NettyServerSocketOptions()
                .pipelineConfigurer(pipeline -> pipeline
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(16 * 1024 * 1024)));
    }

    @Autowired
    private HealthCheckNettyRestApi healthCheck;

    @Value("${netty.port:5555}")
    private Integer nettyPort;

    @Bean
    public NetServer<FullHttpRequest, FullHttpResponse> restApi(Environment env,ServerSocketOptions opts) throws InterruptedException{
        NetServer<FullHttpRequest, FullHttpResponse> server = 
                new TcpServerSpec<FullHttpRequest, FullHttpResponse>(NettyTcpServer.class)
                .env(env)
                .dispatcher("sync")
                .options(opts)
                .listen(nettyPort)
                .consume(ch -> { 

                    Stream<FullHttpRequest> in = ch.in();
                    log.info("netty server is humming.....");

                    in.filter( (FullHttpRequest req) -> (req.getUri().matches(NettyRestConstants.HEALTH_CHECK)))
                    .when(Throwable.class, NettyHttpSupport.errorHandler(ch))
                    .consume(healthCheck.getResponse(ch, nettyPort));

                }).get();

        server.start().await(); //this is working as Tomcat is also deployed due to Spring JPA & web dependencies

        return server;

    }

When I run a benchmark using wrk:

wrk -t6 -c100 -d30s --latency 'http://localhost:8087/health'

I get the following stack trace:

2015-04-22 17:23:21.072] - 16497 ERROR [reactor-tcp-io-22] --- i.n.u.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel()


2015-04-22 23:09:26.354 ERROR 4308 --- [actor-tcp-io-13] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 0
Created at:
    io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:59)
    io.netty.buffer.Unpooled.compositeBuffer(Unpooled.java:355)
    io.netty.handler.codec.http.HttpObjectAggregator.decode(HttpObjectAggregator.java:144)
    io.netty.handler.codec.http.HttpObjectAggregator.decode(HttpObjectAggregator.java:49)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:341)
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:327)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:155)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:148)
    io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:341)
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:327)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:116)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:494)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:461)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
    io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    java.lang.Thread.run(Thread.java:745)

2015-04-22 23:09:55.217  INFO 4308 --- [actor-tcp-io-13] r.n.netty.NettyNetChannelInboundHandler  : [id: 0x260faf6d, /127.0.0.1:50275 => /127.0.0.1:8087] Connection reset by peer
2015-04-22 23:09:55.219 ERROR 4308 --- [actor-tcp-io-13] reactor.core.Reactor                     : Connection reset by peer

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:871)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:108)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:494)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:461)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

My analysis: since I am passing the DefaultFullHttpResponse on to the Spring implementation, I think the Spring APIs should take care of calling release(). I also tried calling release() from my own implementation, but I still get the same error.

Could someone tell me what is wrong with this implementation?
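
A note on the trace above: its "Created at" section points to HttpObjectAggregator.decode, so the leaked buffer appears to be the content of the aggregated inbound FullHttpRequest rather than the outbound response. That would explain why releasing resp changes nothing. In Netty 4, the last party to consume a reference-counted inbound message is expected to release it. Whether reactor-net does this on the consumer's behalf is not something the post confirms, so treat that part as an assumption. The following is a minimal stand-alone sketch of the ownership rule only. CountedBuffer and releasing are hypothetical stand-ins written for illustration, not Netty or Reactor classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RefCountSketch {

    /** Hypothetical stand-in for a reference-counted buffer; NOT Netty's ByteBuf. */
    static final class CountedBuffer {
        // A freshly created buffer starts with a reference count of 1.
        private final AtomicInteger refCnt = new AtomicInteger(1);

        int refCnt() {
            return refCnt.get();
        }

        /** Decrements the count; returns true once the count reaches 0. */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released more often than retained");
            }
            return remaining == 0;
        }
    }

    /** Wraps a handler so the inbound message is released even if the handler throws. */
    static Consumer<CountedBuffer> releasing(Consumer<CountedBuffer> handler) {
        return msg -> {
            try {
                handler.accept(msg);
            } finally {
                msg.release();
            }
        };
    }

    public static void main(String[] args) {
        // A handler that reads the request but never releases it.
        Consumer<CountedBuffer> plain = msg -> { };

        CountedBuffer leaked = new CountedBuffer();
        plain.accept(leaked);

        CountedBuffer freed = new CountedBuffer();
        releasing(plain).accept(freed);

        System.out.println("plain handler refCnt = " + leaked.refCnt());
        System.out.println("releasing handler refCnt = " + freed.refCnt());
    }
}
```

In the real handler, the analogous step would be releasing req (for example with io.netty.util.ReferenceCountUtil.release(req)) in a finally block of the consumer returned by getResponse. Requests that the URI filter drops never reach that consumer, so they would presumably need to be released elsewhere.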