What's the best way to asynchronously handle l

Posted 2019-06-22 04:57

Question:

One EventHandler (DatabaseConsumer) of the Disruptor calls stored procedures in the database, which is so slow that it blocks the Disruptor for some time.

Since I need the Disruptor to keep running without blocking, I am thinking of adding an extra queue so that the EventHandler could serve as a producer and a newly created thread could serve as a consumer that handles the database work. That work could then run asynchronously without affecting the Disruptor.

Here are some constraints:

  1. The object that the Disruptor passes to the EventHandler is around 30 KB, and there are about 400k of these objects. In theory, the total size of the objects that need to be handled is around 30 KB × 400k = 12 GB, so the extra queue must be able to hold that much.
  2. Since performance matters, GC pauses should be avoided.
  3. The heap size of the Java program is only 2 GB.
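Working the constraints through (decimal units, 1 GB = 10^6 KB) shows why the backlog cannot simply be queued on the heap:

```latex
\begin{aligned}
30\,\text{KB} \times 400\,000 &= 12\,000\,000\,\text{KB} = 12\,\text{GB} \\
\frac{2\,\text{GB}}{30\,\text{KB}} &= \frac{2\,000\,000\,\text{KB}}{30\,\text{KB}} \approx 66\,667 \text{ objects}
\end{aligned}
```

Even if the entire 2 GB heap held nothing else, it would fit only about one sixth of the 400k objects.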

I'm considering a text file as an option: the EventHandler (producer) writes each object to the file, and the consumer reads the objects back and calls the stored procedure. The problem is how to handle reaching the end of the file, and how to detect newly appended lines.
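One way to handle end-of-file in this file-based approach is to treat it as "no data yet" and poll again later, consuming only complete lines. The sketch below, a hypothetical `LineTailer`, remembers its byte offset with `RandomAccessFile`, returns the newline-terminated lines appended since the previous call, and holds back a trailing fragment until the writer finishes it. It assumes each object is written as a single UTF-8 line (for example Base64-encoded; that encoding is an assumption, since a raw 30 KB object may contain newline bytes) and it ignores file rotation and truncation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: hands out only complete lines appended since the last call. */
class LineTailer implements AutoCloseable {
    private final RandomAccessFile file;
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream(); // unfinished last line
    private long position = 0; // offset of the first byte not yet read

    LineTailer(Path path) throws IOException {
        this.file = new RandomAccessFile(path.toFile(), "r");
    }

    /** Returns newly completed lines; an empty list just means "at EOF, poll again later". */
    List<String> pollLines() throws IOException {
        List<String> lines = new ArrayList<>();
        long length = file.length();
        if (length <= position) {
            return lines;
        }
        file.seek(position);
        byte[] buf = new byte[8192];
        while (position < length) {
            int n = file.read(buf, 0, (int) Math.min(buf.length, length - position));
            if (n <= 0) {
                break;
            }
            position += n;
            for (int i = 0; i < n; i++) {
                if (buf[i] == '\n') {
                    // A newline completes the line: decode the accumulated bytes.
                    lines.add(new String(partial.toByteArray(), StandardCharsets.UTF_8));
                    partial.reset();
                } else {
                    partial.write(buf[i]);
                }
            }
        }
        return lines;
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

A consumer thread would call `pollLines()` in a loop, invoke the stored procedure for each returned line, and sleep briefly whenever the list is empty.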

Has anyone solved this situation before? Any advice?

Answer 1:

The short answer is: size your Disruptor to cope with the size of your bursts, not your entire volume. Bear in mind that the Disruptor can just hold a reference to the 30 KB object; the entire object does not need to be in the ring buffer.

Any form of buffering in front of your database will require memory for that buffer. The Disruptor, on the other hand, gives you the option of applying back pressure to the rest of the system when the database has fallen too far behind; that is, you can slow down the inputs to the Disruptor.
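The back-pressure idea can be sketched with the JDK alone. This sketch deliberately swaps in `java.util.concurrent.ArrayBlockingQueue` in place of the Disruptor's ring buffer so it is self-contained; `Payload`, `BoundedHandoff` and the capacity are hypothetical. The queue holds only references to the ~30 KB objects and is sized for a burst rather than for the whole 400k backlog: `put` blocks the producer while the buffer is full, which is the back pressure described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the ~30 KB message; the queue stores only this reference. */
class Payload {
    final long id;
    final byte[] body;

    Payload(long id, byte[] body) {
        this.id = id;
        this.body = body;
    }
}

/** Bounded hand-off between the fast side and the slow database thread. */
class BoundedHandoff {
    private final BlockingQueue<Payload> queue;

    BoundedHandoff(int burstCapacity) {
        this.queue = new ArrayBlockingQueue<>(burstCapacity); // sized for a burst, not the backlog
    }

    /** Blocks the producer while the buffer is full: this is the back pressure. */
    void publish(Payload p) throws InterruptedException {
        queue.put(p);
    }

    /** Gives up after a timeout so the caller can throttle its own inputs instead. */
    boolean tryPublish(Payload p, long timeout, TimeUnit unit) throws InterruptedException {
        return queue.offer(p, timeout, unit);
    }

    /** Database thread: waits until a payload is available. */
    Payload take() throws InterruptedException {
        return queue.take();
    }

    int backlog() {
        return queue.size();
    }
}
```

With a capacity of, say, 1,024 slots, the buffered payloads would occupy roughly 1,024 × 30 KB ≈ 30 MB, well inside a 2 GB heap; the right figure depends on your actual burst size.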

If you do want to spool to files, another option is to look at Java Chronicle, which uses memory-mapped files to persist data to disk.
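Chronicle's own API is not shown here. As an illustration of the underlying technique only, this JDK-only sketch appends length-prefixed records to a memory-mapped file via `FileChannel.map`, so the spooled bytes live in file-backed memory outside the Java heap rather than adding to GC pressure. `MappedSpool` is a hypothetical name; it assumes a fixed capacity and exactly one writer thread plus one reader thread, and it has no segment rolling or recovery after a restart.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical fixed-size spool: one writer thread appends, one reader thread consumes. */
class MappedSpool implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer buffer; // file-backed memory outside the Java heap
    private volatile int committed = 0;    // end of the last fully written record, published to the reader
    private int readPos = 0;               // touched only by the reader thread

    MappedSpool(Path path, int capacityBytes) throws IOException {
        channel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        // READ_WRITE mapping grows the file to capacityBytes if it is smaller.
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacityBytes);
    }

    /** Writer thread: appends one length-prefixed record; false means the spool is full. */
    boolean append(byte[] record) {
        if (buffer.remaining() < Integer.BYTES + record.length) {
            return false;
        }
        buffer.putInt(record.length);
        buffer.put(record);
        committed = buffer.position(); // volatile write publishes the finished record
        return true;
    }

    /** Reader thread: next unread record, or null when caught up (poll again later). */
    byte[] next() {
        if (readPos >= committed) {
            return null;
        }
        int len = buffer.getInt(readPos);
        byte[] out = new byte[len];
        for (int i = 0; i < len; i++) {
            out[i] = buffer.get(readPos + Integer.BYTES + i);
        }
        readPos += Integer.BYTES + len;
        return out;
    }

    @Override
    public void close() throws IOException {
        buffer.force(); // flush dirty pages to the file before closing
        channel.close();
    }
}
```

Unlike the text-file approach, the reader never has to interpret end-of-file: `next()` returning `null` simply means the writer has not published anything new yet.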

The much more complicated answer is to take advantage of the batching effects of the Disruptor so that your DB can catch up, i.e. use an EventHandler that collects a batch of events together and submits them to the database as one unit. This allows the EventHandler to become more efficient as things back up, thus increasing throughput.
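Below is a minimal sketch of such a batching handler, assuming the Disruptor's `EventHandler` callback `onEvent(event, sequence, endOfBatch)`, where `endOfBatch` is true for the last event currently available to the handler. To stay self-contained it declares a local `BatchAwareHandler` interface with the same shape (in real code you would implement `com.lmax.disruptor.EventHandler` instead). `BatchSink`, `maxBatch` and the payload extractor are hypothetical; `submit` stands in for one database round trip, for example a JDBC `addBatch`/`executeBatch` sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Local stand-in with the same callback shape as com.lmax.disruptor.EventHandler. */
interface BatchAwareHandler<E> {
    void onEvent(E event, long sequence, boolean endOfBatch) throws Exception;
}

/** Hypothetical sink: one call should be one database round trip for the whole batch. */
interface BatchSink<P> {
    void submit(List<P> batch) throws Exception;
}

class BatchingDatabaseHandler<E, P> implements BatchAwareHandler<E> {
    private final Function<E, P> extract; // copies the payload reference out of the reused slot
    private final BatchSink<P> sink;
    private final int maxBatch;           // caps how much one flush can carry
    private final List<P> pending = new ArrayList<>();

    BatchingDatabaseHandler(Function<E, P> extract, BatchSink<P> sink, int maxBatch) {
        this.extract = extract;
        this.sink = sink;
        this.maxBatch = maxBatch;
    }

    @Override
    public void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {
        pending.add(extract.apply(event));
        // Flush when nothing else is waiting, or when the batch reaches its cap.
        if (endOfBatch || pending.size() >= maxBatch) {
            sink.submit(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

When the database keeps up, batches stay small and latency stays low; when it falls behind, more events are available per wake-up, so each `submit` carries more rows. The extractor copies the payload reference out of the event because Disruptor ring-buffer slots are reused.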



Answer 2:

Short answer: don't use the Disruptor. Use a distributed MQ with retransmission support.

Long answer: if you have fast producers with slow consumers, you will need some sort of retransmission mechanism. I don't think you can escape from that unless you can tolerate nasty blocks (i.e. huge latencies) in your system. That's where distributed MQs (message queues) come into play. The Disruptor is not a distributed MQ, but you could try to implement something similar. The idea is:

  • All messages are sequenced and processed in order by the consumer
  • If the queue gets full, messages are dropped
  • If the consumer detects a message gap, it requests a retransmission of the lost messages, buffering subsequent messages until the missing ones arrive
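A minimal, single-threaded sketch of the consumer side of this idea follows. `SequencedConsumer` and `RetransmitRequester` are hypothetical names. The sketch processes messages strictly in sequence order, asks once for each newly detected gap, buffers later messages in a `TreeMap` until the gap is filled, and ignores duplicates. It does not retry a lost retransmission request or bound the buffer, both of which a real system would need.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical callback asking the replayer for the inclusive range [fromSeq, toSeq]. */
interface RetransmitRequester {
    void request(long fromSeq, long toSeq);
}

/** Processes messages strictly in sequence order, requesting retransmission of gaps. */
class SequencedConsumer<M> {
    private final Consumer<M> processor;                       // e.g. the stored-procedure call
    private final RetransmitRequester requester;
    private final TreeMap<Long, M> buffered = new TreeMap<>(); // received but not yet processable
    private long expected;    // next sequence allowed to be processed
    private long highestSeen; // highest sequence received so far

    SequencedConsumer(long firstSeq, Consumer<M> processor, RetransmitRequester requester) {
        this.expected = firstSeq;
        this.highestSeen = firstSeq - 1;
        this.processor = processor;
        this.requester = requester;
    }

    void onMessage(long seq, M msg) {
        if (seq < expected || buffered.containsKey(seq)) {
            return; // duplicate, e.g. a message that was replayed twice
        }
        if (seq > highestSeen + 1) {
            requester.request(highestSeen + 1, seq - 1); // newly detected gap: ask once
        }
        highestSeen = Math.max(highestSeen, seq);
        buffered.put(seq, msg);
        // Drain everything that is now contiguous with what was already processed.
        while (!buffered.isEmpty() && buffered.firstKey() == expected) {
            processor.accept(buffered.pollFirstEntry().getValue());
            expected++;
        }
    }

    long expected() {
        return expected;
    }
}
```

For example, receiving 1, 4, 5 processes 1, requests the range 2..3 once and buffers 4 and 5; when 2 and 3 arrive, messages 2 through 5 are processed in order.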

With that approach the consumer can be as slow as it wants, because it can always request the retransmission of any message it lost, at any time. What is missing here is the retransmission entity. In a distributed MQ that would be a separate, independent node persisting all messages to disk, so it can replay any message to any other node at any time. Since you are dealing with the Disruptor rather than an MQ, you will have to implement that retransmission mechanism yourself on another thread. This is a very interesting problem without an easy answer or recipe. I would use multiple Disruptor queues so that your consumer could do something like this:

  • Read from the main channel (i.e. the main Disruptor queue)
  • If you detect a sequence gap, go to another Disruptor queue connected to the replayer thread. You will actually need two queues there: one to request the missing messages and another to receive them.
  • The replayer thread would have another Disruptor queue from which it receives all messages and persists them to disk.
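The replayer's storage side could look roughly like the following sketch. `ReplayJournal` is a hypothetical name; it assumes the replayer thread sees every message (so sequences arrive contiguously) and that both `append` and `replay` run on that one thread. Each message is appended as a length-prefixed record using positional `FileChannel` writes, and an in-memory `long[]` index maps sequence numbers to file offsets so that a requested range can be read back.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical append-only journal owned by the replayer thread. */
class ReplayJournal implements AutoCloseable {
    private final FileChannel channel;
    private final long firstSeq;
    private long[] offsets = new long[1024]; // offsets[seq - firstSeq] = file position of the record
    private int count = 0;
    private long writePos = 0;

    ReplayJournal(Path path, long firstSeq) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.READ,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        this.firstSeq = firstSeq;
    }

    /** Appends the next message; sequences must be contiguous. */
    void append(long seq, byte[] payload) throws IOException {
        if (seq != firstSeq + count) {
            throw new IllegalArgumentException("expected seq " + (firstSeq + count) + ", got " + seq);
        }
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + payload.length);
        buf.putInt(payload.length).put(payload).flip();
        long start = writePos;
        while (buf.hasRemaining()) {
            writePos += channel.write(buf, writePos); // sequential append at the end of the file
        }
        if (count == offsets.length) {
            offsets = Arrays.copyOf(offsets, count * 2);
        }
        offsets[count++] = start; // index only after the record is fully written
    }

    /** Reads back messages fromSeq..toSeq inclusive, e.g. to answer a retransmission request. */
    List<byte[]> replay(long fromSeq, long toSeq) throws IOException {
        if (fromSeq < firstSeq || toSeq >= firstSeq + count || fromSeq > toSeq) {
            throw new IllegalArgumentException("range not in journal: " + fromSeq + ".." + toSeq);
        }
        List<byte[]> out = new ArrayList<>();
        for (long seq = fromSeq; seq <= toSeq; seq++) {
            long pos = offsets[(int) (seq - firstSeq)];
            ByteBuffer len = ByteBuffer.allocate(Integer.BYTES);
            readFully(len, pos);
            len.flip();
            ByteBuffer body = ByteBuffer.allocate(len.getInt());
            readFully(body, pos + Integer.BYTES);
            out.add(body.array());
        }
        return out;
    }

    private void readFully(ByteBuffer dst, long pos) throws IOException {
        while (dst.hasRemaining()) {
            int n = channel.read(dst, pos);
            if (n < 0) {
                throw new EOFException("truncated journal");
            }
            pos += n;
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Appends are purely sequential, which is the access pattern disks handle best. The sketch never calls `FileChannel.force`, so it trades durability after an OS crash for throughput; whether that is acceptable depends on your requirements.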

You are left to make sure your replayer thread can write messages to disk fast enough. If it cannot, there is no escape besides blocking the whole system. Fortunately, disk I/O can be very fast if you know what you are doing.

You can forget everything I said if you can afford to block the producers when the consumers are slow. But if the producers are receiving messages from the network, blocking them will eventually cause packet drops (UDP) and probably an IOException (TCP).

As you can see, this is a very interesting question with a very complicated answer. At Coral Blocks we have experience developing distributed MQs like that on top of CoralReactor. You can take a look at some of the articles on our website.