Disruptor example with 1 publisher and 4 parallel

2020-05-28 00:44发布

问题:

In this example https://stackoverflow.com/a/9980346/93647 and here Why is my disruptor example so slow? (at the end of the question) there is 1 publisher which publish items and 1 consumer.

But in my case consumer work is much more complicated and takes some time. So I want 4 consumers that process data in parallel.

So for example if producer produce numbers: 1,2,3,4,5,6,7,8,9,10,11..

I want consumer1 to catch 1,5,9,... consumer2 to catch 2,6,10,... consumer3 to catch 3,7,11,... consumer4 to catch 4,8,12... (well not exactly these numbers, the idea is that data should be processed in parallel, i don't care which certain number is processed on which consumer)

And remember this need to be done parallel because in real application consumer work is pretty expensive. I expect consumers to be executed in different threads to use power of multicore systems.

Of course I can just create 4 ringbuffers and attach 1 consumer to 1 ring-buffer. This way I can use original example. But I feel it wouldn't be correct. Likely it would be correct to create 1 publisher (1 ringbuffer) and 4 consumers - as this is what i need.

Adding link to a very simular question in google groups: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

So we have two options:

  • one ring many consumers (each consumer will "wake-up" on every addition, all consumer should have the same WaitStrategy)
  • many "one ring - one consumer" (each consumer will wake-up only on data that it should process. each consumer can have own WaitStrategy).

回答1:

EDIT: I forgot to mention the code is partially taken from the FAQ. I have no idea if this approach is better or worse than Frank's suggestion.

The project is severely under documented, that's a shame as it looks nice.
Anyway try the following snip (based on your first link) - tested on mono and seems to be OK:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}


回答2:

From the specs of the ring-buffer you will see that every consumer will try to process your ValueEvent. in your case you don't need that.

I solved it like this:

Add a field processed to your ValueEvent and when a consumer takes the event he test on that field, if it is already processed he moves on to the next field.

Not the most pretty way, but it's how the buffer works.