在这个例子中https://stackoverflow.com/a/9980346/93647这里为什么我的破坏者例子这么慢? (在这个问题结束)有1个出版商其发布项目和1名消费者。
但在我的情况下,消费者的工作更加复杂,需要一定的时间。 所以,我想4名消费者,在并行处理数据。
因此,举例来说,如果生产商生产的数字:1,2,3,4,5,6,7,8,9,10,11 ..
我想consumer1赶上1,5,9,... consumer2赶上2,6,10,... consumer3赶上3,7,11,... consumer4赶上4,8,12 ...(以及不完全是这些数字,这个想法是,数据应并行处理,我都不在乎某些号码又出现在消费者处理)
请记住,因为在实际应用中消费的工作是相当昂贵的这个需要做并行。 我希望消费者能够在不同的线程中执行使用多核系统的动力。
当然,我可以只创建4个ringbuffers并附加1个消费1环形缓冲区。 这样我可以使用原来的例子。 但我觉得这是不正确。 有可能是正确的创建1个出版商(1个ringbuffer)和4名消费者 - 因为这正是我需要的。
添加链接在谷歌组非常simular问题: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ
因此,我们有两个选择:
- 一个环形许多消费者(每个消费者将“唤醒”每此外,所有的消费者应该有相同的WaitStrategy)
- 许多“一对 - 一个消费者”(。每一个消费者将被唤醒,只对数据,它应该处理每一个消费者可以有自己的WaitStrategy)。
EDIT: I forgot to mention the code is partially taken from the FAQ. I have no idea if this approach is better or worse than Frank's suggestion.
The project is severely under documented, that's a shame as it looks nice.
Anyway try the following snip (based on your first link) - tested on mono and seems to be OK:
using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public long Value { get; set; }
}
public class MyHandler : IEventHandler<ValueEntry>
{
private static int _consumers = 0;
private readonly int _ordinal;
public MyHandler()
{
this._ordinal = _consumers++;
}
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
if ((sequence % _consumers) == _ordinal)
Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
else
Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);
}
}
class Program
{
private static readonly Random _random = new Random();
private const int SIZE = 16; // Must be multiple of 2
private const int WORKERS = 4;
static void Main()
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
for (int i=0; i < WORKERS; i++)
disruptor.HandleEventsWith(new MyHandler());
var ringBuffer = disruptor.Start();
while (true)
{
long sequenceNo = ringBuffer.Next();
ringBuffer[sequenceNo].Value = _random.Next();;
ringBuffer.Publish(sequenceNo);
Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
Console.ReadKey();
}
}
}
}
从环形缓冲区的规格,你会看到每一个消费者会尝试处理您的ValueEvent
。 你的情况,你不需要这个。
我解决了它是这样的:
添加处理您现场ValueEvent
,当消费者需要他在那场测试的情况下,如果是已经处理,他移动到下一个字段。
这不是最漂亮的方式,但它是如何工作的缓冲区。