UnsupportedOperationException when sending message

2019-07-14 09:01发布

问题:

I am using Netty 5.0.

I have a complementary client bootstrap for which I took the SecureChatClient.java example from netty github. Wenn I send messages from the client bootstrap to the server it works perfectly fine. When I try to send messages from the server bootstrap to the client (after successfully initiating a connection/channel through the client first) I get a java.lang.UnsupportedOperationException without any further information on it. Sending messages from server to client is done via code above.

Is a serverbootstrap for receiving only?

Is a serverbootstrap not meant to be able to write messages back to the client as shown above? By that I mean, messages can enter a ChannelPipeline from a socket up through the ChannelHandlers, but only the ChannelHandlers are supposed to be writing responses back down the ChannelPipeline and out the socket. So in a ServerBootstrap a user is not meant to be able to send messages down the ChannelPipeline from outside the Pipeline. (Hope that makes sense)

Or am I simply missing something?

My code follows:

    // Ports.
    int serverPort = 8080;

    EventLoopGroup bossGroup    = new NioEventLoopGroup();
    EventLoopGroup workerGroup  = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             }
         })
         .option(ChannelOption.SO_BACKLOG, 128)
         .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(serverPort).sync();
        Channel ch = f.channel();

        System.out.println("Server: Running!");

      // Read commands from the stdin.
      ChannelFuture lastWriteFuture = null;
      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
      while(true)
      {
          String line = in.readLine();
          if (line == null) break;

          ByteBuf getOut = buffer(64);
          getOut.writeBytes(line.getBytes());

          // Sends the received line to the server.
          lastWriteFuture = ch.writeAndFlush(getOut);

          lastWriteFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture cf) throws Exception {
                    if(cf.isSuccess()) {
                        System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                    } else {
                        System.out.println("CFListener: failure! FAILure! FAILURE!");
                        System.out.println(cf.cause());
                    }
                }
            });

      }

               // Wait until all messages are flushed before closing the channel.
      if (lastWriteFuture != null) {
          lastWriteFuture.sync();
      }


        // Wait until the server socket is closed.
        // In this example, this does not happen, but you can do that to gracefully
        // shut down your server.
        f.channel().closeFuture().sync();
    } catch (InterruptedException | UnsupportedOperationException e) {
        e.printStackTrace();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

I started using the following example: https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/securechat

My problem is that I get the following exception when calling ch.writeAndFlush:

java.lang.UnsupportedOperationException
at io.netty.channel.socket.nio.NioServerSocketChannel.filterOutboundMessage(NioServerSocketChannel.java:184)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:784)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1278)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:158)
at io.netty.channel.DefaultChannelHandlerInvoker$WriteTask.run(DefaultChannelHandlerInvoker.java:440)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:328)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

回答1:

You cannot write to a ServerChannel, you can only connect to normal channels. Your call to writeAndFlush is failing for this reason.

To send a message to every client, you should store the channel of every client inside a ChannelGroup and invoke writeAndFlush() on that.

A quick way to do this is adding another handler to your ServerBootstrap that puts the incoming connections inside the ChannelGroup, a quick implementation of this would be this:

// In your main:
ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// In your ChannelInitializer<SocketChannel>
ch.pipeline().addLast("grouper", new GlobalSendHandler());

// New class:
public class MyHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
         allChannels.add(ctx.channel());
         super.channelActive(ctx);
     }
 }

Then we can call the following to send a message to every connection, this returns a ChannelGroupFuture instead of a normal ChannelFuture:

allChannels.writeAndFlush(getOut);

Your total code would look like this with the fixes from above:

// Ports.
int serverPort = 8080;

ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

EventLoopGroup bossGroup    = new NioEventLoopGroup();
EventLoopGroup workerGroup  = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             ch.pipeline().addLast("grouper", new GlobalSendHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Bind and start to accept incoming connections.
    ChannelFuture f = b.bind(serverPort).sync();
    Channel ch = f.channel();

    System.out.println("Server: Running!");

  // Read commands from the stdin.
  ChannelGroupFuture lastWriteFuture = null;
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  while(true)
  {
      String line = in.readLine();
      if (line == null) break;

      ByteBuf getOut = buffer(64);
      getOut.writeBytes(line.getBytes());

      // Sends the received line to the server.
      lastWriteFuture = allChannels.writeAndFlush(getOut);

      lastWriteFuture.addListener(new ChannelGroupFutureListener() {
            @Override
            public void operationComplete(ChannelGroupFuture cf) throws Exception {
                if(cf.isSuccess()) {
                    System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                } else {
                    System.out.println("CFListener: failure! FAILure! FAILURE!");
                    System.out.println(cf.cause());
                }
            }
        });

  }

           // Wait until all messages are flushed before closing the channel.
  if (lastWriteFuture != null) {
      lastWriteFuture.sync();
  }


    // Wait until the server socket is closed.
    // In this example, this does not happen, but you can do that to gracefully
    // shut down your server.
    f.channel().closeFuture().sync();
} catch (InterruptedException | UnsupportedOperationException e) {
    e.printStackTrace();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}


回答2:

I think Netty Server has no decoder, encoder. if you want to send String data,

serverBootstrap.group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline channelPipeline = channel.pipeline();
        channelPipeline.addLast("String Encoder", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("String Decoder", new StringDecoder(CharsetUtil.UTF_8));
    }
});

Add your server's Initializer!



标签: java netty