Proper way to pool client channels in netty?

2020-08-01 08:35发布

问题:

I'm getting a java.nio.channels.NotYetConnectedException in the following code because I'm trying to write to a channel that is not yet open.

Essentially what I have is a channel pool in which I grab a channel to write to if one is free, and I create a new channel if one is not available. My problem is that when I create a new channel, the channel is not ready for writing when I call connect, and I don't want to wait for the connection to open before returning because I don't want to block the thread. What's the best way to do this? Also, is my logic for retrieving/returning channels valid? See code below.

I have a simple connection pool like the following:

private static class ChannelPool {
    private final ClientBootstrap cb;
    private Set<Channel> activeChannels = new HashSet<Channel>();
    private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
    public ChannelPool() {
        ChannelFactory clientFactory =
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
        cb = new ClientBootstrap(clientFactory);
        cb.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new HttpRequestEncoder(),
                        new HttpResponseDecoder(),
                        new ResponseHandler());
            }
        });
    }

    private Channel newChannel() {
        ChannelFuture cf;
        synchronized (cb) {
            cf = cb.connect(new InetSocketAddress("localhost", 18080));
        }
        final Channel ret = cf.getChannel();
        ret.getCloseFuture().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                System.out.println("channel closed?");
                synchronized (activeChannels) {
                    activeChannels.remove(ret);
                }
            }
        });
        synchronized (activeChannels) {
            activeChannels.add(ret);
        }
        System.out.println("returning new channel");
        return ret;
    }

    public Channel getFreeChannel() {
        synchronized (freeChannels) {
            while (!freeChannels.isEmpty()) {
                Channel ch = freeChannels.pollFirst();
                if (ch.isOpen()) {
                    return ch;
                }
            }
        }
        return newChannel();
    }

    public void returnChannel(Channel ch) {
        synchronized (freeChannels) {
            freeChannels.addLast(ch);
        }
    }
}

I'm trying to use this inside a handler as follows:

private static class RequestHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        Channel proxyChannel = pool.getFreeChannel();
        proxyToClient.put(proxyChannel, e.getChannel());
        proxyChannel.write(request);
    }
}

回答1:

Instead of adding the new channel to activeChannels immediately after bootstrap.connect(..), you have to add a listener to the ChannelFuture which was returned by bootstrap.connect(..), and add the channel to activeChannels in the added listener. That way, getFreeChannel() will never get the channel that is not connected yet.

Because it is likely that activeChannels is empty even if you called newChannel() (newChannel() will return even before connection is established), you have to decide what to do in such a case. If I were you, I would change the return type of getFreeChannel() from Channel to ChannelFuture so that the caller gets notified when the free channel is ready.



标签: java netty