I'm trying to implement a UDP server with Netty. The idea is to bind only once (therefore creating only one Channel
). This Channel
is initialized with only one handler that dispatches processing of incoming datagrams among multiple threads via an ExecutorService
.
@Configuration
public class SpringConfig {
@Autowired
private Dispatcher dispatcher;
private String host;
private int port;
@Bean
public Bootstrap bootstrap() throws Exception {
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(dispatcher);
ChannelFuture future = bootstrap.bind(host, port).await();
if(!future.isSuccess())
throw new Exception(String.format("Fail to bind on [host = %s , port = %d].", host, port), future.cause());
return bootstrap;
}
}
@Component
@Sharable
public class Dispatcher extends ChannelInboundHandlerAdapter implements InitializingBean {
private int workerThreads;
private ExecutorService executorService;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DatagramPacket packet = (DatagramPacket) msg;
final Channel channel = ctx.channel();
executorService.execute(new Runnable() {
@Override
public void run() {
//Process the packet and produce a response packet (below)
DatagramPacket responsePacket = ...;
ChannelFuture future;
try {
future = channel.writeAndFlush(responsePacket).await();
} catch (InterruptedException e) {
return;
}
if(!future.isSuccess())
log.warn("Failed to write response packet.");
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
executorService = Executors.newFixedThreadPool(workerThreads);
}
}
I have the following questions:
- Should the
DatagramPacket
received by thechannelRead
method of theDispatcher
class be duplicated before being used by the worker thread? I wonder if this packet is destroyed after thechannelRead
method returns, even if a reference is kept by the worker thread. - Is it safe to share the
Channel
among all the worker threads and let them callwriteAndFlush
concurrently?
Thanks!
Nope. If you need the object to live longer you either turn it into something else or use
ReferenceCountUtil.retain(datagram)
and thenReferenceCountUtil.release(datagram)
once you're done with it. You also shouldn't be doingawait()
at the executor service as well, you should register a handler for whatever happens.Yes, channel objects are thread safe and they can be called from many different threads.