用1个出版商和4名平行消费者破坏剂例如(Disruptor example with 1 publi

2019-08-02 15:13发布

在这个例子中https://stackoverflow.com/a/9980346/93647这里为什么我的破坏者例子这么慢? (在这个问题结束)有1个出版商其发布项目和1名消费者。

但在我的情况下,消费者的工作更加复杂,需要一定的时间。 所以,我想4名消费者,在并行处理数据。

因此,举例来说,如果生产商生产的数字:1,2,3,4,5,6,7,8,9,10,11 ..

我想consumer1赶上1,5,9,... consumer2赶上2,6,10,... consumer3赶上3,7,11,... consumer4赶上4,8,12 ...(以及不完全是这些数字,这个想法是,数据应并行处理,我都不在乎某些号码又出现在消费者处理)

请记住,因为在实际应用中消费的工作是相当昂贵的这个需要做并行。 我希望消费者能够在不同的线程中执行使用多核系统的动力。

当然,我可以只创建4个ringbuffers并附加1个消费1环形缓冲区。 这样我可以使用原来的例子。 但我觉得这是不正确。 有可能是正确的创建1个出版商(1个ringbuffer)和4名消费者 - 因为这正是我需要的。

添加链接在谷歌组非常simular问题: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

因此,我们有两个选择:

  • 一个环形许多消费者(每个消费者将“唤醒”每此外,所有的消费者应该有相同的WaitStrategy)
  • 许多“一对 - 一个消费者”(。每一个消费者将被唤醒,只对数据,它应该处理每一个消费者可以有自己的WaitStrategy)。

Answer 1:

EDIT: I forgot to mention the code is partially taken from the FAQ. I have no idea if this approach is better or worse than Frank's suggestion.

The project is severely under documented, that's a shame as it looks nice.
Anyway try the following snip (based on your first link) - tested on mono and seems to be OK:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}


Answer 2:

从环形缓冲区的规格,你会看到每一个消费者会尝试处理您的ValueEvent 。 你的情况,你不需要这个。

我解决了它是这样的:

添加处理您现场ValueEvent ,当消费者需要他在那场测试的情况下,如果是已经处理,他移动到下一个字段。

这不是最漂亮的方式,但它是如何工作的缓冲区。



文章来源: Disruptor example with 1 publisher and 4 parallel consumers