I'm building a straightforward processing pipeline: an item is fetched as input, operated on by multiple processors sequentially, and finally output. The image below describes the overall architecture:
The way it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item has been processed, the output is notified. While an individual item is processed sequentially, multiple items may be processed in parallel (depending on how fast they are fetched from the provider).
The IObservable created and returned from the pipeline looks like this:
return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }
    observer.OnCompleted(); // Signal completion once the provider is exhausted.
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
        .Unwrap()))); // Unwrap Task<Task<T>> into Task<T>.
The missing part: I need a control mechanism that limits how many items (at most) can be in the pipeline at any given time.
For example, if the maximum number of parallel processings is 3, that would result in the following workflow:
- Item 1 is fetched and passed to the processors.
- Item 2 is fetched and passed to the processors.
- Item 3 is fetched and passed to the processors.
- Item 1 completed processing.
- Item 4 is fetched and passed to the processors.
- Item 3 completed processing.
- Item 5 is fetched and passed to the processors.
- Etc...
You might need to rearrange the code you posted, but this would be one way to do it:
I've modelled your pipeline as one observable (which in reality would be composed of several smaller observables chained together).
The key thing is to make sure that the semaphore gets released no matter how the pipeline terminates (Empty/Error), otherwise the stream might hang, so Finally() is used to call Release() on the semaphore. (It might be worth adding a Timeout on the pipeline observable as well, if it is liable to never OnCompleted()/OnError().)
Edit: As per the comments below, I've added some scheduling around the semaphore access so that we don't block whoever is pushing these inputs into our stream. I've used an EventLoopScheduler so that all requests for semaphore access queue up and execute on one thread.
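Since the answer's original code isn't reproduced above, here is a minimal sketch of the approach being described. The `process` delegate stands in for the per-item processor chain, and all names here are illustrative, not the answer's actual code:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class PipelineThrottling
{
    // Admits at most maxInFlight items into `process` at any one time.
    public static IObservable<T> LimitInFlight<T>(
        this IObservable<T> source,
        Func<T, IObservable<T>> process,
        int maxInFlight)
    {
        var semaphore = new SemaphoreSlim(maxInFlight);
        var eventLoop = new EventLoopScheduler(); // all waits queue on one thread

        return source
            .ObserveOn(eventLoop)      // so Wait() never blocks the producer
            .Do(_ => semaphore.Wait()) // blocks the event-loop thread until a slot frees
            .SelectMany(item =>
                process(item)
                    // Release on OnCompleted, OnError or disposal alike,
                    // otherwise the stream could hang.
                    .Finally(() => semaphore.Release()));
    }
}
```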
Edit: I do prefer Paul's answer though - simple, less scheduling, less synchronisation (merge uses a queue internally).
Merge provides an overload which takes a max concurrency. Its signature looks like:
IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):
To break down what this does:
- While checks _provider.HasNext on each iteration; if it is true, it resubscribes to get the next value from _provider, otherwise it emits OnCompleted.
- Defer wraps each fetch so the work only starts on subscription; each item is then projected into its own inner observable.
- The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to a max of 3 observables simultaneously.
Alternative 1
If you also need to control the number of parallel requests, you need to get a little trickier, since you will need to signal that your Observable is ready for new values:
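One way that signalling could look, as a hedged sketch rather than the answer's actual code: gate each fetch on a SemaphoreSlim slot that is only released once the item has finished processing, so the provider is never asked for more than maxConcurrency items in flight. The delegate parameters stand in for the question's _provider and processor chain:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledPipeline
{
    public static IObservable<T> Create<T>(
        Func<bool> hasNext,            // stands in for _provider.HasNext
        Func<Task<T>> getNextAsync,    // stands in for _provider.GetNextAsync
        Func<T, Task<T>> processAsync, // the sequential processor chain
        int maxConcurrency)
    {
        var slots = new SemaphoreSlim(maxConcurrency);

        return Observable.While(
                () => hasNext(),
                Observable.FromAsync(async () =>
                {
                    await slots.WaitAsync();  // only fetch when a slot is free
                    return await getNextAsync();
                }))
            .SelectMany(item =>
                Observable.FromAsync(() => processAsync(item))
                    // Releasing the slot signals readiness for the next fetch,
                    // whether the item succeeded or failed.
                    .Finally(() => slots.Release()));
    }
}
```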