I am new to the Reactive Extensions and I would like to use them (in C#) to read a file which contains several interleaved streams. Basically, the file is in the format ABCDABCDABCD... I would prefer to read the file sequentially, separate the streams (i.e. AAA..., BBB..., etc.) and process each stream in parallel, using a separate thread for each stream.
There will have to be some form of buffering to make sure each stream can remain busy as much as possible (within limits, of course). Not all streams necessarily start at the same time; in that case, a number of elements have to be skipped for the delayed streams, and the buffering might bridge the gap.
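To make the buffering and skipping requirements concrete, here is a minimal sketch that uses System.Threading.Channels instead of Rx. This is a swapped-in technique; channels ship in-box with .NET Core 3.0 and later. Each stream gets a bounded channel, so the sequential reader waits whenever one stream's buffer is full ("within limits"), and the leading elements of a delayed stream are dropped before they are ever buffered. The buffer size, the skip counts and the three-stream input are assumptions chosen for illustration. The code is written as C# top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

const int StreamCount = 3;
const int BufferSize = 4;      // assumed per-stream buffer limit
int[] toSkip = { 0, 1, 2 };    // hypothetical: stream 1 starts 1 element late, stream 2 starts 2 late

// One bounded channel per stream: the producer waits when that stream's buffer is full.
var channels = Enumerable.Range(0, StreamCount)
    .Select(_ => Channel.CreateBounded<char>(new BoundedChannelOptions(BufferSize)
    {
        SingleWriter = true,
        SingleReader = true,
    }))
    .ToArray();

// One consumer task per stream, each draining its own channel independently.
var consumers = Enumerable.Range(0, StreamCount)
    .Select(id => Task.Run(async () =>
    {
        var received = new List<char>();
        await foreach (char c in channels[id].Reader.ReadAllAsync())
            received.Add(c); // placeholder for the real per-stream processing
        return received;
    }))
    .ToArray();

// Producer: reads the interleaved input sequentially, drops the skipped leading
// elements of delayed streams, and routes the rest by position modulo stream count.
string input = "ABCABCABCABCABCABC";
for (int i = 0; i < input.Length; i++)
{
    int target = i % StreamCount;
    if (toSkip[target] > 0) { toSkip[target]--; continue; }
    await channels[target].Writer.WriteAsync(input[i]); // waits while this buffer is full
}
foreach (var channel in channels) channel.Writer.Complete();

List<char>[] results = await Task.WhenAll(consumers);
for (int s = 0; s < StreamCount; s++)
    Console.WriteLine($"stream {s}: {new string(results[s].ToArray())}");
```

With these assumed skip counts, stream 0 receives six As, stream 1 five Bs and stream 2 four Cs. Skipping on the producer side means skipped elements never occupy buffer space.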
The elements in the file are small (4 bytes), so handling them one at a time is quite chatty. Therefore, I'm also looking for a way to deal with this efficiently.
I started out by creating an enumerable to read the file. This could be made to supply a struct which contains the stream ID, or the streams could be separated based on the order (element number modulo the number of streams). The latter is probably more efficient, though.
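A minimal sketch of the modulo approach, assuming the 4-byte elements are little-endian 32-bit integers (the question does not say what they contain). The hypothetical ReadTagged helper pairs each value with its stream ID in a value tuple rather than a dedicated struct, so the two options described above end up close together. It is written as C# top-level statements.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper: yields each 4-byte element together with its stream ID,
// where the ID is the element number modulo the stream count.
static IEnumerable<(int StreamId, int Value)> ReadTagged(Stream stream, int streamCount)
{
    using var reader = new BinaryReader(stream);
    long elementNumber = 0;
    while (true)
    {
        byte[] bytes = reader.ReadBytes(4); // returns fewer bytes at the end of the stream
        if (bytes.Length < 4) yield break;  // a trailing partial element is ignored
        int streamId = (int)(elementNumber++ % streamCount);
        yield return (streamId, BinaryPrimitives.ReadInt32LittleEndian(bytes));
    }
}

// Demo: nine elements (0..8) for three interleaved streams; BinaryWriter writes little-endian.
var ms = new MemoryStream();
using (var writer = new BinaryWriter(ms, System.Text.Encoding.UTF8, leaveOpen: true))
    for (int i = 0; i < 9; i++) writer.Write(i);
ms.Position = 0;

var tagged = ReadTagged(ms, streamCount: 3).ToList();
foreach (var group in tagged.GroupBy(t => t.StreamId))
    Console.WriteLine($"stream {group.Key}: {string.Join(",", group.Select(t => t.Value))}");
// stream 0: 0,3,6
// stream 1: 1,4,7
// stream 2: 2,5,8
```

Carrying the ID costs an extra int per element; the pure modulo variant only needs the running element count, which is why it is likely the cheaper of the two.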
My solution, which is based on the answer by yamen, appears to work correctly: the sequential interleaved input is split into multiple sequential streams, which are processed in parallel (multithreaded).
However, I am not sure whether this is a proper implementation (in terms of programming style, Rx contracts, etc.).
This question has 'it depends' stamped all over it, especially since you're asking about performance and efficiency but have provided a somewhat contrived example: your example file is dead simple compared to the real file. However, I will attempt to provide some advice on the off chance that it is useful.
Here's a method to turn a stream into an IEnumerable<char>. The stream applies the buffering; this method sends back one result at a time. It could be made more efficient (by sending back chunks of data), but at some point you need to process the elements one at a time, and it may as well be here. Don't prematurely optimise.

Now, let's say this is the processing code for the 'output' observables. First, I set the output observables up, and then I subscribe to them as appropriate. Note that I'm using an array here, so my output observable index is the array index. One could also use a dictionary if the stream index couldn't be turned into a zero-based index.
Notice the use of Subject<char> to send my elements out on. The element type depends on your data, but char works in the example given. Notice also that I delay the elements only to prove that everything is working. They are now independent streams, and you can do whatever you want with them.

OK, given a file stream:
I can now subscribe and use the modulo index to send to the right output stream:
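A self-contained sketch of the whole pipeline, assuming the System.Reactive NuGet package: one Subject<char> per output stream indexed by array position, subscriptions set up before anything is pushed (subjects are hot), and routing by element index modulo the stream count. Giving each stream its own EventLoopScheduler (and therefore its own thread) is a choice made for this sketch rather than something taken from the answer, and the answer's Delay is left out. Note that ObserveOn queues elements without an upper bound, because Rx has no built-in backpressure, so the "within limits" part of the buffering requirement is not enforced here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

const int StreamCount = 3;

// One Subject<char> per output stream; the array index is the stream index.
var outputs = Enumerable.Range(0, StreamCount).Select(_ => new Subject<char>()).ToArray();

// One EventLoopScheduler per stream, so each stream is processed on its own thread.
var loops = Enumerable.Range(0, StreamCount).Select(_ => new EventLoopScheduler()).ToArray();

// Subscribe the per-stream processing before pushing anything.
Task<IList<char>>[] processed = Enumerable.Range(0, StreamCount)
    .Select(id => outputs[id]
        .ObserveOn(loops[id]) // queues elements for this stream's thread
        .Do(c => Console.WriteLine($"stream {id} got {c} on thread {Environment.CurrentManagedThreadId}"))
        .ToList()
        .ToTask())
    .ToArray();

// Stand-in for the IEnumerable<char> read from the file (same content as the example input).
IEnumerable<char> input = "ABCABCABCABCABCABC";

// Route each element by its position modulo the stream count; forward completion and errors.
input.ToObservable()
    .Select((c, index) => (c, index))
    .Subscribe(
        t => outputs[t.index % StreamCount].OnNext(t.c),
        ex => { foreach (var o in outputs) o.OnError(ex); },
        () => { foreach (var o in outputs) o.OnCompleted(); });

IList<char>[] results = await Task.WhenAll(processed);
foreach (var loop in loops) loop.Dispose();
```

The console lines from different streams interleave nondeterministically, but within each stream the order is preserved, so each stream collects six identical letters.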
There are potentially more efficient methods here depending on exactly how you can calculate the target stream, but the idea remains the same.
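As one example of such a variant, here is a sketch (the helper DemuxInBatches and its parameters are hypothetical) that assumes a fixed stream count and char elements as in the example. It reads whole frames at a time and splits each block per stream with strided indexing, which also helps with the chattiness of small elements: downstream code receives one array per stream per batch instead of one notification per element.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper: reads the interleaved input in blocks of whole frames
// (one element per stream) and returns each block already split per stream.
static IEnumerable<char[][]> DemuxInBatches(TextReader reader, int streamCount, int framesPerBatch)
{
    var buffer = new char[streamCount * framesPerBatch];
    int read;
    // ReadBlock keeps reading until the buffer is full or the input ends, so every
    // batch except possibly the last is full and each batch starts on a frame boundary.
    while ((read = reader.ReadBlock(buffer, 0, buffer.Length)) > 0)
    {
        var batch = new char[streamCount][];
        for (int s = 0; s < streamCount; s++)
        {
            // Number of positions i < read with i % streamCount == s.
            var part = new char[(read - s + streamCount - 1) / streamCount];
            for (int k = 0; k < part.Length; k++)
                part[k] = buffer[k * streamCount + s]; // strided gather
            batch[s] = part;
        }
        yield return batch;
    }
}

// A StringReader stands in for a StreamReader over the file.
using var input = new StringReader("ABCABCABCABCABCABC");
var batches = DemuxInBatches(input, streamCount: 3, framesPerBatch: 4).ToList();
for (int n = 0; n < batches.Count; n++)
    Console.WriteLine($"batch {n}: " + string.Join(" | ", batches[n].Select(part => new string(part))));
```

With framesPerBatch: 4, the 18-character example input yields two batches, AAAA | BBBB | CCCC and then AA | BB | CC. Each per-stream array could then be pushed through a Subject<char[]> rather than a Subject<char>.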
The input file contains just one line:
ABCABCABCABCABCABC
The output from running the program is:
One second later:
And then another second: