How to implement generic callbacks using the C# Ta

2019-05-27 10:10发布

I have a component that submits requests to a web-based API, but these requests must be throttled so as not to contravene the API's data limits. This means that all requests must pass through a queue to control the rate at which they are submitted, but they can (and should) execute concurrently to achieve maximum throughput. Each request must return some data to the calling code at some point in the future when it completes.

I'm struggling to create a nice model to handle the return of data.

Using a BlockingCollection I can't just return a Task<TResult> from the Schedule method, because the enqueuing and dequeuing processes are at either ends of the buffer. So instead I create a RequestItem<TResult> type that contains a callback of the form Action<Task<TResult>>.

The idea is that once an item has been pulled from the queue the callback can be invoked with the started task, but I've lost the generic type parameters by that point and I'm left using reflection and all kinds of nastiness (if it's even possible).

For example:

public class RequestScheduler 
{
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();

    public RequestScheduler()
    {
        this.Start();
    }

    // This can't return Task<TResult>, so returns void.
    // Instead RequestItem is generic but this poses problems when adding to the queue
    public void Schedule<TResult>(RequestItem<TResult> request)
    {
        _queue.Add(request);
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                // I want to be able to use the original type parameters here
                // is there a nice way without reflection?
                // ProcessItem submits an HttpWebRequest
                Task.Factory.StartNew(() => ProcessItem(item))
                   .ContinueWith(t => { item.Callback(t); });
            }
        });
    }

    public void Stop()
    {
        _queue.CompleteAdding();
    }
}

public class RequestItem<TResult> : IRequestItem
{
    public IOperation<TResult> Operation { get; set; }
    public Action<Task<TResult>> Callback { get; set; }
}

How can I continue to buffer my requests but return a Task<TResult> to the client when the request is pulled from the buffer and submitted to the API?

1条回答
够拽才男人
2楼-- · 2019-05-27 10:34

First, you can return Task<TResult> from Schedule(), you just need to use TaskCompletionSource for that.

Second, to get around the genericity issue, you can hide all of it inside (non-generic) Actions. In Schedule(), create an action using a lambda that does exactly what you need. The consuming loop will then execute that action, it doesn't need to know what's inside.

Third, I don't understand why are you starting a new Task in each iteration of the loop. For one, it means you won't actually get any throttling.

With these modifications, the code could look like this:

public class RequestScheduler
{
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();

    public RequestScheduler()
    {
        this.Start();
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var action in m_queue.GetConsumingEnumerable())
            {
                action();
            }
        }, TaskCreationOptions.LongRunning);
    }

    public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
    {
        var tcs = new TaskCompletionSource<TResult>();

        Action action = () =>
        {
            try
            {
                tcs.SetResult(ProcessItem(operation));
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
        };

        m_queue.Add(action);

        return tcs.Task;
    }

    private T ProcessItem<T>(IOperation<T> operation)
    {
        // whatever
    }
}
查看更多
登录 后发表回答