Netty: propagate back pressure in channel handler

Posted 2019-08-27 19:24

Question:

I am using Netty to develop an application that listens on a specific port over TCP. Once bytes are received, a pipeline runs the business logic on them. The pipeline consists of multiple channel handlers, such as a header decoder and an application-level fragmentation handler. At the end of the pipeline, once a message has been processed, the last handler (say, BufferWriter) puts it into a blocking queue. This blocking queue acts as a buffer, and BufferWriter acts as the producer. Note that the queue is shared across all channels, so all data received by the application is written to a single queue. A separate scheduled executor service works as the consumer of this buffer: a periodic task that runs at a certain interval, takes messages from the buffer, and writes them to a file.

The problem I ran into is that when there is a lot of incoming traffic, my consumer thread cannot keep up, and the buffer queue fills up.

In Netty, is there any way to slow down the rate of reading from the socket so that the consumer can keep pace with the producer? Something like: once the buffer is full, Netty stops reading from the socket, and when there is space in the buffer again, it resumes reading.

Note that in this case the senders are not written in Java/Netty. They are C programs that open a TCP connection to my server and start sending data. I am assuming that as I slow down reading from the server socket, TCP will automatically slow the sender down through its flow control (the advertised receive window shrinks as the socket receive buffer fills).

Answer 1:

To control reading, the channel config has an option called autoRead. You can set it to false:

ctx.channel().config().setAutoRead(false);

If you do so, you need to trigger reads from the channel manually:

ctx.channel().read();

You can slow down the reading rate by initially setting autoRead to false, keeping a buffer or counter on the inbound side, and reading only while the buffer has space: start reading when it is empty and stop once it is full.

The WSO2 transport implements this approach using a listener.
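
The pause-and-resume idea from this answer can be sketched for the shared queue described in the question. This is only a minimal sketch under stated assumptions, not the answerer's or WSO2's implementation: ReadGate, its watermarks, and the two callbacks are hypothetical names invented for illustration. In a real Netty application the callbacks would presumably loop over the open channels and call channel.config().setAutoRead(false) or setAutoRead(true). The sketch pauses at a high-water mark below the queue capacity, because messages that were already read from the socket can still reach the last handler after autoRead is turned off.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical helper (not part of Netty): a bounded buffer shared by all
 * channels that signals when socket reads should be paused or resumed.
 */
final class ReadGate<T> {
    private final BlockingQueue<T> buffer;
    private final int highWaterMark;
    private final int lowWaterMark;
    private final Runnable pauseReads;   // assumed to disable autoRead on every channel
    private final Runnable resumeReads;  // assumed to enable autoRead on every channel
    private boolean paused;

    ReadGate(int capacity, int highWaterMark, int lowWaterMark,
             Runnable pauseReads, Runnable resumeReads) {
        if (lowWaterMark < 0 || lowWaterMark >= highWaterMark || highWaterMark > capacity) {
            throw new IllegalArgumentException("need 0 <= low < high <= capacity");
        }
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
        this.pauseReads = pauseReads;
        this.resumeReads = resumeReads;
    }

    /**
     * Producer side: called by the last inbound handler for each processed
     * message. Returns false if the buffer was completely full.
     */
    synchronized boolean offer(T message) {
        boolean accepted = buffer.offer(message);
        if (!paused && buffer.size() >= highWaterMark) {
            paused = true;
            pauseReads.run();
        }
        return accepted;
    }

    /**
     * Consumer side: called by the periodic writer task. Returns null if
     * the buffer is empty.
     */
    synchronized T poll() {
        T message = buffer.poll();
        if (paused && buffer.size() <= lowWaterMark) {
            paused = false;
            resumeReads.run();
        }
        return message;
    }

    synchronized boolean isPaused() {
        return paused;
    }
}
```

Both methods are synchronized so that a pause issued by an event loop thread and a resume issued by the consumer thread cannot be reordered and leave reads switched off permanently. The gap between the two watermarks keeps the gate from toggling autoRead on every single message.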