So my requirement is to have my function wait for the first instance an event Action<T>
coming from another class and another thread, and handle it on my thread, allowing the wait to be interrupted by either timeout or CancellationToken
.
I want to create a generic function I can reuse. I managed to create a couple options that do (I think) what I need, but both seem more complicated than I'd imagine it should have to be.
Usage
Just to be clear, a sample use of this function would look like this, where serialDevice
is spitting out events on a separate thread:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
cancellationToken,
statusPacket => OnStatusPacketReceived(statusPacket),
a => serialDevice.StatusPacketReceived += a,
a => serialDevice.StatusPacketReceived -= a,
5000,
() => serialDevice.RequestStatusPacket());
Option 1—ManualResetEventSlim
This option isn't bad, but the Dispose
handling of the ManualResetEventSlim
is messier than it seems like it should be. It gives ReSharper fits that I'm accessing modified/disposed things within the closure, and it's genuinely hard to follow so I'm not even sure it's correct. Maybe there's something I'm missing that can clean this up, which would be my preference, but I don't see it offhand. Here's the code.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var eventOccurred = false;
var eventResult = default(TEvent);
var o = new object();
var slim = new ManualResetEventSlim();
Action<TEvent> setResult = result =>
{
lock (o) // ensures we get the first event only
{
if (!eventOccurred)
{
eventResult = result;
eventOccurred = true;
// ReSharper disable AccessToModifiedClosure
// ReSharper disable AccessToDisposedClosure
if (slim != null)
{
slim.Set();
}
// ReSharper restore AccessToDisposedClosure
// ReSharper restore AccessToModifiedClosure
}
}
};
subscribe(setResult);
try
{
if (initializer != null)
{
initializer();
}
slim.Wait(msTimeout, token);
}
finally // ensures unsubscription in case of exception
{
unsubscribe(setResult);
lock(o) // ensure we don't access slim
{
slim.Dispose();
slim = null;
}
}
lock (o) // ensures our variables don't get changed in middle of things
{
if (eventOccurred)
{
handler(eventResult);
}
return eventOccurred;
}
}
Option 2—polling without a WaitHandle
The WaitForSingleEvent
function here is much cleaner. I'm able to use ConcurrentQueue
and thus don't even need a lock. But I just don't like the polling function Sleep
, and I don't see any way around it with this approach. I'd like to pass in a WaitHandle
instead of a Func<bool>
to clean up Sleep
, but the second I do that I've got the whole Dispose
mess to clean up again.
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var q = new ConcurrentQueue<TEvent>();
subscribe(q.Enqueue);
try
{
if (initializer != null)
{
initializer();
}
token.Sleep(msTimeout, () => !q.IsEmpty);
}
finally // ensures unsubscription in case of exception
{
unsubscribe(q.Enqueue);
}
TEvent eventResult;
var eventOccurred = q.TryDequeue(out eventResult);
if (eventOccurred)
{
handler(eventResult);
}
return eventOccurred;
}
public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
var start = DateTime.Now;
while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
{
token.ThrowIfCancellationRequested();
Thread.Sleep(1);
}
}
The question
I don't particularly care for either of these solutions, nor am I 100% sure either of them are 100% correct. Is either one of these solutions better than the other (idiomaticity, efficiency, etc), or is there an easier way or built-in function to meet what I need to do here?
Update: Best answer so far
A modification of the TaskCompletionSource
solution below. No long closures, locks, or anything required. Seems pretty straightforward. Any errors here?
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var tcs = new TaskCompletionSource<TEvent>();
Action<TEvent> handler = result => tcs.TrySetResult(result);
var task = tcs.Task;
subscribe(handler);
try
{
if (initializer != null)
{
initializer();
}
task.Wait(msTimeout, token);
}
finally
{
unsubscribe(handler);
// Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
}
if (task.Status == TaskStatus.RanToCompletion)
{
onEvent(task.Result);
return true;
}
return false;
}
Update 2: Another great solution
Turns out that BlockingCollection
works just like ConcurrentQueue
but also has methods accepting a timeout and cancellation token. One nice thing about this solution is that it can be updated to make a WaitForNEvents
fairly easily:
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var q = new BlockingCollection<TEvent>();
Action<TEvent> add = item => q.TryAdd(item);
subscribe(add);
try
{
if (initializer != null)
{
initializer();
}
TEvent eventResult;
if (q.TryTake(out eventResult, msTimeout, token))
{
handler(eventResult);
return true;
}
return false;
}
finally
{
unsubscribe(add);
q.Dispose();
}
}
You can use
TaskCompletetionSource
to create aTask
that you can mark as completed or cancelled. Here's a possible implementation for a specific event:In C# 5 you can use it like this:
If you want to wait for the event synchronously, you can also use the
Wait
method:Here's a more generic version, but it still works only for events with
Action
signature:You can use it like this:
If you want it to work with other event signatures (e.g.
EventHandler
), you will have to create separate overloads. I don't think there's an easy way to make it work for any signature, especially since the number of parameters isn't always the same.You can use Rx to convert the event to an observable, then to a task, and finally wait on that task with your token/timeout.
One advantage this has over any of the existing solutions, is that it calls
unsubscribe
on the event's thread, ensuring that your handler won't be called twice. (In your first solution you work around this bytcs.TrySetResult
instead oftcs.SetResult
, but it's always nice to get rid of a "TryDoSomething" and simply ensure DoSomething always works).Another advantage is the code's simplicity. It's essentially one line. So you don't even particularly need an independent function. You can inline it so that it's more clear what exactly your code does, and you can make variations on the theme without needing a ton of optional parameters (like your optional
initializer
, or allow waiting on N events, or foregoing timeouts/cancellation in instances where they're not necessary). And you'd have both thebool
return val and the actualresult
in scope when it's finished, if that's useful at all.