Netty async write response and large data unknown

2019-07-14 00:53发布

问题:

I am developing a Netty HTTP server. When I write the response in SimpleChannelInboundHandler.channelRead0, the response body comes from another server and its size is not known in advance, so the upstream HTTP response headers may carry either Content-Length or Transfer-Encoding: chunked. My plan is to use a buffer: if the buffer is large enough to hold the full body (regardless of whether the upstream used Content-Length or chunked), I respond with Content-Length; otherwise I respond with chunked encoding.

  1. How do I hold on to the write channel of the first connection and pass it to the second handler in order to write the response? (I just pass ctx directly and write to it, but nothing is returned.)

  2. How do I conditionally decide whether to write chunked data or normal data with Content-Length to the channel? (Adding a ChunkedWriteHandler inside channelRead0 when chunking is needed does not seem to work.)

Take this simple code as an example:

```java

// Example from the question: an HTTP server bound to port 2080 that, from inside its inbound
// handler, opens an outbound connection to example.com:80 and tries to relay the upstream
// response back to the original client through the captured ChannelHandlerContext.
EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<Channel>(){

                // Builds the client-facing pipeline: decoder, encoder, chunked writer,
                // compressor and the relaying handler, in that order.
                @Override
                protected void initChannel(Channel ch) throws Exception
                {
                    System.out.println("Start, I accept client");
                    ChannelPipeline pipeline = ch.pipeline();

                    // Uncomment the following line if you want HTTPS
                    // SSLEngine engine =
                    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
                    // engine.setUseClientMode(false);
                    // pipeline.addLast("ssl", new SslHandler(engine));

                    pipeline.addLast("decoder", new HttpRequestDecoder());
                    // Uncomment the following line if you don't want to handle HttpChunks.
                    // pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("encoder", new HttpResponseEncoder());
                    //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
                    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
                    // Remove the following line if you don't want automatic content
                    // compression.
                    pipeline.addLast("deflater", new HttpContentCompressor());
                    pipeline.addLast("handler", new SimpleChannelInboundHandler<HttpObject>(){

                            // Client-facing handler: every inbound HttpObject triggers a new
                            // upstream connection and request.
                            // NOTE(review): both aggregator lines above are commented out, so this
                            // presumably runs once per decoded part (request head, content, last
                            // content) and opens several upstream connections per request — confirm.
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                            {
                                System.out.println("msg=" + msg);

                                // Captured so the upstream handler below can write to the client.
                                final ChannelHandlerContext ctxClient2Me = ctx;

                                // A fresh client Bootstrap is built on every invocation.
                                Bootstrap bs = new Bootstrap();
                                try{
                                //bs.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class,  DefaultDnsServerAddressStreamProvider.INSTANCE));
                                //.option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
                                bs.resolver(DefaultAddressResolverGroup.INSTANCE);
                                }catch(Exception e){
                                    e.printStackTrace();
                                }

                                bs.channel(NioSocketChannel.class);
                                // The upstream connection reuses the server's worker group.
                                EventLoopGroup cg = workerGroup;//new NioEventLoopGroup();
                                bs.group(cg).handler(new ChannelInitializer<Channel>(){

                                        // Builds the upstream pipeline: request encoder, response
                                        // decoder and the handler that relays to the client.
                                        @Override
                                        protected void initChannel(Channel ch) throws Exception
                                        {
                                            System.out.println("start, server accept me");
                                            ch.pipeline().addLast("http-request-encode", new HttpRequestEncoder());
                                            ch.pipeline().addLast(new HttpResponseDecoder());
                                            ch.pipeline().addLast("http-res", new SimpleChannelInboundHandler<HttpObject>(){

                                                    // Upstream handler: receives the decoded upstream
                                                    // response parts and writes to ctxClient2Me.
                                                    @Override
                                                    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception
                                                    {
                                                        System.out.println("target = " + msg);
                                                        // Response head: build a new head for the client
                                                        // that carries only the upstream status.
                                                        if(msg instanceof HttpResponse){
                                                            HttpResponse res = (HttpResponse) msg;
                                                            // NOTE(review): the result of this call is discarded.
                                                            HttpUtil.isTransferEncodingChunked(res);
                                                            DefaultHttpResponse resClient2Me = new DefaultHttpResponse(HttpVersion.HTTP_1_1, res.getStatus());

                                                            //resClient2Me.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                                            //resClient2Me.headers().set(HttpHeaderNames.CONTENT_LENGTH, "");

                                                            // Written without a flush; the only flush in
                                                            // this handler is the writeAndFlush below.
                                                            ctxClient2Me.write(resClient2Me);
                                                        }
                                                        if(msg instanceof LastHttpContent){
                                                            // now response the request of the client, it wastes x seconds from receiving request to response
                                                            // NOTE(review): an empty last content is sent instead of
                                                            // msg itself, then both channels are closed.
                                                            ctxClient2Me.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
                                                            ctx.close();
                                                        }else if( msg instanceof HttpContent){
                                                            // NOTE(review): the forwarding write is commented out, so
                                                            // intermediate body content is never sent to the client.
                                                            //ctxClient2Me.write(new DefaultHttpContent(msg)); write chunk by chunk ?
                                                        }
                                                    }


                                                });

                                            System.out.println("end, server accept me");

                                        }

                                });

                                final URI uri = new URI("http://example.com/");
                                String host = uri.getHost();
                                // Port 80 is hard-coded rather than taken from the URI.
                                ChannelFuture connectFuture= bs.connect(host, 80);

                                System.out.println("to connect me to server");

                                // NOTE(review): this listener has an empty body and does nothing.
                                connectFuture.addListener(new ChannelFutureListener(){

                                        @Override
                                        public void operationComplete(ChannelFuture cf) throws Exception
                                        {
                                        }

                                });


                                // Waits for the connect to finish inside the handler method.
                                // NOTE(review): presumably this runs on an event-loop thread, where
                                // blocking is unsafe; the empty listener above looks like the
                                // intended non-blocking replacement — confirm.
                                ChannelFuture connetedFuture = connectFuture.sync(); // TODO optimize, wait io
                                System.out.println("connected me to server");

                                // NOTE(review): the Host header line is commented out, so the
                                // request is sent without one.
                                DefaultFullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
                                //req.headers().set(HttpHeaderNames.HOST, "");
                                connetedFuture.channel().writeAndFlush(req);

                                System.out.println("end of Client2Me channelRead0");
                                System.out.println("For the seponse of Me2Server, see SimpleChannelInboundHandler.channelRead0");
                            }

                    });
                    System.out.println("end, I accept client");
                }

            });

            System.out.println("========");

        // Bind to port 2080 and wait here until the server channel is closed.
        ChannelFuture channelFuture = serverBootstrap.bind(2080).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        // Always shut down both event loop groups, even if bind or the wait fails.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

```

