Let's say I have two sequences returning integers 1 to 5.
The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.
public static IEnumerable<int> FastFirst()
{
for (int i = 1; i < 6; i++)
{
if (i > 3) Thread.Sleep(200);
yield return i;
}
}
The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.
public static IEnumerable<int> SlowFirst()
{
for (int i = 1; i < 6; i++)
{
if (i < 4) Thread.Sleep(200);
yield return i;
}
}
Unioning both these sequences give me just numbers 1 to 5.
FastFirst().Union(SlowFirst());
I cannot guarantee which of the two methods has delays at what point, so the order of the execution cannot guarantee a solution for me. Therefore, I would like to parallelise the union, in order to minimise the (artifical) delay in my example.
A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to be able to return an iterator from a method that internally parallelises the request to both the cache and the datasource so that the cached results yield as fast as possible.
Note 1: I realise this is still wasting CPU cycles; I'm not asking how can I prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.
Update 1: I've tailored achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll to set the BlockingCollection's CompleteAdding just the once. I just put it here since it would get lost in the lack of comments formatting. Any further feedback would be great!
public static IEnumerable<TResult> SelectAsync<TResult>(
params IEnumerable<TResult>[] producer)
{
var resultsQueue = new BlockingCollection<TResult>();
var taskList = new HashSet<Task>();
foreach (var result in producer)
{
taskList.Add(
Task.Factory.StartNew(
() =>
{
foreach (var product in result)
{
resultsQueue.Add(product);
}
}));
}
Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
return resultsQueue.GetConsumingEnumerable();
}
Take a look at this. The first method just returns everything in order results come. The second checks uniqueness. If you chain them you will get the result you want I think.
The cache would be nearly instant compared to fetching from the database, so you could read from the cache first and return those items, then read from the database and return the items except those that were found in the cache.
If you try to parallelise this, you will add a lot of complexity but get quite a small gain.
Edit:
If there is no predictable difference in the speed of the sources, you could run them in threads and use a synchronised hash set to keep track of which items you have already got, put the new items in a queue, and let the main thread read from the queue:
Test stream:
Test: