How can I read messages from a queue in parallel?

Posted 2020-02-29 11:37

Situation

We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.

Our trial code below does process messages in parallel, but it only starts a new batch of processes when the previous one is finished. We would like to restart Tasks as they finish.

In other words: The maximum number of Tasks should always be active as long as the message queue is not empty.

Trial code

static string queue = @".\Private$\concurrenttest";

private static void Process(CancellationToken token)
{
    Task.Factory.StartNew(async () =>
    {
        while (true)
        {
            IEnumerable<Task> consumerTasks = ConsumerTasks();
            await Task.WhenAll(consumerTasks);

            await PeekAsync(new MessageQueue(queue));
        }
    });
}

private static IEnumerable<Task> ConsumerTasks()
{
    for (int i = 0; i < 15; i++)
    {
        Command1 message;
        try
        {
            MessageQueue msMq = new MessageQueue(queue);
            msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
            Message msg = msMq.Receive();
            message = (Command1)msg.Body;
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                yield break; // nothing in queue
            else throw;
        }
        yield return Task.Run(() =>
        {
            Console.WriteLine("id: " + message.id + ", name: " + message.name);
            Thread.Sleep(1000);
        });
    }
}

private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}

5 answers
混吃等死
#2 · 2020-02-29 12:13

EDIT

I spent a lot of time thinking about the reliability of the pump. Specifically, once a message has been received from the MessageQueue, cancellation becomes tricky, so I provided two ways to terminate the pump:

  • Signaling the CancellationToken stops the pipeline as quickly as possible and will likely result in dropped messages.
  • Calling MessagePump.Stop() terminates the pump but allows all messages which have already been taken from the queue to be fully processed before the MessagePump.Completion task transitions to RanToCompletion.

The solution uses TPL Dataflow (NuGet: Microsoft.Tpl.Dataflow).
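
To make the difference between the two termination modes concrete, here is a minimal sketch that uses a bare ActionBlock rather than the MessagePump itself. The class and method names (ShutdownModes, RunAsync) are hypothetical and exist only for this illustration; the 20 ms delay stands in for real message processing. It assumes the Dataflow package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ShutdownModes
{
    // Posts `count` items to a block, then shuts it down either gracefully
    // (Complete) or abruptly (cancellation). Returns how many items were processed.
    public static async Task<int> RunAsync(int count, bool graceful)
    {
        int processed = 0;

        using (var cts = new CancellationTokenSource())
        {
            var options = new ExecutionDataflowBlockOptions {
                CancellationToken = cts.Token,
                MaxDegreeOfParallelism = 2
            };

            var block = new ActionBlock<int>(async i =>
            {
                await Task.Delay(20).ConfigureAwait(false); // stand-in for real work
                Interlocked.Increment(ref processed);
            },
            options);

            for (int i = 0; i < count; i++) block.Post(i);

            if (graceful)
                block.Complete(); // accept nothing new, but drain what was already posted
            else
                cts.Cancel();     // stop as soon as possible; buffered items are discarded

            try
            {
                await block.Completion.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Expected on the cancellation path.
            }
        }

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Graceful:  " + RunAsync(10, graceful: true).GetAwaiter().GetResult());
        Console.WriteLine("Cancelled: " + RunAsync(10, graceful: false).GetAwaiter().GetResult());
    }
}
```

The graceful path processes every posted item, while the cancelled path typically processes only the few that were already in flight. That mirrors the trade-off between Stop() and the CancellationToken described above.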

Full implementation:

using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace StackOverflow.Q34437298
{
    /// <summary>
    /// Pumps the message queue and processes messages in parallel.
    /// </summary>
    public sealed class MessagePump
    {
        /// <summary>
        /// Creates a <see cref="MessagePump"/> and immediately starts pumping.
        /// </summary>
        public static MessagePump Run(
            MessageQueue messageQueue,
            Func<Message, Task> processMessage,
            int maxDegreeOfParallelism,
            CancellationToken ct = default(CancellationToken))
        {
            if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue));
            if (processMessage == null) throw new ArgumentNullException(nameof(processMessage));
            if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

            ct.ThrowIfCancellationRequested();

            return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
        }

        private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();

        /// <summary>
        /// <see cref="Task"/> which completes when this instance
        /// stops due to a <see cref="Stop"/> or cancellation request.
        /// </summary>
        public Task Completion { get; }

        /// <summary>
        /// Maximum number of parallel message processors.
        /// </summary>
        public int MaxDegreeOfParallelism { get; }

        /// <summary>
        /// <see cref="MessageQueue"/> that is pumped by this instance.
        /// </summary>
        public MessageQueue MessageQueue { get; }

        /// <summary>
        /// Creates a new <see cref="MessagePump"/> instance.
        /// </summary>
        private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
        {
            MessageQueue = messageQueue;
            MaxDegreeOfParallelism = maxDegreeOfParallelism;

            // Kick off the loop.
            Completion = RunAsync(processMessage, ct);
        }

        /// <summary>
        /// Soft-terminates the pump so that no more messages will be pumped.
        /// Any messages already removed from the message queue will be
        /// processed before this instance fully completes.
        /// </summary>
        public void Stop()
        {
            // Multiple calls to Stop are fine.
            _stop.TrySetResult(true);
        }

        /// <summary>
        /// Pump implementation.
        /// </summary>
        private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
        {
            using (CancellationTokenSource producerCTS = ct.CanBeCanceled
                ? CancellationTokenSource.CreateLinkedTokenSource(ct)
                : new CancellationTokenSource())
            {
                // This CancellationToken will either be signaled
                // externally, or if our consumer errors.
                ct = producerCTS.Token;

                // Handover between producer and consumer.
                DataflowBlockOptions bufferOptions = new DataflowBlockOptions {
                    // There is no point in dequeuing more messages than we can process,
                    // so we'll throttle the producer by limiting the buffer capacity.
                    BoundedCapacity = MaxDegreeOfParallelism,
                    CancellationToken = ct
                };

                BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);

                Task producer = Task.Run(async () =>
                {
                    try
                    {
                        while (_stop.Task.Status != TaskStatus.RanToCompletion)
                        {
                            // This line and next line are the *only* two cancellation
                            // points which will not cause dropped messages.
                            ct.ThrowIfCancellationRequested();

                            Task<Message> peekTask = WithCancellation(PeekAsync(MessageQueue), ct);

                            if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
                            {
                                // Stop was signaled before PeekAsync returned. Wind down the producer gracefully
                                // by breaking out and propagating completion to the consumer blocks.
                                break;
                            }

                            await peekTask.ConfigureAwait(false); // Observe Peek exceptions.

                            ct.ThrowIfCancellationRequested();

                            // Zero timeout means that we will error if someone else snatches the
                            // peeked message from the queue before we get to it (due to a race).
                            // I deemed this better than getting stuck waiting for a message which
                            // may never arrive, or, worse yet, let this ReceiveAsync run unobserved
                            // due to a cancellation (if we choose to abandon it like we do PeekAsync).
                            // You will have to restart the pump if this throws.
                            // Omit timeout if this behaviour is undesired.
                            Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);

                            await buffer.SendAsync(message, ct).ConfigureAwait(false);
                        }
                    }
                    finally
                    {
                        buffer.Complete();
                    }
                },
                ct);

                // Wire up the parallel consumers.
                ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions {
                    CancellationToken = ct,
                    MaxDegreeOfParallelism = MaxDegreeOfParallelism,
                    SingleProducerConstrained = true, // We don't require thread safety guarantees.
                    BoundedCapacity = MaxDegreeOfParallelism,
                };

                ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
                {
                    ct.ThrowIfCancellationRequested();

                    await processMessage(message).ConfigureAwait(false);
                },
                executionOptions);

                buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

                if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
                {
                    // If we got here, consumer probably errored. Stop the producer
                    // before we throw so we don't go dequeuing more messages.
                    producerCTS.Cancel();
                }

                // Task.WhenAll checks faulted tasks before checking any
                // canceled tasks, so if our consumer threw a legitimate
                // exception, that's what will be rethrown, not the OCE.
                await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
        /// </summary>
        private static Task<Message> PeekAsync(MessageQueue messageQueue)
        {
            return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
        /// </summary>
        private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
        {
            return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndReceive);
        }

        /// <summary>
        /// Allows abandoning tasks which do not natively
        /// support cancellation. Use with caution.
        /// </summary>
        private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
        {
            ct.ThrowIfCancellationRequested();

            TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

            using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
            {
                if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
                {
                    // Cancellation task completed first.
                    // We are abandoning the original task.
                    throw new OperationCanceledException(ct);
                }
            }

            // Task completed: synchronously return result or propagate exceptions.
            return await task.ConfigureAwait(false);
        }
    }
}

Usage:

using (MessageQueue msMq = GetQueue())
{
    MessagePump pump = MessagePump.Run(
        msMq,
        async message =>
        {
            await Task.Delay(50);
            Console.WriteLine($"Finished processing message {message.Id}");
        },
        maxDegreeOfParallelism: 4
    );

    for (int i = 0; i < 100; i++)
    {
        msMq.Send(new Message());

        Thread.Sleep(25);
    }

    pump.Stop();

    await pump.Completion;
}

Untidy but functional unit tests:

https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da

ORIGINAL ANSWER

As mentioned in my comment, there are established producer/consumer patterns in .NET, one of which is the pipeline. An excellent example of such can be found in "Patterns of Parallel Programming" by Microsoft's own Stephen Toub (full text here: https://www.microsoft.com/en-au/download/details.aspx?id=19222, page 55).

The idea is simple: producers continuously throw stuff in a queue, and consumers pull it out and process (in parallel to producers and possibly one another).

Here's an example of a message pipeline where the consumer uses synchronous, blocking methods to process the items as they arrive (I've parallelised the consumer to suit your scenario):

void MessageQueueWithBlockingCollection()
{
    // If your processing is continuous and never stops throughout the lifetime of
    // your application, you can ignore the fact that BlockingCollection is IDisposable.
    using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
    {
        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    // Hand over the message to the consumer.
                    messages.Add(new Message());

                    // Simulated arrival delay for the next message.
                    Thread.Sleep(10);
                }
            }
            finally
            {
                // Notify consumer that there is no more data.
                messages.CompleteAdding();
            }
        });

        Task consumer = Task.Run(() =>
        {
            ParallelOptions options = new ParallelOptions {
                MaxDegreeOfParallelism = 4
            };

            Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => {
                ProcessMessage(message);
            });
        });

        Task.WaitAll(producer, consumer);
    }
}

void ProcessMessage(Message message)
{
    Thread.Sleep(40);
}

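The above code completes in approx 130-140 ms, which is exactly what you would expect given the parallelisation of the consumers: the last of the 10 messages arrives after roughly 90 ms and takes 40 ms to process, and four consumers are enough to keep up with one new message every 10 ms.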

Now, in your scenario you are using Tasks and async/await, which are better suited to TPL Dataflow (an official, Microsoft-supported library tailored to parallel and asynchronous sequence processing).

Here's a little demo showing the different types of TPL Dataflow processing blocks that you would use for the job:

async Task MessageQueueWithTPLDataflow()
{
    // Set up our queue.
    BufferBlock<Message> queue = new BufferBlock<Message>();

    // Set up our processing stage (consumer).
    ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions {
        CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
        MaxDegreeOfParallelism = 4
    };

    ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);

    // Link the queue to the consumer.
    queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

    // Wire up our producer.
    Task producer = Task.Run(async () =>
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Post(new Message());

                await Task.Delay(10).ConfigureAwait(false);
            }
        }
        finally
        {
            // Signal to the consumer that there are no more items.
            queue.Complete();
        }
    });

    await consumer.Completion.ConfigureAwait(false);
}

Task ProcessMessageAsync(Message message)
{
    return Task.Delay(40);
}

It's not hard to adapt the above to use your MessageQueue and you can be sure that the end result will be free of threading issues. I'll do just that if I get a bit more time today/tomorrow.

Deceive 欺骗
#3 · 2020-02-29 12:15

You have one collection of things you want to process. You create another collection for things being processed (this could be your task objects or items of some sort that reference a task).

You create a loop that will repeat as long as you have work to do. That is, work items are waiting to be started or you still have work items being processed.

At the start of the loop you populate your active task collection with as many tasks as you want to run concurrently and you start them as you add them.

You let the things run for a while (like Thread.Sleep(10);).

You create an inner loop that checks all your started tasks for completion. If one has completed, you remove it and report the results or do whatever seems appropriate.

That's it. On the next turn the upper part of your outer loop will add tasks to your running tasks collection until the number equals the maximum you have set, keeping your work-in-progress collection full.

You may want to do all this on a worker thread and monitor cancel requests in your loop.
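
The loop described above could be sketched roughly as follows. This is an illustration only: the class and method names (PollingThrottle, RunAll) are hypothetical, a Queue<int> stands in for the message queue, and the process delegate stands in for the real message handler.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PollingThrottle
{
    // Runs `process` for every pending item, keeping at most
    // `maxConcurrent` tasks active at any time.
    public static void RunAll(
        Queue<int> pending,
        int maxConcurrent,
        Action<int> process,
        CancellationToken token = default(CancellationToken))
    {
        var active = new List<Task>();

        // Repeat while work is waiting to be started or is still in progress.
        while (pending.Count > 0 || active.Count > 0)
        {
            token.ThrowIfCancellationRequested();

            // Top up the active collection to the configured maximum.
            while (active.Count < maxConcurrent && pending.Count > 0)
            {
                int item = pending.Dequeue();
                active.Add(Task.Run(() => process(item)));
            }

            Thread.Sleep(10); // let the tasks run for a while

            // Inner loop: remove finished tasks and report failures.
            for (int i = active.Count - 1; i >= 0; i--)
            {
                if (!active[i].IsCompleted) continue;
                if (active[i].IsFaulted) Console.Error.WriteLine(active[i].Exception);
                active.RemoveAt(i);
            }
        }
    }

    public static void Main()
    {
        var work = new Queue<int>(new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });

        RunAll(work, 3, item =>
        {
            Thread.Sleep(50); // simulated processing time
            Console.WriteLine("Processed item " + item);
        });
    }
}
```

Polling with Thread.Sleep keeps the sketch close to the description. Waiting on Task.WhenAny instead would start a replacement as soon as a task finishes, without the polling delay.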

Evening l夕情丶
#4 · 2020-02-29 12:21

The task library in .NET is made to execute a number of tasks in parallel. While there are ways to limit the number of active tasks, the library itself will limit the number of active tasks according to the computer's CPU.

The first question that needs to be answered is: why do you need to create another limit? If the limit imposed by the task library is OK, then you can just keep creating tasks and rely on the task library to execute them with good performance.

If this is OK, then as soon as you get a message from MSMQ just start a task to process the message, skip the waiting (WhenAll call), start over and wait for the next message.

You can limit the number of concurrent tasks by using a custom task scheduler. More on MSDN: https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx.
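
As one possible sketch of the scheduler approach, the built-in ConcurrentExclusiveSchedulerPair can serve as a concurrency-limited scheduler without writing a custom TaskScheduler subclass. The class and method names (SchedulerThrottle, Run) are hypothetical, and the Thread.Sleep stands in for real message processing.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerThrottle
{
    // Starts `count` blocking work items on a scheduler that allows at most
    // `maxConcurrent` of them to execute at once. Returns the highest number
    // of work items observed running simultaneously.
    public static int Run(int count, int maxConcurrent)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrent);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0, peak = 0;
        object gate = new object();

        Task[] tasks = Enumerable.Range(0, count)
            .Select(i => factory.StartNew(() =>
            {
                lock (gate) { running++; peak = Math.Max(peak, running); }
                Thread.Sleep(50); // simulated synchronous processing
                lock (gate) { running--; }
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return peak;
    }

    public static void Main()
    {
        Console.WriteLine("Peak concurrency: " + Run(count: 12, maxConcurrent: 4));
    }
}
```

Note that a task scheduler only throttles synchronous work. An async delegate gives up its slot at the first await, so for asynchronous handlers a SemaphoreSlim or a Dataflow block is likely the better fit.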

Animai°情兽
#5 · 2020-02-29 12:25

My colleague came up with the solution below. This solution works, but I'll let this code be reviewed on Code Review.

Based on answers given and some research of our own, we've come to a solution. We're using a SemaphoreSlim to limit our number of parallel Tasks.

static string queue = @".\Private$\concurrenttest";

private static async Task Process(CancellationToken token)
{
    MessageQueue msMq = new MessageQueue(queue);
    msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
    SemaphoreSlim s = new SemaphoreSlim(15, 15);

    while (true)
    {
        await s.WaitAsync();
        await PeekAsync(msMq);
        Command1 message = await ReceiveAsync(msMq);
        Task.Run(async () =>
        {
            try
            {
                await HandleMessage(message);
            }
            catch (Exception)
            {
                // Exception handling
            }
            finally
            {
                // Always free the slot, even if the exception handling above throws.
                s.Release();
            }
        });
    }
}

private static Task HandleMessage(Command1 message)
{
    Console.WriteLine("id: " + message.id + ", name: " + message.name);
    Thread.Sleep(1000);
    return Task.FromResult(1);
}

private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}

public class Command1
{
    public int id { get; set; }
    public string name { get; set; }
}

private static async Task<Command1> ReceiveAsync(MessageQueue msMq)
{
    var receiveAsync = await Task.Factory.FromAsync<Message>(msMq.BeginReceive(), msMq.EndReceive);
    return (Command1)receiveAsync.Body;
}
beautiful°
#6 · 2020-02-29 12:30

You should look at using Microsoft's Reactive Framework for this.

Your code could look like this:

var query =
    from command1 in FromQueue<Command1>(queue)
    from text in Observable.Start(() =>
    {
        Thread.Sleep(1000);
        return "id: " + command1.id + ", name: " + command1.name;
    })
    select text;

var subscription =
    query
        .Subscribe(text => Console.WriteLine(text));

This does all of the processing in parallel, and ensures that the processing is properly distributed across all cores. When one value ends another starts.

To cancel the subscription just call subscription.Dispose().

The code for FromQueue is:

static IObservable<T> FromQueue<T>(string serverQueue)
{
    return Observable.Create<T>(observer =>
    {
        var responseQueue = Environment.MachineName + "\\Private$\\" + Guid.NewGuid().ToString();
        var queue = MessageQueue.Create(responseQueue);

        var frm = new System.Messaging.BinaryMessageFormatter();
        var srv = new MessageQueue(serverQueue);
        srv.Formatter = frm;
        queue.Formatter = frm;

        srv.Send("S " + responseQueue);

        var loop = NewThreadScheduler.Default.ScheduleLongRunning(cancel =>
        {
            while (!cancel.IsDisposed)
            {
                var msg = queue.Receive();
                observer.OnNext((T)msg.Body);
            }
        });

        return new CompositeDisposable(
            loop,
            Disposable.Create(() =>
            {
                srv.Send("D " + responseQueue);
                MessageQueue.Delete(responseQueue);
            })
        );
    });
}

Just NuGet "Rx-Main" to get the bits.


In order to limit the concurrency you can do this:

int maxConcurrent = 2;
var query =
    FromQueue<Command1>(queue)
        .Select(command1 => Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return "id: " + command1.id + ", name: " + command1.name;
        }))
        .Merge(maxConcurrent);