回答1:

After a bit of a struggle trying to send a response from a non-Netty event-loop thread, I finally figured out the problem. If your client closes its output stream using

socketChannel.shutdownOutput()

then you need to set ALLOW_HALF_CLOSURE property true in Netty so it won't close the channel. Here's a sample server. The client is left as an exercise to the reader :-)

    // Sample server: accepts connections on localhost:PORT and, one second after data is read
    // from a connection, writes CONTENT back from a timer thread and closes the connection.
    final ServerBootstrap b = new ServerBootstrap();

    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            // childOption(...) configures every accepted connection. option(...) only configures
            // the listening server channel, which is why setting these there had no effect.
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            // Keep the connection open when the client shuts down only its output side, so the
            // delayed response can still be written.
            .childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
                @Override
                protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                            // The payload is not used, and ChannelInboundHandlerAdapter does not
                            // release inbound messages itself, so release it here to avoid a leak.
                            ((ByteBuf) msg).release();
                            final String id = ctx.channel().id().asLongText();

                            // Send the response 1 second later from the timer thread (not a Netty
                            // event-loop thread).
                            timer.schedule(new TimerTask() {
                                @Override
                                public void run() {
                                    // Strings cannot be written directly; wrap the bytes in a ByteBuf.
                                    ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
                                    ctx.flush();
                                    ctx.close();

                                    log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
                                    startTimes.remove(id);
                                }
                            }, 1000);
                        }
                    });
                }
            });
    // Bind and then wait until the server channel is closed.
    Channel ch = b.bind("localhost", PORT).sync().channel();
    ch.closeFuture().sync();

Of course, as mentioned by others in the thread, you cannot send Strings directly; you need to send a ByteBuf, for example one created with Unpooled.copiedBuffer.



回答2:

  1. See the Javadoc comments on Channel below. You can keep the Channel (or ChannelHandlerContext) received in ChannelInboundHandlerAdapter.channelRead(ChannelHandlerContext ctx, Object msg) (msg is not released automatically after the method returns) or in SimpleChannelInboundHandler.channelRead0(ChannelHandlerContext ctx, I msg) (the received message is released automatically after the method returns) for later use. See the example at the end, which passes the channel to another ChannelHandler.

All I/O operations are asynchronous.

All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which will notify you when the requested I/O operation has succeeded, failed, or canceled.

/**
 * Abridged excerpt of the Channel interface showing only its write and flush operations.
 * Each write variant returns a {@link ChannelFuture}, which notifies the caller when the
 * requested I/O operation has succeeded, failed, or been canceled; the call itself returns
 * immediately.
 */
