I am new to netty and have a strange problem with an easy example.
In the following code when I create only one client it works as supposed to, but when I create many clients, they never get any response from the server, although the server accepts the connections and gets the requests.
Server classes:
1) MyServer.java
public final class MyServer {
private final EventLoopGroup _bossGroup = new NioEventLoopGroup();
private final EventLoopGroup _workerGroup = new NioEventLoopGroup();
private final int _port;
private Channel _channel;
public MyServer(int port)
{
_port = port;
}
public ChannelFuture start()
{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(_bossGroup, _workerGroup)
.channel( NioServerSocketChannel.class )
.childHandler( new MyServerInitializer(this) );
InetSocketAddress address = new InetSocketAddress(_port);
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
_channel = future.channel();
return( future );
}
public void stop()
{
if( _channel != null )
{
_channel.close();
}
_bossGroup.shutdownGracefully();
_workerGroup.shutdownGracefully();
}
/**
* Start the server.
*/
public static void main(String[] args)
{
final MyServer SRV = new MyServer(8080);
try
{
System.out.println("Starting service on port " + port);
ChannelFuture future = SRV.start();
Runtime.getRuntime().addShutdownHook( new Thread() {
@Override
public void run()
{
SRV.stop();
}
});
System.out.println("Waiting for connections...");
future.channel().closeFuture().syncUninterruptibly();
}
catch(Exception ex)
{
ex.printStackTrace();
}
finally
{
SRV.stop();
}
}
}
2) MyServerInitializer.java
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
private final MyServer _server;
public MyServerInitializer(final MyServer instance)
{
_server = instance;
}
@Override
protected void initChannel(SocketChannel ch)
throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
// Decoders
pipeline.addLast( new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()) );
pipeline.addLast( new StringDecoder(CharsetUtil.UTF_8) );
// Encoder
pipeline.addLast( new StringEncoder(CharsetUtil.UTF_8) );
// and then handler for business logic.
pipeline.addLast( new MyServerHandler(_server) );
}
}
3) MyServerHandler.java
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
public static final String[] VALID_COMMANDS = new String[] {
"status",
"shutdown"
};
public static final String STATUS_CMD = VALID_COMMANDS[0];
public static final String SHUTDOWN_CMD = VALID_COMMANDS[1];
private final MyServer _server;
public MyServerHandler(final MyServer instance)
{
_server = instance;
}
private String clientName(ChannelHandlerContext ctx)
{
return( ctx.channel().remoteAddress().toString() );
}
@Override
public boolean acceptInboundMessage(Object msg)
throws Exception
{
return( msg!=null &&
msg instanceof String &&
Arrays.asList(VALID_COMMANDS).contains( ((String)msg).trim().toLowerCase() )
);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)
throws Exception
{
System.out.println(ctx.name() + " client connected " + clientName(ctx));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception
{
System.out.println(ctx.name() + " exceptionCaught for client " + clientName(ctx));
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception
{
System.out.println((ctx.name() + " message received from client " + clientName(ctx) + " '" + msg + "'");
if( STATUS_CMD.equals(msg) )
{
ctx.writeAndFlush("It's alive!" + "\n");
ctx.close();
}
else if( SHUTDOWN_CMD.equals(msg) )
{
ctx.close();
_server.stop();
}
else
{
// impossible!
}
}
}
Client classes:
1) MyClient.java
public class MyClient {
private final EventLoopGroup _group = new NioEventLoopGroup();
private final String _host;
private final int _port;
private Channel _channel;
public MyClient(String host, int port)
{
_host = host;
_port = port;
}
public void start()
throws Exception
{
Bootstrap b = new Bootstrap();
b.group(_group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(_host, _port))
.handler(new MyClientInitializer());
_channel = b.connect().sync().channel();
}
public void stop()
{
if( _channel != null )
{
_channel.close();
}
_group.shutdownGracefully();
}
public ChannelFuture sendCmd(String cmd)
throws Exception
{
return( _channel.writeAndFlush(cmd + "\n").sync() );
}
/**
* Test client!
*/
public static void main(String[] args)
{
// if 1 client ok, if 1000 clients problem...
for(int i=0; i<1000; i++)
{
final MyClient CLIENT = new MyClient("localhost", 8080);
try
{
CLIENT.start();
CLIENT.sendCmd(MyServerHandler.STATUS_CMD);
}
catch(ConnectException ce)
{
System.err.println("Failed to connect to server; " + ce.getMessage());
}
catch(Exception ex)
{
System.err.println("Client main() failed: " + ex.getMessage());
ex.printStackTrace();
}
finally
{
CLIENT.stop();
}
}
}
}
2) MyClientInitializer.java
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch)
throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
// Decoders
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// Encoder
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// and then business logic.
pipeline.addLast(new MyClientHandler());
}
}
3) MyClientHandler.java
public class MyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String input)
throws Exception
{
System.out.println("Message received from server: " + input);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception
{
cause.printStackTrace();
ctx.close();
}
}
Your client starts, connects, sends a message, and then immediately shuts down without waiting for a message from the server. You need some mechanism to control the lifetime of your client, but to get it working you could do something trivial like this:
Add a
waitForClose
method toMyClient
:And then call it after you send your message: