Netty4 confusion with simple example

2019-08-30 23:58发布

I am new to netty and have a strange problem with an easy example.

In the following code when I create only one client it works as supposed to, but when I create many clients, they never get any response from the server, although the server accepts the connections and gets the requests.

Server classes:

1) MyServer.java

public final class MyServer {

    private final EventLoopGroup _bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup _workerGroup = new NioEventLoopGroup();
    private final int _port;
    private Channel _channel;

    public MyServer(int port)
    {
        _port = port;
    }

    public ChannelFuture start()
    {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(_bossGroup, _workerGroup)
            .channel( NioServerSocketChannel.class )
            .childHandler( new MyServerInitializer(this) );

        InetSocketAddress address = new InetSocketAddress(_port);
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        _channel = future.channel();

        return( future );
    }

    public void stop()
    {
        if( _channel != null )
        {
            _channel.close();
        }
        _bossGroup.shutdownGracefully();
        _workerGroup.shutdownGracefully();
    }

    /**
     * Start the server.
     */
    public static void main(String[] args)
    {
        final MyServer SRV = new MyServer(8080);
        try
        {
            System.out.println("Starting service on port " + port);
            ChannelFuture future = SRV.start();

            Runtime.getRuntime().addShutdownHook( new Thread() {
                @Override
                public void run()
                {
                    SRV.stop();
                }
            });

            System.out.println("Waiting for connections...");
            future.channel().closeFuture().syncUninterruptibly();
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
        }
        finally
        {
            SRV.stop();
        }
    }
}

2) MyServerInitializer.java

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    private final MyServer _server;

    public MyServerInitializer(final MyServer instance)
    {
        _server = instance;
    }

    @Override
    protected void initChannel(SocketChannel ch)
        throws Exception
    {
        ChannelPipeline pipeline = ch.pipeline();

        // Decoders
        pipeline.addLast( new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()) );
        pipeline.addLast( new StringDecoder(CharsetUtil.UTF_8) );

        // Encoder
        pipeline.addLast( new StringEncoder(CharsetUtil.UTF_8) );

        // and then handler for business logic.
        pipeline.addLast( new MyServerHandler(_server) );
    }
}

3) MyServerHandler.java

public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    public static final String[] VALID_COMMANDS = new String[] {
        "status",
        "shutdown"
    };
    public static final String STATUS_CMD = VALID_COMMANDS[0];
    public static final String SHUTDOWN_CMD = VALID_COMMANDS[1];

    private final MyServer _server;

    public MyServerHandler(final MyServer instance)
    {
        _server = instance;
    }

    private String clientName(ChannelHandlerContext ctx)
    {
        return( ctx.channel().remoteAddress().toString() );
    }

    @Override
    public boolean acceptInboundMessage(Object msg)
        throws Exception
    {
        return( msg!=null &&
                msg instanceof String &&
                Arrays.asList(VALID_COMMANDS).contains( ((String)msg).trim().toLowerCase() )
        );
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx)
            throws Exception
    {
        System.out.println(ctx.name() + " client connected " + clientName(ctx));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception
    {
        System.out.println(ctx.name() + " exceptionCaught for client " + clientName(ctx));
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg)
        throws Exception
    {
        System.out.println((ctx.name() + " message received from client " + clientName(ctx) + " '" + msg + "'");

        if( STATUS_CMD.equals(msg) )
        {
            ctx.writeAndFlush("It's alive!" + "\n");
            ctx.close();
        }
        else if( SHUTDOWN_CMD.equals(msg) )
        {
            ctx.close();
            _server.stop();
        }
        else
        {
            // impossible!
        }
    }
}

Client classes:

1) MyClient.java

public class MyClient {

    private final EventLoopGroup _group = new NioEventLoopGroup();
    private final String _host;
    private final int _port;
    private Channel _channel;

    public MyClient(String host, int port)
    {
        _host = host;
        _port = port;
    }

    public void start()
        throws Exception
    {
        Bootstrap b = new Bootstrap();
        b.group(_group)
            .channel(NioSocketChannel.class)
            .remoteAddress(new InetSocketAddress(_host, _port))
            .handler(new MyClientInitializer());

        _channel = b.connect().sync().channel();
    }

    public void stop()
    {
        if( _channel != null )
        {
            _channel.close();
        }
        _group.shutdownGracefully();
    }

    public ChannelFuture sendCmd(String cmd)
        throws Exception
    {
        return( _channel.writeAndFlush(cmd + "\n").sync() );
    }

    /**
     * Test client!
     */
    public static void main(String[] args)
    {
        // if 1 client ok, if 1000 clients problem...
        for(int i=0; i<1000; i++)
        {
            final MyClient CLIENT = new MyClient("localhost", 8080);
            try
            {
                CLIENT.start();
                CLIENT.sendCmd(MyServerHandler.STATUS_CMD);
            }
            catch(ConnectException ce)
            {
                System.err.println("Failed to connect to server; " + ce.getMessage());
            }
            catch(Exception ex)
            {
                System.err.println("Client main() failed: " + ex.getMessage());
                ex.printStackTrace();
            }
            finally
            {
                CLIENT.stop();
            }
        }
    }
}

2) MyClientInitializer.java

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch)
        throws Exception
    {
        ChannelPipeline pipeline = ch.pipeline();

        // Decoders
        pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

        // Encoder
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

        // and then business logic.
        pipeline.addLast(new MyClientHandler());
    }
}

3) MyClientHandler.java

public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String input)
        throws Exception
    {
        System.out.println("Message received from server: " + input);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}

标签: java netty
1条回答
啃猪蹄的小仙女
2楼-- · 2019-08-31 01:00

Your client starts, connects, sends a message, and then immediately shuts down without waiting for a message from the server. You need some mechanism to control the lifetime of your client, but to get it working you could do something trivial like this:

Add a waitForClose method to MyClient:

public void waitForClose()
{
    _channel.closeFuture().awaitUninterruptibly();
}

And then call it after you send your message:

CLIENT.sendCmd(MyServerHandler.STATUS_CMD);
CLIENT.waitForClose(); // <-- Add this line
查看更多
登录 后发表回答