Multi-threaded UDP server with Netty

Posted 2020-07-22 17:51

Question:

I'm trying to implement a UDP server with Netty. The idea is to bind only once (therefore creating only one Channel). This Channel is initialized with only one handler that dispatches processing of incoming datagrams among multiple threads via an ExecutorService.

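import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration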
public class SpringConfig {

    @Autowired
    private Dispatcher dispatcher;

    private String host;

    private int port;

    @Bean
    public Bootstrap bootstrap() throws Exception {
        Bootstrap bootstrap = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(dispatcher);

        ChannelFuture future = bootstrap.bind(host, port).await();
        if(!future.isSuccess())
            throw new Exception(String.format("Failed to bind on [host = %s, port = %d].", host, port), future.cause());

        return bootstrap;
    }
}

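import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component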
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {

    private int workerThreads;

    private ExecutorService executorService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DatagramPacket packet = (DatagramPacket) msg;

        final Channel channel = ctx.channel();

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                // Process the packet and produce a response packet (below)
                DatagramPacket responsePacket = ...;

                ChannelFuture future;
                try {
                    future = channel.writeAndFlush(responsePacket).await();
                } catch (InterruptedException e) {
                    return;
                }
                if(!future.isSuccess())
                    log.warn("Failed to write response packet.");
            }
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        executorService = Executors.newFixedThreadPool(workerThreads);
    }
}

I have the following questions:

  1. Should the DatagramPacket received by the channelRead method of the Dispatcher class be duplicated before being used by the worker thread? I wonder if this packet is destroyed after the channelRead method returns, even if a reference is kept by the worker thread.
  2. Is it safe to share the Channel among all the worker threads and let them call writeAndFlush concurrently?

Thanks!

Answer 1:

  1. No, you don't need to duplicate it. If the object has to outlive channelRead, either convert it into something else (for example, copy out the data you need) or call ReferenceCountUtil.retain(datagram) and then ReferenceCountUtil.release(datagram) once you're done with it. You also shouldn't call await() inside the executor task; add a listener to the ChannelFuture returned by writeAndFlush and handle the result there.

  2. Yes, Channel objects are thread-safe, and their methods can be called from many different threads.
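
Both points in this answer can be put together in a short sketch, assuming the Netty 4.1 API. The class name EchoDispatcher and the echo "processing" are hypothetical placeholders for the real business logic, and the Executor is assumed to be supplied by the caller. Because ChannelInboundHandlerAdapter does not release messages on its own, and this handler does not pass the packet further down the pipeline, the worker task here simply takes over the existing reference and releases it when finished. The write result is handled in a listener rather than by blocking on await().

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical handler: dispatches each datagram to a worker and echoes it back.
@Sharable
class EchoDispatcher extends ChannelInboundHandlerAdapter {

    private final Executor workers;

    EchoDispatcher(Executor workers) {
        this.workers = workers;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final DatagramPacket packet = (DatagramPacket) msg;
        final Channel channel = ctx.channel();

        try {
            workers.execute(() -> {
                try {
                    // Placeholder processing: echo the payload back to the sender.
                    // copy() gives the response its own buffer, independent of
                    // the inbound packet's buffer.
                    ByteBuf payload = packet.content().copy();
                    DatagramPacket response = new DatagramPacket(payload, packet.sender());

                    // Non-blocking: the listener is notified once the write completes.
                    channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
                        if (!future.isSuccess()) {
                            System.err.println("Failed to write response packet: " + future.cause());
                        }
                    });
                } finally {
                    // The worker owns the inbound packet, so it releases it exactly once.
                    ReferenceCountUtil.release(packet);
                }
            });
        } catch (RejectedExecutionException e) {
            // The task never ran, so release here to avoid leaking the buffer.
            ReferenceCountUtil.release(packet);
            throw e;
        }
    }
}
```

In the setup from the question, the Executor would be the fixed thread pool created in afterPropertiesSet(). If the handler also forwarded the packet with ctx.fireChannelRead(msg), the worker would need its own reference, obtained with ReferenceCountUtil.retain(packet) before the task is submitted.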