I've created a class whose purpose is to abstract away the control of concurrent access to a queue.
The class is designed to be instantiated on a single thread, written to by multiple threads, and then read from by a single separate thread.
I have a single long-running task, created inside the class, which runs a blocking loop and fires an event whenever an item is successfully dequeued.
My question is this: is my implementation of cancelling the long-running task, AND the subsequent clean-up/reset, a correct use of the CancellationTokenSource object?
Ideally, I'd like an active object to be able to be stopped and restarted while maintaining availability to add to the queue.
I've used Peter Bromberg's article as a basis: Producer/Consumer Queue and BlockingCollection in C# 4.0
Code below:
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Test
    {
        public delegate void DeliverNextQueuedItemHandler<T>(T item);

        public sealed class SOQueueManagerT<T>
        {
            ConcurrentQueue<T> _multiQueue;
            BlockingCollection<T> _queue;
            CancellationTokenSource _canceller;
            Task _listener = null;

            public event DeliverNextQueuedItemHandler<T> OnNextItem;

            public bool IsRunning { get; private set; }

            public int QueueSize
            {
                get
                {
                    if (_queue != null)
                        return _queue.Count;
                    return -1;
                }
            }

            public CancellationTokenSource CancellationTokenSource
            {
                get
                {
                    if (_canceller == null)
                        _canceller = new CancellationTokenSource();
                    return _canceller;
                }
            }

            public SOQueueManagerT()
            {
                _multiQueue = new ConcurrentQueue<T>();
                _queue = new BlockingCollection<T>(_multiQueue);
                IsRunning = false;
            }

            public void Start()
            {
                if (_listener == null)
                {
                    IsRunning = true;
                    _listener = Task.Factory.StartNew(() =>
                    {
                        while (!CancellationTokenSource.Token.IsCancellationRequested)
                        {
                            T item;
                            if (_queue.TryTake(out item, 100))
                            {
                                if (OnNextItem != null)
                                {
                                    OnNextItem(item);
                                }
                            }
                        }
                    },
                    CancellationTokenSource.Token,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
                }
            }

            public void Stop()
            {
                if (_listener != null)
                {
                    CancellationTokenSource.Cancel();
                    CleanUp();
                }
            }

            public void Add(T item)
            {
                _queue.Add(item);
            }

            private void CleanUp()
            {
                _listener.Wait(2000);
                if (_listener.IsCompleted)
                {
                    IsRunning = false;
                    _listener = null;
                    _canceller = null;
                }
            }
        }
    }
UPDATE: Here's what I have gone with in the end. It isn't perfect, but so far it is doing the job.
    public sealed class TaskQueueManager<T>
    {
        ConcurrentQueue<T> _multiQueue;
        BlockingCollection<T> _queue;
        CancellationTokenSource _canceller;
        Task _listener = null;

        public event DeliverNextQueuedItemHandler<T> OnNextItem;

        // Inferred from the task's state rather than set explicitly.
        // A task that has completed, faulted or been cancelled is not running.
        public bool IsRunning
        {
            get
            {
                Task listener = _listener;
                return listener != null && !listener.IsCompleted;
            }
        }

        public int QueueSize
        {
            get
            {
                if (_queue != null)
                    return _queue.Count;
                return -1;
            }
        }

        public TaskQueueManager()
        {
            _multiQueue = new ConcurrentQueue<T>();
            _queue = new BlockingCollection<T>(_multiQueue);
        }

        public void Start()
        {
            if (_listener == null)
            {
                _canceller = new CancellationTokenSource();
                // Capture the token locally: Stop() sets _canceller to null,
                // so the loop must not read the field.
                CancellationToken token = _canceller.Token;
                _listener = Task.Factory.StartNew(() =>
                {
                    while (!token.IsCancellationRequested)
                    {
                        T item;
                        if (_queue.TryTake(out item, 100))
                        {
                            // Copy the delegate so a concurrent unsubscribe
                            // cannot null it between the check and the call.
                            var handler = OnNextItem;
                            if (handler != null)
                            {
                                try
                                {
                                    handler(item);
                                }
                                catch (Exception)
                                {
                                    //log or call an event
                                }
                            }
                        }
                    }
                },
                token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
            }
        }

        // Must not be called from inside an OnNextItem handler:
        // the Wait() below would then wait on its own thread.
        public void Stop()
        {
            if (_listener != null)
            {
                _canceller.Cancel();
                try
                {
                    // Block until the loop has seen the cancellation and exited.
                    _listener.Wait();
                }
                catch (AggregateException ae)
                {
                    // Raised if the task was cancelled before it started running;
                    // anything else is rethrown by Handle.
                    ae.Handle(e => e is OperationCanceledException);
                }
                _canceller.Dispose();
                _listener = null;
                _canceller = null;
            }
        }

        public void Add(T item)
        {
            if (item != null)
            {
                _queue.Add(item);
            }
            else
            {
                throw new ArgumentNullException("item", "TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
            }
        }
    }
Careful programming is the only thing that's going to cut it. Even if you cancel the operation, you might have a pending operation that's not completing in a reasonable amount of time. It could very well be a blocking operation that's deadlocked. In that case your program will not actually terminate.
For instance, if I call your CleanUp method several times, or without calling Start first, I get the feeling it's going to crash.
A 2-second timeout during cleanup feels more arbitrary than planned, and I'd actually go as far as to ensure that things shut down properly or crash/hang (you never want to leave concurrent stuff in an unknown state).
Also, IsRunning is explicitly set, not inferred from the state of the object.
For inspiration, I'd like you to look at a similar class I wrote recently. It's a producer/consumer pattern that does its work in a background thread. You can find the source code on CodePlex, though it was engineered to solve a very specific problem.
There, cancellation is handled by enqueuing a special item type that only the worker thread recognizes; when the worker dequeues it, it begins shutting down. This also ensures that I never cancel pending work: only whole units of work are considered.
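To make the idea concrete, here is a minimal sketch of sentinel-based shutdown. It is not the CodePlex class referred to above; SentinelQueue, Envelope and Shutdown are hypothetical names chosen for illustration, and the design assumes a single worker that handles one item at a time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration; all names here are assumptions.
public sealed class SentinelQueue<T>
{
    // Each queued entry is either a payload or the private shutdown marker.
    private sealed class Envelope
    {
        public T Payload;
        public bool IsShutdown;
    }

    private readonly BlockingCollection<Envelope> _queue =
        new BlockingCollection<Envelope>();
    private readonly Task _worker;

    public SentinelQueue(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");

        _worker = Task.Factory.StartNew(() =>
        {
            // GetConsumingEnumerable blocks until an entry is available.
            foreach (Envelope envelope in _queue.GetConsumingEnumerable())
            {
                // Everything queued before the marker has already been handled.
                if (envelope.IsShutdown) break;
                handler(envelope.Payload);
            }
        }, TaskCreationOptions.LongRunning);
    }

    public void Add(T item)
    {
        _queue.Add(new Envelope { Payload = item });
    }

    // Queues the marker behind all pending items, then waits for the
    // worker to reach it. No unit of work is interrupted halfway.
    public void Shutdown()
    {
        _queue.Add(new Envelope { IsShutdown = true });
        _worker.Wait();
    }
}
```

For example, after creating `var seen = new List<int>(); var q = new SentinelQueue<int>(seen.Add);`, adding 1, 2 and 3 and then calling `q.Shutdown()`, the list contains all three items in order, because the marker sits behind them in the queue. Note two limits of the sketch: an exception thrown by the handler surfaces from `Shutdown()` wrapped in an AggregateException, and items added after shutdown are queued but never processed.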
To improve this situation a bit, you can have a separate timer for the current work and abort or roll back incomplete work if it's canceled. Now, implementing transaction-like behavior is going to take some trial and error, because you need to look at every possible corner case and ask yourself: what happens if the program crashes here? Ideally, all these code paths should lead to a recoverable or known state from which you can resume your work. But as I think you've guessed already, that's going to take careful programming and a lot of testing.
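One possible shape for the per-item timer is sketched below. It assumes .NET 4.5 or later (for CancelAfter), and it assumes the work delegate checks its token cooperatively. TimedWork, TryRun and the rollback callback are hypothetical names; what "rollback" means depends entirely on your own units of work.

```csharp
using System;
using System.Threading;

// Hypothetical illustration; names and structure are assumptions.
public static class TimedWork
{
    // Runs one unit of work with its own time budget.
    // Returns true if the work finished, false if it was cancelled
    // (by the budget or by the shutdown token) and rolled back.
    public static bool TryRun(
        Action<CancellationToken> work,
        Action rollback,
        TimeSpan budget,
        CancellationToken shutdown)
    {
        // The linked source fires when either the shutdown token
        // is cancelled or the per-item budget runs out.
        using (var perItem = CancellationTokenSource.CreateLinkedTokenSource(shutdown))
        {
            perItem.CancelAfter(budget);
            try
            {
                work(perItem.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                // Put things back into a known state before reporting failure.
                rollback();
                return false;
            }
        }
    }
}
```

The work only stops if it observes the token, for instance by calling `token.ThrowIfCancellationRequested()` between steps. A delegate that ignores the token, or one that is deadlocked, will still hang, which is exactly the case that needs the careful programming described above.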