I searched hard for a piece of code that does what I want and that I am happy with. Reading this and this helped a lot.
I have a scenario where a single consumer needs to be notified by a single producer when new data is available, but I would also like the consumer to be notified periodically regardless of whether new data is available. It is fine if the consumer is notified more often than the recurring period, but it should not be notified less frequently.
It is possible that multiple 'new data' notifications occur while the consumer has already been notified and is still working. (So SemaphoreSlim was not a good fit.)
Hence, if the consumer is slower than the rate of producer notifications, subsequent notifications would not queue up; they would just "re-signal" the same "data available" flag without effect.
I would also like the consumer to asynchronously wait for the notifications (without blocking a thread).
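To illustrate the difference described above, here is a minimal sketch (the class and method names are made up for this example and are not part of my notifier). It assumes nothing beyond the standard SemaphoreSlim and TaskCompletionSource types: a semaphore remembers each signal separately, while a completion source ignores signals after the first.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, only for illustrating the two signalling behaviours.
public static class CoalescingDemo
{
    // Signals a semaphore twice with no waiter and returns its count.
    public static int SemaphoreCountAfterTwoSignals()
    {
        using (var semaphore = new SemaphoreSlim(0))
        {
            semaphore.Release();
            semaphore.Release();
            // Each Release is counted, so a slow consumer would later
            // be woken once per queued signal.
            return semaphore.CurrentCount;
        }
    }

    // Signals a completion source twice and reports whether the
    // second signal changed anything.
    public static bool SecondSignalHadEffect()
    {
        var flag = new TaskCompletionSource<bool>();
        flag.TrySetResult(true);        // first signal completes the task
        return flag.TrySetResult(true); // second signal is ignored
    }
}
```

With the semaphore, two signals leave two pending wake-ups; with the completion source, the second signal is simply dropped, which is the "flag" behaviour I am after.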
I have stitched together the class below, which wraps a TaskCompletionSource and also uses an internal Timer.
    public class PeriodicalNotifier : IDisposable
    {
        // Need some dummy type since TaskCompletionSource has only the generic version
        internal struct VoidTypeStruct { }

        // Always reuse this allocation
        private static VoidTypeStruct dummyStruct;

        private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
        private Timer reSendTimer;

        public PeriodicalNotifier(int autoNotifyIntervalMs)
        {
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
            reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
        }

        public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
        {
            using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
            {
                await internalCompletionSource.Task;
                // Recreate - to be able to set again upon the next wait
                internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
            }
        }

        public void Notify()
        {
            internalCompletionSource.TrySetResult(dummyStruct);
        }

        public void Dispose()
        {
            reSendTimer.Dispose();
            internalCompletionSource.TrySetCanceled();
        }
    }
Users of this class can do something like this:
    private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

    // ... In some task - which should be non-blocking
    while (some condition)
    {
        await notifier.WaitForNotifictionAsync(_tokenSource.Token);
        // Do some work...
    }

    // ... In some thread, producer added new data
    notifier.Notify();
Efficiency is important to me, since the scenario is a high-frequency data stream, so I had in mind:
- The non-blocking nature of the wait.
- I assume Timer is more efficient than recreating Task.Delay and cancelling it if it's not the one to notify.
- A concern about recreating the TaskCompletionSource on every wait.
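For comparison, the Task.Delay alternative mentioned in the list above could look roughly like the sketch below. The class and method names are hypothetical and this is not part of my notifier; it only shows what would have to be created on every wait (a delay task, a linked cancellation source and a WhenAny continuation), which is why I assumed a single long-lived Timer is cheaper.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, shown only to compare against the Timer approach.
public static class DelayBasedWait
{
    // Completes when either 'signal' completes or 'periodMs' elapses.
    // Returns true if the signal completed first, false on timeout.
    public static async Task<bool> WaitForSignalOrTimeoutAsync(
        Task signal, int periodMs, CancellationToken cancellationToken)
    {
        using (var delayCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // A new delay task (and its underlying timer) is created per wait.
            Task delay = Task.Delay(periodMs, delayCts.Token);
            Task winner = await Task.WhenAny(signal, delay).ConfigureAwait(false);

            // Cancel the delay so an unused timer does not linger.
            delayCts.Cancel();

            // Surface cancellation requested by the caller.
            cancellationToken.ThrowIfCancellationRequested();
            return winner == signal;
        }
    }
}
```

The consumer loop would call this once per iteration with the current "data available" task, so the per-wait allocations repeat at the rate of the data stream.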
My questions are:
- Does my code correctly solve the problem? Any hidden pitfalls?
- Am I missing some trivial solution / existing building block for this use case?
Update:
I have reached the conclusion that, aside from reimplementing a leaner task completion structure (like in here and here), I have no more optimizations to make. I hope that helps anyone looking at a similar scenario.