public interface Channel extends AttributeMap, Comparable<Channel> {

    /**
     * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     *
     * @param msg the message to write
     * @return the future for this write request
     */
    ChannelFuture write(Object msg);

    /**
     * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     *
     * @param msg the message to write
     * @param promise caller-supplied promise for this write
     *        (NOTE(review): presumably completed when the write finishes — confirm)
     * @return the future for this write request
     */
    ChannelFuture write(Object msg, ChannelPromise promise);

    /**
     * Request to flush all pending messages.
     *
     * @return a {@link Channel}, allowing calls to be chained
     */
    Channel flush();

    /**
     * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
     *
     * @param msg the message to write and flush
     * @param promise caller-supplied promise for this write
     * @return the future for this write request
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    /**
     * Shortcut for call {@link #write(Object)} and {@link #flush()}.
     *
     * @param msg the message to write and flush
     * @return the future for this write request
     */
    ChannelFuture writeAndFlush(Object msg);
}
  2. There is no need to worry about this if you have added HttpResponseEncoder to the ChannelPipeline (it is a subclass of HttpObjectEncoder, which has a private field, private int state = ST_INIT;, to remember whether to encode the HTTP body as chunked). The only thing you need to do is add the header 'transfer-encoding: chunked', e.g. HttpUtil.setTransferEncodingChunked(srcRes, true);.

```java

/**
 * Inbound handler for the Netty-to-upstream-server connection. It relays each part of the
 * upstream HTTP response (head, body content, last content) to the client connection whose
 * {@link ChannelHandlerContext} was supplied at construction time.
 *
 * <p>The relayed head carries the upstream status and, when the upstream response is chunked,
 * {@code Transfer-Encoding: chunked}. Otherwise no length header is set and the end of the body
 * is signalled by closing the client connection after the last content has been flushed.
 */
public class NettyToServerChat extends SimpleChannelInboundHandler<HttpObject> {
  private static final Logger LOGGER = LoggerFactory.getLogger(NettyToServerChat.class);
  public static final String CHANNEL_NAME = "NettyToServer";

  /** Context of the client-facing connection; every relayed message is written through it. */
  protected final ChannelHandlerContext ctxClientToNetty;
  /** Determines if the response is chunked */
  private boolean responseChunked = false;

  /**
   * @param ctxClientToNetty context of the client connection that the upstream response is
   *     relayed to
   */
  public NettyToServerChat(ChannelHandlerContext ctxClientToNetty) {
    this.ctxClientToNetty = ctxClientToNetty;
  }

  /**
   * Relays one decoded part of the upstream response to the client.
   *
   * <p>The two {@code if} statements are deliberately independent: a single message can be both
   * the response head and the last content.
   *
   * @param ctx context of the upstream connection
   * @param msg decoded upstream response part
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    if (msg instanceof HttpResponse) {
      relayResponseHead((HttpResponse) msg);
    }

    if (msg instanceof LastHttpContent) { // prioritize the subclass interface
      // SimpleChannelInboundHandler releases msg once this method returns, while the write
      // completes later, so retain it first.
      // The last content is always written and flushed, chunked or not: it carries the final
      // bytes and lets the encoder terminate a chunked body. Closing without it would drop
      // everything still buffered by the earlier un-flushed writes.
      ctxClientToNetty.writeAndFlush(((LastHttpContent) msg).retain())
          .addListener(ChannelFutureListener.CLOSE);
      LOGGER.debug("Upstream response complete (chunked = {})", responseChunked);
      // The upstream exchange is finished; release that connection.
      ctx.close();
    } else if (msg instanceof HttpContent) {
      // Retain for the same reason as above; flushing is deferred to the last content.
      ctxClientToNetty.write(((HttpContent) msg).retain());
    }
  }

  /** Writes a response head to the client that mirrors the upstream status and chunked flag. */
  private void relayResponseHead(HttpResponse response) {
    HttpResponseStatus resStatus = response.status();
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Status Line: {} {}", resStatus.code(), resStatus.reasonPhrase());
      for (CharSequence name : response.headers().names()) {
        for (CharSequence value : response.headers().getAll(name)) {
          LOGGER.debug("HEADER: {} = {}", name, value);
        }
      }
    }

    // Forward the upstream status instead of always answering 200 OK.
    HttpResponse srcRes = new DefaultHttpResponse(HttpVersion.HTTP_1_1, resStatus);
    responseChunked = HttpUtil.isTransferEncodingChunked(response);
    if (responseChunked) {
      // The response encoder switches to chunked body encoding based on this header, so no
      // ChunkedWriteHandler has to be added or removed at runtime.
      HttpUtil.setTransferEncodingChunked(srcRes, true);
    }
    ctxClientToNetty.write(srcRes);
  }
}

```



标签: java netty