管道,复用和缓冲的无界(Pipelines, multiplexing, and unbounded

2019-07-20 09:17发布

(注:我使用的是.NET 4中, 没有 .NET 4.5,所以我不能使用TPL的DataflowBlock班)

TL; DR版本

最终,我只是在寻找一种方式来处理使用多线程的,保留它们在最终输出顺序的方式连续工作项目,而不需要一个无限输出缓冲。

动机

我已经现有代码,以提供用于处理其中一个I / O限制线程(“供应商”)是承担一切排队的数据块用于处理数据的多个块多线程机制。 这些数据块包含的工作项。

一个或多个线程(以下简称“处理器”)是负责在一个时间,它们处理和出队然后他们的下一个工作项目之前写入处理的数据到输出的队列出队一个工作项目。

最后一个I / O绑定线程(“消费者”)是负责从输出队列出队完成的工作项目,并将其写入到最终目的地。 这些工作项目(且必须)写在相同的顺序,他们入队。 我实现此使用并发优先级队列,其中每个项目的优先级由其源索引定义。

我用这个方案做了一些自定义的压缩在一个大的数据流,其中压缩本身也比较慢,但是未压缩数据的读取和压缩数据的写入是比较快的(虽然I / O限制)。

我在处理的64K的顺序相当大的块中的数据,所以管道的开销比较小。

我目前的解决方案是运作良好,但它涉及到很多的使用许多同步事件写入6年前自定义代码和设计似乎有点笨重; 因此,我已走上学术锻炼; Tibial,看它是否可以用更现代的.NET库被改写。

新的设计

我的新设计采用了BlockingCollection<>类,并在一定程度上基于此Microsoft文章 。

特别是,看标题为负载均衡使用多个生产者 。 我一直在使用这种方法试过了,所以我有其中的每一个从共享输入BlockingCollection需要的工作项目,并将它完成的项目,以自己的BlockingCollection输出队列的几个处理任务。

因为每个处理任务都有自己的输出队列,我试图用BlockingCollection.TakeFromAny()出队的第一个可用完成的工作项目。

复用器问题

到目前为止好,但现在来这里的问题。 微软的文章中指出:

该缺口是一个问题。 管道,显示图像的阶段,下一阶段的需要显示在顺序和没有该序列中的间隙的图像。 这是多路转换器的用武之地。使用TakeFromAny方法,多路复用等待来自两个滤波器级生产者队列输入。 当图像到达时,多路复用器查看是否图像的序列号是在预期的顺序下。 如果是,多路转换器将其传送到显示图像的阶段。 如果图像不是序列中的下一个,多路复用器保持在一个内部先行缓存器中的值,并重复该不具有前瞻值输入队列取操作。 该算法允许多路复用器放在一起在确保顺序不排序的值的方式从进入的生产者队列的输入。

好了,什么情况是,处理任务可以在几乎任何顺序生产成品的项目。 复用器负责以正确的顺序输出这些项目。

然而...

假设我们有1000个项目进行处理。 进一步设想,一些奇怪的原因,第一个项目需要更长的时间来处理所有其他项目相结合。

使用我目前的计划,多路复用器将保持阅读和缓冲项目从所有的处理输出队列,直到它找到它应该输出下一个。 由于其等待的是该项目(根据我的“想象一下,如果”上面)只会出现在所有其他工作项目已被处理之后,我将有效地缓冲在整个输入的所有工作项目!

数据量是远远太大,允许这种情况发生。 我需要能够从输出完成的工作项目停止处理任务时的输出队列已达到一定的规模最大(即它是一个有界输出队列),除非工作项目恰好是一个多路复用器正在等待。

而这也正是我变得有点卡住了。 我能想到的实际执行这个方法很多,但他们似乎都过于复杂,他们并不比我想更换更好的代码的程度!

什么是我的问题吗?

我的问题是:我要对这个正确的方式?

我本来以为这将是一个很好理解的问题,但我的研究才止跌回升,似乎忽略了,如果一个工作项目需要与所有其他工作项很长一段时间出现无限的缓冲问题的文章。

任何人都可以点我在描述一个合理的方式来实现这一目标的任何物品?

TL; DR版本

最终,我只是在寻找一种方式来处理使用多线程的,保留它们在最终输出顺序的方式连续工作项目,而不需要一个无限输出缓冲。

Answer 1:

在启动时创建,1000个项目池,说。 把它们存储在一个BlockingCollection - 一个“池队列”。

供应商从池队列中获得的物品,从文件加载它们,负载在序列号/不管,并将其提交给处理器线程池。

处理器做自己的东西,并发送输出到多路复用器。 多路复用器做它存储任何乱序项目,直到较早的项目已被处理的工作。

当一个项目已经通过任何多路复用器的输出被完全消耗掉,它们返回到池队列由供应商重新使用。

如果一个“缓慢的项目”确实需要处理巨额,乱序集合中的多路复用器将在其他池中的线程通过成长为“快速项目”滑动,但由于多路复用器是不实际喂养它的项目它的输出,池队列没有被补充。

当池清空,供应商将阻止它,将无法提供任何更多的项目。

“快速项目”留在处理池投入将得到处理,然后处理将停止除了“慢项目”。 供应商被阻断时,多路复用器具有[poolSize-1]其集合中的项目。 没有额外的内存被使用,没有CPU被浪费了,发生的唯一的事情就是“慢项目”的处理。

当“慢项目”终于完成,它就会输出到多路复用器。

多路复用器现在可以输出所有[poolSize]中所需的顺序的项目。 由于这些项目被消耗,池被重新填平,供应商,现在可以从池中获取项目,运行在再次阅读了文件中的排队项目到处理器池。

自动调节,不需要缓冲有限,无记忆失控。

编辑:我的意思是'不需要有界缓冲区的:)

此外,没有GC持率 - 因为项目被重新使用,他们并不需要GC'ing。



Answer 2:

我想你误解了文章。 据描述,它不具有无限的缓冲区,将有在每个队列的查找ahread缓冲值最多一个。 当你出列的值,这不是下一个,你保存它,然后等待只对不具有缓冲的值的队列。 (如果您有多个输入缓冲器,逻辑则要更加复杂,或者你需要的2个队列复用器树。)

如果你把这个与BlockingCollection已指定界能力S,你得到正是你想要的行为:如果一个制片人是太慢了,别人会暂停,直到慢线赶上。



Answer 3:

你有没有考虑过不使用手动生产者/消费者的缓冲,而是在.AsParallel().AsOrdered() PLINQ选择? 语义,这正是你想要的东西 - 并行处理,但在输出订购物品的序列。 你的代码可能看起来那样简单......

var orderedOutput = 
    ReadSequentialBlocks()
    .AsParallel()
    .AsOrdered()
    .Select(ProcessBlock)
foreach(var item in orderedOutput)
    Sink(item);

默认并行度是你的机器上的处理器数量,但是你可以调整它。 有一个自动输出缓冲器。 如果默认缓存消耗太多的资源,你可以将其关闭:

.WithMergeOptions(ParallelMergeOptions.NotBuffered)

不过,我肯定会给平淡朴实的版本拍摄第一 - 你永远不知道,它可能只是工作的优良开箱。 最后,如果你想自动复用的简单,但比零较大但非自动缓冲,你总是可以使用PLINQ查询,以填补一个固定大小的BlockingCollection<>这是阅读与消费枚举在另一个线程。



Answer 4:

跟进

为了完整起见,这里是我结束了与代码。 感谢Martin詹姆斯为他的答案,这为解决方案提供了依据。

我还没有与多路复用器完全满意(见ParallelWorkProcessor.multiplex() 它的工作原理,但它似乎有点klunky。

我用马丁詹姆斯的想法有关工作池,以防止多路复用器缓冲的无限增长,但是我代替工作池队列SemaphoreSlim(因为它提供了相同的功能,但它是一个有点简单使用,使用更少的资源)。

工人的任务写自己完成的项目,以并发优先级队列。 这使我很容易和有效地找到输出的下一个项目。

我用了一个从微软样品并发优先级队列 ,修改,以提供每当一个新项目入队是真实信号的自动复位事件。

这里的ParallelWorkProcessor类。 你向它提供三名代表使用它; 一个提供工作项目,一个处理一个工作项目,和一个输出完成的工作项目。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
    {
        public delegate T    Read();           // Called by only one thread.
        public delegate T    Process(T block); // Called simultaneously by multiple threads.
        public delegate void Write(T block);   // Called by only one thread.

        public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
        {
            _read    = read;
            _process = process;
            _write   = write;

            numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;

            _workPool    = new SemaphoreSlim(numWorkers*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _workers     = new Task[numWorkers];

            startWorkers();
            Task.Factory.StartNew(enqueueWorkItems);
            _multiplexor = Task.Factory.StartNew(multiplex);
        }

        private void startWorkers()
        {
            for (int i = 0; i < _workers.Length; ++i)
            {
                _workers[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void enqueueWorkItems()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null) // Signals end of input.
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special sentinel WorkItem .
                    break;
                }

                _workPool.Wait();
                _inputQueue.Add(new WorkItem(data, index++));
            }
        }

        private void multiplex()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (index != last)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.

                while ((index != last) && _outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index); // This *must* be the case.
                        _workPool.Release();                    // Allow the enqueuer to queue another work item.
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _multiplexor.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem
        {
            public WorkItem(T data, int index)
            {
                Data  = data;
                Index = index;
            }

            public T   Data  { get; private set; }
            public int Index { get; private set; }
        }

        private readonly Task[] _workers;
        private readonly Task _multiplexor;
        private readonly SemaphoreSlim _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

这是我的测试代码:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.

            var stopwatch = new Stopwatch();

            _numBlocks = _maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}


文章来源: Pipelines, multiplexing, and unbounded buffering