I'm looking for help on how to use the yield keyword to return an IEnumerable from parallel blocks or Task blocks. Below is the pseudocode:
public IEnumerable<List<string>> ReadFile()
{
    foreach (string filepath in lstOfFiles)
    {
        // dispose each stream once all of its items have been yielded
        using (var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read))
        {
            foreach (var item in ReadStream(stream))
                yield return item; // where item is of type List<string>
        }
    }
}
I want to convert the above code to a parallel block like this:
lstOfFiles.AsParallel()
    .ForAll(filepath =>
    {
        var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read);
        foreach (var item in ReadStream(stream))
            yield return item; // does not compile: yield is not allowed inside a lambda
    });
but the compiler reports an error that yield cannot be used inside an anonymous method or lambda expression. I also tried a Task block, but yield is not allowed in the task's anonymous delegate either.
Can anyone suggest a simple and good way to use yield to return a collection of data from parallel blocks or tasks?
I have read that Rx 2.0 or the TPL are good choices for this scenario, but I'm unsure whether to use Rx or the TPL to return yielded values asynchronously. Can anyone suggest which is better, Rx or the TPL?
If I use Rx, is it necessary to subscribe and to convert the parallel block with AsObservable?
To use Rx, you'll have to use IObservable<T> instead of IEnumerable<T>. Each time that you call Subscribe on the observable returned by ReadFiles, it will iterate over all of the strings in lstOfFiles and, in parallel*, read each file stream.
Sequentially, the query opens each file stream and passes it to ReadStream, which is responsible for generating the asynchronous sequence of items for a given stream.
The ReadFiles query, which uses the SelectMany operator written in query comprehension syntax, merges each "item" that is generated by all ReadStream observables into a single observable sequence, respecting the asynchrony of the source.
You should strongly consider writing your ReadStream method as an async iterator; otherwise, if you must return IEnumerable<T>, then you'll have to convert it by applying the ToObservable(scheduler) operator with a concurrency-introducing scheduler, which may be less efficient.
* Rx does not introduce any concurrency here. Parallelization is simply a result of the asynchronous nature of the underlying API, so it's very efficient. Reading from a file stream asynchronously may cause Windows to use an I/O completion port as an optimization, notifying on a pooled thread when each buffer becomes available. This ensures that Windows is entirely responsible for scheduling callbacks to your application, rather than the TPL or yourself.
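A minimal sketch of the ReadFiles/ReadStream shape described above, assuming the System.Reactive NuGet package (Rx). RxFileReader, BatchSize, ReadFilesFromEnumerable and its readStream parameter are hypothetical names, and batching lines into List<string> is only a stand-in for whatever the question's ReadStream actually produces. ReadStream is written as an async iterator with Observable.Create; the second method shows the ToObservable(scheduler) route for an IEnumerable<T>-returning reader.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RxFileReader
{
    // Hypothetical batch size; the question doesn't say how items are grouped.
    const int BatchSize = 100;

    // Opens each file in turn (sequentially) and merges the items of all
    // ReadStream observables into one observable sequence.
    public static IObservable<List<string>> ReadFiles(IEnumerable<string> filePaths) =>
        from filepath in filePaths.ToObservable()
        from item in ReadStream(new FileStream(
            filepath, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, useAsync: true))
        select item;

    // Async iterator: reads the stream asynchronously and pushes a batch to the
    // observer every BatchSize lines. When the lambda's Task completes,
    // Observable.Create calls OnCompleted; an exception becomes OnError.
    static IObservable<List<string>> ReadStream(Stream stream) =>
        Observable.Create<List<string>>(async (observer, cancellationToken) =>
        {
            using (var reader = new StreamReader(stream)) // also disposes the stream
            {
                var batch = new List<string>(BatchSize);
                string line;
                while ((line = await reader.ReadLineAsync()) != null)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    batch.Add(line);
                    if (batch.Count == BatchSize)
                    {
                        observer.OnNext(batch);
                        batch = new List<string>(BatchSize);
                    }
                }
                if (batch.Count > 0)
                    observer.OnNext(batch);
            }
        });

    // Fallback when the reader must stay IEnumerable<T>: ToObservable with a
    // concurrency-introducing scheduler. Observable.Using disposes each file
    // stream when that file's sequence terminates.
    public static IObservable<List<string>> ReadFilesFromEnumerable(
        IEnumerable<string> filePaths,
        Func<Stream, IEnumerable<List<string>>> readStream) =>
        from filepath in filePaths.ToObservable()
        from item in Observable.Using(
            () => File.OpenRead(filepath),
            stream => readStream(stream).ToObservable(TaskPoolScheduler.Default))
        select item;
}
```

Typical use is RxFileReader.ReadFiles(lstOfFiles).Subscribe(item => ...); batches from different files may arrive interleaved.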
Rx is free-threaded, so every notification to your observer may be on a different pooled thread; however, due to Rx's serialization contract (§4.2 Rx Design Guidelines), you will not receive overlapping notifications in your observer when you call Subscribe, so there's no need to provide explicit synchronization, such as locking.
However, due to the parallelized nature of this query, notifications from different files may be interleaved, but they will never overlap.
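To make the serialization contract concrete, here is a small sketch assuming the System.Reactive NuGet package. Each inner sequence is a hypothetical stand-in for one file and produces its values on TaskPoolScheduler.Default, so notifications originate on several threads at once; SelectMany serializes them, so the observer can increment a plain counter without a lock. SerializationDemo and CountWithoutLocking are illustrative names.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class SerializationDemo
{
    public static int CountWithoutLocking(int files, int itemsPerFile)
    {
        int count = 0; // deliberately no lock and no Interlocked
        using (var done = new ManualResetEventSlim())
        using (Observable.Range(0, files)
            // each inner sequence runs on the task pool, concurrently with the others
            .SelectMany(file => Observable.Range(0, itemsPerFile, TaskPoolScheduler.Default))
            .Subscribe(
                _ => count++,        // never invoked concurrently (serialized by SelectMany)
                ex => done.Set(),
                () => done.Set()))
        {
            done.Wait(); // block only so the demo can return a result
        }
        return count;
    }
}
```

Calling SerializationDemo.CountWithoutLocking(8, 10000) returns 80000; without serialized notifications, the unsynchronized count++ could lose increments.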
If you'd rather receive all items for a given file at once, as you've hinted at in your question, then you can simply apply the ToList operator to each file's ReadStream observable within the query and change the return type accordingly (ToList produces an IObservable<IList<T>>).
If you need to observe notifications with thread affinity (on a GUI thread, for instance), then you must marshal the notifications because they will be arriving on a pooled thread. Since this query does not introduce concurrency itself, the best way to achieve this is to apply the ObserveOnDispatcher operator (WPF, Store Apps, Phone, Silverlight) or the ObserveOn(SynchronizationContext) overload (WinForms, ASP.NET, etc.). Just don't forget to add a reference to the appropriate platform-specific NuGet package, e.g. Rx-Wpf, Rx-WinForms or Rx-WindowsStore.
You may be tempted to convert the observable back into an IEnumerable<T> instead of calling Subscribe. Do not do this. In most cases it's unnecessary, it can be inefficient, and in the worst case it could cause deadlocks. Once you enter the world of asynchrony, you should try to stay in it. This is true not just for Rx but also for async/await.
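A sketch combining the per-file ToList, ObserveOn(SynchronizationContext) and Subscribe suggestions, assuming the System.Reactive NuGet package. PerFileBatches, ReadFilesAsLists, ReadLines and Start are hypothetical names, and a blocking StreamReader enumerated with ToObservable(TaskPoolScheduler.Default) stands in for an async ReadStream (the less efficient IEnumerable<T> route mentioned earlier). ObserveOnDispatcher is omitted because it needs a platform-specific package.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class PerFileBatches
{
    // Plain iterator over a reader's lines (hypothetical stand-in for ReadStream).
    static IEnumerable<string> ReadLines(StreamReader reader)
    {
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line;
    }

    // One IList<string> per file: ToList is applied per file inside the query,
    // so each notification carries a whole file. Using disposes each reader.
    public static IObservable<IList<string>> ReadFilesAsLists(IEnumerable<string> filePaths) =>
        from filepath in filePaths.ToObservable()
        from lines in Observable.Using(
            () => new StreamReader(filepath),
            reader => ReadLines(reader).ToObservable(TaskPoolScheduler.Default).ToList())
        select lines;

    // Consume with Subscribe rather than converting back to IEnumerable<T>.
    // If the caller has a SynchronizationContext (e.g. a WinForms UI thread),
    // ObserveOn marshals notifications onto it.
    public static IDisposable Start(IEnumerable<string> filePaths, Action<IList<string>> onFile)
    {
        var source = ReadFilesAsLists(filePaths);
        var context = SynchronizationContext.Current;
        if (context != null)
            source = source.ObserveOn(context);
        return source.Subscribe(
            onFile,
            ex => Console.Error.WriteLine(ex.Message),
            () => Console.WriteLine("All files read."));
    }
}
```

Keep the IDisposable returned by Start and dispose it to cancel early; the order in which files complete is not guaranteed.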
It looks like you want to use SelectMany. You can't use yield in an anonymous method, but you can break the body of the loop out into a new iterator method and call that method from the SelectMany lambda.
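A minimal sketch of that approach with PLINQ, assuming the question's setup. ParallelFileReader and ReadOneFile are hypothetical names, and the batching ReadStream below is only a stand-in for the question's own ReadStream, whose behavior isn't shown. The lambda passed to SelectMany no longer contains yield; it just calls a named iterator method, which is allowed.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;

static class ParallelFileReader
{
    // Hypothetical stand-in for the question's ReadStream: yields the
    // stream's lines in batches of up to batchSize.
    static IEnumerable<List<string>> ReadStream(Stream stream, int batchSize)
    {
        using (var reader = new StreamReader(stream))
        {
            var batch = new List<string>(batchSize);
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                batch.Add(line);
                if (batch.Count == batchSize)
                {
                    yield return batch;
                    batch = new List<string>(batchSize);
                }
            }
            if (batch.Count > 0)
                yield return batch;
        }
    }

    // The question's loop body, moved into a named iterator where yield is legal.
    static IEnumerable<List<string>> ReadOneFile(string filepath, int batchSize)
    {
        using (var stream = new FileStream(filepath, FileMode.Open, FileAccess.Read))
        {
            foreach (var item in ReadStream(stream, batchSize))
                yield return item;
        }
    }

    // PLINQ reads several files concurrently and flattens their items.
    // Items from different files may interleave; output order is not guaranteed.
    public static ParallelQuery<List<string>> ReadFiles(IEnumerable<string> filePaths, int batchSize = 100) =>
        filePaths.AsParallel().SelectMany(filepath => ReadOneFile(filepath, batchSize));
}
```

Consuming with foreach (var item in ParallelFileReader.ReadFiles(lstOfFiles)) merges the results back onto the calling thread; inserting AsOrdered() after AsParallel() should preserve source order, at some cost.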