How to read interleaved file concurrently using re

Posted 2020-06-26 23:04

I am new to the Reactive Extensions and I would like to use them (in C#) to read a file that contains several interleaved streams. Basically, the file is in the format ABCDABCDABCD.... I would prefer to read the file sequentially, separate the streams (i.e. AAA.., BBB.., etc.) and process each stream in parallel, using a separate thread for each stream.

There will have to be some form of buffering to keep each stream as busy as possible (within limits, of course). The streams do not necessarily all start at the same time, in which case a number of elements have to be skipped for the delayed streams. In this case the buffering might bridge the gap.

The elements in the file are small (4 bytes) so it is quite chatty. Therefore, I'm also looking for a way to deal with this efficiently.

I started out by creating an enumerable to read the file. This could be made to supply a struct which contains the stream ID, or the streams could be separated based on the order (element number modulo number of streams). The latter is probably more efficient.
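
As a rough illustration of the two options mentioned above, here is a minimal sketch of an enumerable that reads the file sequentially and tags each element with a stream index derived from its position (element number modulo number of streams). The names `InterleavedReader` and `ReadElements` are hypothetical, and the decoding assumes each 4-byte element is a little-endian integer, which the question does not actually state.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class InterleavedReader
{
    // Hypothetical helper: yields (stream index, element) pairs in file order.
    // Assumption: each element is a 4-byte little-endian integer.
    public static IEnumerable<(int Stream, int Value)> ReadElements(Stream input, int numberOfStreams)
    {
        var buffer = new byte[4];
        long index = 0;
        while (true)
        {
            // Stream.Read may return fewer bytes than requested, so loop until 4 bytes arrive.
            int read = 0;
            while (read < 4)
            {
                int n = input.Read(buffer, read, 4 - read);
                if (n == 0) break; // end of input
                read += n;
            }
            if (read < 4) yield break; // a trailing partial element is dropped

            int value = buffer[0] | (buffer[1] << 8) | (buffer[2] << 16) | (buffer[3] << 24);
            yield return ((int)(index % numberOfStreams), value);
            index++;
        }
    }
}
```

Wrapping the file in a `FileStream` with a reasonably large buffer keeps the 4-byte reads from turning into one system call each.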

2 answers
疯言疯语
Reply #2 · 2020-06-26 23:31

Below is my solution, which is based on the answer by yamen. It appears to work correctly, meaning that the sequential interleaved input is split into multiple sequential streams which are processed in parallel (multithreaded).

However, I am not sure whether this is a proper implementation (in terms of programming style, Rx contracts, etc.).

const int MAX_BUFFERED_ELEMENTS = 1024;

// number of streams in the file
var numberOfStreams = 8;

// semaphore to limit buffered elements
var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
var cts = new CancellationTokenSource(); // should be used to cancel (left out of this sample)

// create subjects that are the base of each output stream
var subjects = Enumerable.Repeat(0, numberOfStreams).Select(_ => new Subject<ElementType>()).ToArray();

// create the source stream (reader is IEnumerable<ElementType>)
var observable = reader.ToObservable(Scheduler.ThreadPool).Publish();

// forward elements from source to the output subjects
int stream = 0;
observable.Subscribe(x => {
    semaphore.Wait(cts.Token);    // wait if buffer is full
    subjects[stream].OnNext(x);   // forward to output stream
    stream = (stream + 1) % numberOfStreams; }); // advance round-robin to the next stream

// build output streams (ToArray forces the lazy Select to run, so the subscriptions are actually created)
var subscriptions = subjects.Select(
    (s,i) => s.ObserveOn(Scheduler.ThreadPool) // process on separate threads
    .Do(_ => semaphore.Release())              // signal that element is consumed
    .Subscribe(x => Console.WriteLine("stream: {0}\t element: {1}", i, x)) // debug 'processing'
    ).ToArray();

// start processing!
observable.Connect();
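
For comparison, the same idea of limited buffering can be sketched without Rx, using one bounded `BlockingCollection<T>` per stream from the base class library. This is a different technique from the semaphore-plus-subjects code above, not a rewrite of it. The names `BoundedDemux` and `Run` are hypothetical, and the sketch has no cancellation or error handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BoundedDemux
{
    // Hypothetical helper (not part of Rx): routes elements round-robin into one
    // bounded queue per stream and runs one consumer task per stream.
    public static void Run<T>(IEnumerable<T> source, int numberOfStreams,
                              int capacityPerStream, Action<int, T> process)
    {
        var queues = Enumerable.Range(0, numberOfStreams)
            .Select(_ => new BlockingCollection<T>(capacityPerStream))
            .ToArray();

        // One long-running consumer per stream; each sees its elements in file order.
        var consumers = queues
            .Select((q, i) => Task.Factory.StartNew(() =>
            {
                foreach (var item in q.GetConsumingEnumerable())
                    process(i, item);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        try
        {
            int stream = 0;
            foreach (var item in source)
            {
                queues[stream].Add(item); // blocks while this stream's queue is full
                stream = (stream + 1) % numberOfStreams;
            }
        }
        finally
        {
            // Lets each consumer loop finish once its queue has drained.
            foreach (var q in queues) q.CompleteAdding();
        }

        Task.WaitAll(consumers);
        foreach (var q in queues) q.Dispose();
    }
}
```

A call such as `BoundedDemux.Run(reader, 8, 128, (i, x) => Console.WriteLine("stream: {0}\t element: {1}", i, x));` would mirror the debug output above, with `reader` being the same `IEnumerable<ElementType>`.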
乱世女痞
Reply #3 · 2020-06-26 23:45

This question has 'it depends' stamped all over it, especially when you're talking about performance and efficiency but have provided an example that is somewhat contrived. Namely, your example file is dead simple compared to the real file. However, I will attempt to provide some advice on the off chance that it is useful.

Here's a method to turn a stream into an IEnumerable<char>. The stream itself handles the buffering; the method hands back one element at a time. It could be made more efficient by returning chunks of data, but at some point you need to process the elements one at a time, and it may as well be here. Don't optimise prematurely.

IEnumerable<char> ReadBytes(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return (char)reader.Read();
    }
}
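
If the per-element overhead does turn out to matter, the chunked variant mentioned above could look roughly like this. It is only a sketch: `ChunkedReader` and `ReadChunks` are hypothetical names, and the chunk size is an arbitrary parameter.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class ChunkedReader
{
    // Hypothetical helper: yields the input as arrays of up to chunkSize characters.
    // ReadBlock keeps reading until the buffer is full or the input ends,
    // so only the last chunk can be shorter than chunkSize.
    public static IEnumerable<char[]> ReadChunks(TextReader reader, int chunkSize)
    {
        var buffer = new char[chunkSize];
        int n;
        while ((n = reader.ReadBlock(buffer, 0, chunkSize)) > 0)
        {
            // Copy so the consumer owns the chunk and the buffer can be reused.
            var chunk = new char[n];
            Array.Copy(buffer, chunk, n);
            yield return chunk;
        }
    }
}
```

If `chunkSize` is a multiple of the number of streams, the element at position `j` within any chunk belongs to stream `j % numberOfStreams`, so no running index across chunks is needed.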

Now, let's say this is the processing code for the 'output' observables. First, I set the output observables up, and then I subscribe to them as appropriate. Note that I'm using an array here so my output observable index is the array index. One could use a dictionary also, if the stream index couldn't be turned into a zero-based index.

var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();

outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));

Notice that the elements are sent out through a Subject<char>. The type parameter depends on the type of your element, but char works in the example given. Notice also that I delay the elements only to prove everything is working. They are now independent streams and you can do whatever you want with them.

OK, given a file stream:

var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);

I can now subscribe and use the modulo index to send to the right output stream:

ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

There are potentially more efficient methods here depending on exactly how you can calculate the target stream, but the idea remains the same.

Input file contains just one line: ABCABCABCABCABCABC

Output from running the program is:

he: C
he: C
he: C
he: C
he: C
he: C

One second later:

ho: B
ho: B
ho: B
ho: B
ho: B
ho: B

And then another second:

hi: A
hi: A
hi: A
hi: A
hi: A
hi: A