I have a component that submits requests to a web-based API, but these requests must be throttled so as not to contravene the API's data limits. This means that all requests must pass through a queue to control the rate at which they are submitted, but they can (and should) execute concurrently to achieve maximum throughput. Each request must return some data to the calling code at some point in the future when it completes.
I'm struggling to create a nice model to handle the return of data.
Using a BlockingCollection, I can't just return a Task<TResult> from the Schedule method, because the enqueuing and dequeuing processes are at opposite ends of the buffer. So instead I create a RequestItem<TResult> type that contains a callback of the form Action<Task<TResult>>.
The idea is that once an item has been pulled from the queue the callback can be invoked with the started task, but I've lost the generic type parameters by that point and I'm left using reflection and all kinds of nastiness (if it's even possible).
For example:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class RequestScheduler
{
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();

    public RequestScheduler()
    {
        this.Start();
    }

    // This can't return Task<TResult>, so returns void.
    // Instead RequestItem is generic, but this poses problems when adding to the queue
    public void Schedule<TResult>(RequestItem<TResult> request)
    {
        _queue.Add(request);
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                // I want to be able to use the original type parameters here.
                // Is there a nice way without reflection?
                // ProcessItem submits an HttpWebRequest
                Task.Factory.StartNew(() => ProcessItem(item))
                    .ContinueWith(t => { item.Callback(t); });
            }
        });
    }

    public void Stop()
    {
        _queue.CompleteAdding();
    }
}

public class RequestItem<TResult> : IRequestItem
{
    public IOperation<TResult> Operation { get; set; }
    public Action<Task<TResult>> Callback { get; set; }
}
How can I continue to buffer my requests but return a Task<TResult> to the client when the request is pulled from the buffer and submitted to the API?
First, you can return Task<TResult> from Schedule(); you just need to use TaskCompletionSource for that.

Second, to get around the genericity issue, you can hide all of it inside (non-generic) Actions. In Schedule(), create an action using a lambda that does exactly what you need. The consuming loop will then execute that action; it doesn't need to know what's inside.

Third, I don't understand why you are starting a new Task in each iteration of the loop. For one, it means you won't actually get any throttling.

With these modifications, the code could look like this: