Non blocking and reoccurring producer/consumer not

Posted 2019-05-27 11:23

Question:

I searched hard for a piece of code that does what I want and that I am happy with. Reading this and this helped a lot.

I have a scenario where a single consumer needs to be notified by a single producer when new data is available, but I would also like the consumer to be notified periodically regardless of whether new data is available. It is fine if the consumer is notified more often than the recurring period, but it should not be notified less frequently.

It is possible that multiple 'new data' notifications occur while the consumer has already been notified and is working (so SemaphoreSlim was not a good fit). Hence, a consumer that is slower than the rate of producer notifications should not queue up subsequent notifications; they should just "re-signal" the same "data available" flag without further effect.

I would also like the consumer to asynchronously wait for the notifications (without blocking a thread).

I have stitched together the below class which wraps around TaskCompletionSource and also uses an internal Timer.

public class PeriodicalNotifier : IDisposable
{
    // Need some dummy type since TaskCompletionSource has only the generic version
    internal struct VoidTypeStruct { }
    // Always reuse this allocation
    private static VoidTypeStruct dummyStruct;

    private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
    private Timer reSendTimer;

    public PeriodicalNotifier(int autoNotifyIntervalMs)
    {
        internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
        {
            await internalCompletionSource.Task;
            // Recreate - to be able to set again upon the next wait
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        }
    }

    public void Notify()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    public void Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}

Users of this class can do something like this:

private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

// ... In some task - which should be non-blocking
while (some condition)
{
    await notifier.WaitForNotifictionAsync(_tokenSource.Token);
    // Do some work...
}

// ... In some thread, producer added new data
notifier.Notify();

Efficiency is important to me, since the scenario is a high-frequency data stream, so I had the following in mind:

  • The non-blocking nature of the wait.
  • I assume Timer is more efficient than recreating Task.Delay and cancelling it if it's not the one to notify.
  • A concern for the recreation of the TaskCompletionSource

My questions are:

  1. Does my code correctly solve the problem? Are there any hidden pitfalls?
  2. Am I missing some trivial solution or existing building block for this use case?

Update:

I have reached the conclusion that, aside from reimplementing a leaner task completion structure (like in here and here), I have no more optimizations to make. I hope this helps anyone looking at a similar scenario.

Answer 1:

  1. Yes, your implementation makes sense but the TaskCompletionSource recreation should be outside the using scope, otherwise the "old" cancellation token may cancel the "new" TaskCompletionSource.
  2. I think using some kind of AsyncManualResetEvent combined with a Timer would be simpler and less error-prone. There's a very nice namespace with async tools in the Visual Studio SDK by Microsoft. You need to install the SDK and then reference the Microsoft.VisualStudio.Threading assembly. Here's an implementation using their AsyncManualResetEvent with the same API:

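using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Threading; // AsyncManualResetEvent, WithCancellation

public class PeriodicalNotifier : IDisposable
{
    private readonly Timer _timer;
    private readonly AsyncManualResetEvent _asyncManualResetEvent;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        _asyncManualResetEvent = new AsyncManualResetEvent();
        // Fire once immediately, then once per interval
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        // Wait without blocking a thread; throws if the token is cancelled first
        await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken);
        // Reset so that the next wait needs a new notification
        _asyncManualResetEvent.Reset();
    }

    public void Notify()
    {
        // Setting an already-set event has no further effect
        _asyncManualResetEvent.Set();
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}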

You notify by setting the reset event, wait asynchronously using WaitAsync, enable cancellation using the WithCancellation extension method, and then reset the event. Multiple notifications are "merged" because they all set the same reset event.
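If you would rather stay with plain TaskCompletionSource and avoid the extra assembly, the fix from point 1 could look roughly like the sketch below. It is a minimal sketch, not a definitive implementation. It assumes a single consumer, as in the question, and keeps the method name spelled as in the question so the calling code does not change. Using TaskCreationOptions.RunContinuationsAsynchronously and the TimeSpan constructor parameter are my own choices, not something the original code requires. Each wait captures the current source in a local, so the cancellation callback can only ever touch that one source, and a fresh source is swapped in even when the wait was cancelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class PeriodicalNotifier : IDisposable
{
    private TaskCompletionSource<bool> _source = NewSource();
    private readonly Timer _timer;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        // Fire once immediately, then once per interval
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    // Assumption: continuations run asynchronously so Notify() never
    // executes consumer code inline on the producer or timer thread.
    private static TaskCompletionSource<bool> NewSource() =>
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        // Capture the source for this wait only
        var current = Volatile.Read(ref _source);
        try
        {
            using (cancellationToken.Register(() => current.TrySetCanceled()))
            {
                await current.Task.ConfigureAwait(false);
            }
        }
        finally
        {
            // Replace the used (completed or cancelled) source with a fresh one
            Interlocked.CompareExchange(ref _source, NewSource(), current);
        }
    }

    // Repeated calls before the next wait hit the same source and are merged
    public void Notify() => Volatile.Read(ref _source).TrySetResult(true);

    public void Dispose()
    {
        _timer.Dispose();
        // Release a consumer that is still waiting
        Volatile.Read(ref _source).TrySetCanceled();
    }
}
```

A notification that arrives between the completion of the awaited task and the swap lands on the already completed source. Since the consumer is about to run at that point anyway, I would expect this to be harmless, but it is worth verifying against your own data flow.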



Answer 2:

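Subject<Result> notifier = new Subject<Result>();

notifier
    // Timer with a zero due time emits immediately and then every 100 ms;
    // Interval would wait a full period before its first emission.
    .Select(value => Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100))
                               .Select(_ => value))
    .Switch()
    .Subscribe(value => DoSomething(value));

//Some other thread...
notifier.OnNext(...);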

This Rx query keeps re-sending the latest value every 100 milliseconds until a new value turns up. From then on, it re-sends that new value every 100 milliseconds instead.

If we receive values faster than once every 100 milliseconds, then we basically have the same output as input.