So my requirement is to have my function wait for the first instance of an event Action<T> coming from another class and another thread, and handle it on my thread, allowing the wait to be interrupted by either a timeout or a CancellationToken.
I want to create a generic function I can reuse. I managed to create a couple options that do (I think) what I need, but both seem more complicated than I'd imagine it should have to be.
Usage
Just to be clear, a sample use of this function would look like this, where serialDevice is spitting out events on a separate thread:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());
Option 1—ManualResetEventSlim
This option isn't bad, but the Dispose handling of the ManualResetEventSlim is messier than it seems like it should be. It gives ReSharper fits that I'm accessing modified/disposed things within the closure, and it's genuinely hard to follow, so I'm not even sure it's correct. Maybe there's something I'm missing that can clean this up, which would be my preference, but I don't see it offhand. Here's the code.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler,
    Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result =>
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock (o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}
Option 2—polling without a WaitHandle
The WaitForSingleEvent function here is much cleaner. I'm able to use ConcurrentQueue and thus don't even need a lock. But I just don't like the polling function Sleep, and I don't see any way around it with this approach. I'd like to pass in a WaitHandle instead of a Func<bool> to clean up Sleep, but the second I do that I've got the whole Dispose mess to clean up again.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler,
    Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}
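For comparison, the WaitHandle-based Sleep mentioned above could be sketched roughly as follows. This is only an illustration, not part of either option: the class name SleepExtensions and the overload itself are hypothetical. It relies on WaitHandle.WaitAny and CancellationToken.WaitHandle, and it leaves ownership (and disposal) of the handle with the caller, which is exactly the cleanup burden described above.

```csharp
using System;
using System.Threading;

public static class SleepExtensions
{
    // Hypothetical overload: blocks until 'signal' is set, the timeout elapses,
    // or the token is cancelled. Returns true if signalled, false on timeout,
    // and throws OperationCanceledException on cancellation.
    public static bool Sleep(this CancellationToken token, int ms, WaitHandle signal)
    {
        // Wait on both handles at once; the index tells us which one fired.
        int index = WaitHandle.WaitAny(new[] { signal, token.WaitHandle }, ms);
        if (index == 1)
        {
            token.ThrowIfCancellationRequested();
        }
        return index == 0; // WaitHandle.WaitTimeout means the timeout elapsed
    }
}
```

The caller would create something like a ManualResetEvent, set it from the event handler, pass it to Sleep, and dispose it after unsubscribing.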
The question
I don't particularly care for either of these solutions, nor am I 100% sure that either of them is correct. Is either one of these solutions better than the other (idiomaticity, efficiency, etc.), or is there an easier way or built-in function to meet what I need to do here?
Update: Best answer so far
A modification of the TaskCompletionSource solution below. No long closures, locks, or anything required. Seems pretty straightforward. Any errors here?
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent,
    Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}
Update 2: Another great solution
Turns out that BlockingCollection works just like ConcurrentQueue but also has methods accepting a timeout and cancellation token. One nice thing about this solution is that it can be updated to make a WaitForNEvents fairly easily:
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler,
    Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}
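The WaitForNEvents variant mentioned above might look something like the sketch below. It is untested against a real device, and several things in it are assumptions rather than part of the original: the class name EventWaitExtensions, the return type (a list of whatever arrived instead of a bool), and treating msTimeout as one overall budget for all events, tracked with a Stopwatch. Unlike the code above, it does not dispose the collection, so that a handler still running on the other thread during unsubscribe cannot hit a disposed object.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class EventWaitExtensions
{
    // Hypothetical helper: collects up to 'count' events within one overall timeout.
    // Returns the events received so far (possibly fewer than 'count' on timeout).
    public static IList<TEvent> WaitForNEvents<TEvent>(this CancellationToken token, int count,
        Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe,
        int msTimeout, Action initializer = null)
    {
        var results = new List<TEvent>(count);
        var q = new BlockingCollection<TEvent>(); // FIFO by default
        Action<TEvent> add = item => q.TryAdd(item);
        subscribe(add);
        try
        {
            if (initializer != null)
            {
                initializer();
            }
            var clock = Stopwatch.StartNew();
            while (results.Count < count)
            {
                // Shrink the per-take timeout so the total wait stays near msTimeout.
                var remaining = Math.Max(0, msTimeout - (int)clock.ElapsedMilliseconds);
                TEvent item;
                if (!q.TryTake(out item, remaining, token))
                {
                    break; // timed out before 'count' events arrived
                }
                results.Add(item);
            }
            return results;
        }
        finally
        {
            unsubscribe(add);
            // q is deliberately not disposed: an event raised concurrently with
            // unsubscribe could still call TryAdd on it.
        }
    }
}
```

Cancellation still surfaces as an OperationCanceledException from TryTake, the same as in the single-event version.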