线程不Netty中UDP服务器同时执行(Threads not executing concurre

2019-07-29 07:30发布

我分析的代码创建了Netty的NioDatagramChannelFactory UDP服务器。 它创建一个线程池:

ExecutorService threadPool = Executors.newCachedThreadPool();

则数据报通道,pipelineFactory&自举:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

在pipelineFactory中,getPipeline()将自定义处理程序。

:就像它在说UDP消息的多线程处理

只有一个线程处理接收到的消息。 在日志中,线程名称显示为新的I / O报工人#1,如:

2012-04-20 09:20:51853的新I / O数据报工人#1 ' - ' 1个INFO [cemrshSNMPTrapsRequestHandler:42]的messageReceived | 处理:V1TRAP [reqestID = 0,...]

我阅读文档和此项目: UDP请求的地块与Netty的UDP服务器丢失

然后我根据这些条目修改一下代码。 现在,线程池与创建:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

并拥有和ExecutionHandler的pipelineFactory:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

而getPipeline()将像描述的处理程序:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

现在,我获取日志中的4个不同的线程名称。 他们将显示为池2线程1, 池2线程2,等...

例如:

2012-05-09 09:12:19589池-2-线程1 INFO [cemrshSNMPTrapsRequestHandler:46]的messageReceived | 处理:V1TRAP [reqestID = 0,...]

但他们并不进行并行处理。 的messageReceived下的处理()具有以完成在一个线程用于下一处理下一个消息。 我发送邮件的布赫来自不同客户机到服务器,而我得到的日志是不是隔行扫描。 我也尝试了Thread.sleep()内的messageReceived(),并印证了之前。

我缺少的东西吗? 有没有办法实现与Netty的一个真正的多线程UDP服务器? 我怎样才能得到不同的线程执行的messageReceived()同时?

Answer 1:

即跳出我的一件事是你第一次把你的执行处理程序正在进行中。 我相信,目的是使整个管道到“应用程序”处理程序应通过执行IO 解码的IO线程执行。

因此,我断言,你想先添加所有SNMPTRAP解码处理,然后,当你有一个实际SNMPTRAP,它被移交给这反过来陷阱传递给实际消费陷阱的执行处理程序做一些有用的。

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

至少,这是它是如何在所示ExecutionHandler的javadoc,和上面的是我对它的解释。



Answer 2:

根据我的经验和我的UDP Netty的理解,这是正常的,只有一个处理UDP报文进行解码线程。 由于UDP是无会话,只有一个线程可以接收一个UDP端口上的数据,并对其进行解码。

一旦你已经解码的数据,并把它包到缓冲区或特定的Java对象,那么你可以把该对象为处理它的线程池(执行处理程序 - >您的业务处理程序)。 然后UDP端口上即将到来的新的数据一旦公布之前的解码后的数据到执行处理程序,然后进行解码。

当你听一个以上的端口数据,你可以创建NioDatagramChannelFactory时指定池的线程时才使用。 只有每个端口一个线程是有意义的。 即使你在构造函数中指定100名工人,只有一个,如果你配置一个UDP端口将被使用。



文章来源: Threads not executing concurrently in Netty UDP server