For scalability and to conserve resources, it is best to avoid reading an entire input stream into memory, but instead try to process it as a stream, reading small chunks at a time. This is easy to accomplish in .NET when you have one thing you want to do with the data, like read it from a web request and save it to a file. Easy example:
input.CopyTo(output); // reads the input in fixed-size chunks (81920 bytes by default in current .NET versions) and writes them to `output`
But when I want to do multiple things with that data, it's quite a bit trickier. For example, I want to:
- Count the length of a stream that doesn't support the Length property. We can do this with a custom DummyStream that does nothing with the data written to it except keep track of the length.
- Calculate the MD5 of the stream.
- Figure out the Mime type of the data. FindMimeFromData needs only the first 256 bytes of the stream.
- Save the whole stream to the database.
... but only do so with a single pass over the input stream, with minimal use of buffering.
I am sure that this is possible. I could probably coordinate multiple threads, with one thread doing the actual reading from the input stream and other threads for each of the "processing" tasks I want to perform. But that could easily get rather complicated and fragile if not done just right.
My question is this:
- Do I have to use multiple threads? I would love some kind of co-routine solution where all of the processing is interleaved on one thread.
- Are there ways that I could leverage C#'s async features, or Reactive Extensions, to make this solution simpler?
I'm having trouble wrapping my brain around this one. I'm looking for guidance on the best (clean, maintainable, efficient use of computer resources) way to accomplish this, especially in light of newer technologies like the TPL, async, and Rx.
This is an example of the syntax I'm envisioning:
public static void Multicast(this Stream input, params Action<Stream>[] processingActions)
{
    // TODO: ??? complicated stream multicasting logic goes here. ???
    throw new NotImplementedException();
}
And you would use it like this:
long length;
byte[] md5;
string mimeType;
int uploadId;
input.Multicast(
    s => length = GetLength(s),
    s => md5 = CalculateMd5(s),
    s => mimeType = DetermineMimeType(s, filename, mimeTypeAsReportedByClient),
    s => uploadId = SaveToDatabase(s)
);
And here's an example of one of the processing actions:
private static byte[] CalculateMd5(Stream input)
{
    // Dispose the hash algorithm once the hash has been computed.
    using (var md5 = MD5.Create())
    {
        return md5.ComputeHash(input);
    }
}
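For context, a hash does not have to consume the whole stream in one call. The following is a minimal sketch, not part of the original question, of feeding MD5 one chunk at a time with TransformBlock and TransformFinalBlock, so that each chunk could also be handed to other consumers inside the same loop. The class name ChunkedHash, the method name Md5InChunks, and the 4096-byte default buffer are assumptions made for illustration.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;

public static class ChunkedHash
{
    // Hypothetical helper: hashes a stream chunk by chunk instead of
    // handing the whole stream to ComputeHash.
    public static byte[] Md5InChunks(Stream input, int bufferSize = 4096)
    {
        using (var md5 = MD5.Create())
        {
            var buffer = new byte[bufferSize];
            int read;
            while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
            {
                // Feed this chunk into the running hash state.
                // Other per-chunk work (counting, saving) could go here too.
                md5.TransformBlock(buffer, 0, read, null, 0);
            }

            // Finish the hash; an empty final block is allowed.
            md5.TransformFinalBlock(buffer, 0, 0);
            return md5.Hash;
        }
    }
}
```

The result should match what MD5.Create().ComputeHash returns for the same bytes, whatever the chunk size.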
I decided to take a shot at an Rx implementation. Here's what I got so far. It doesn't do any DB writes, but it does compute the length, MD5 hash, and mimetype with only one pass over the file and minimal buffering.
Using Rx, you could use Observable.Create to create an observable that reads the stream, then use Publish to allow multiple subscriptions to the stream without starting it yet, and then call Connect on the published stream to get everything up and running. You could use ObserveOn and SubscribeOn for each of the different "routes" the stream data takes to determine when, where, and how each portion of the code runs. That means you could buffer the whole stream and submit it to the database all at once, do the same for the MD5, and count the stream using Scan or Aggregate, but you could also have a "route" that determines the MIME type and unsubscribes early. Plus, if you needed to synchronize these elements back together, you could use CombineLatest.
This question is very interesting to me, and I wish I had the time right now to post some real code examples. Unfortunately, I do not. Hopefully this gives you an idea of what operators could be used in what configurations to accomplish what you're looking for.
Here's some pseudo-code for the non-stream reading parts...
I think what you want to do is to have one input stream, multiple output streams and then copy the input to all the outputs, something like:
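The answer's original code is not shown here, so the following is only a minimal sketch of the idea of one input and several outputs: read the input once and write every chunk to each output on a single thread. The names StreamFanOut and CopyToAll and the 4096-byte buffer are assumptions for illustration.

```csharp
using System;
using System.IO;

public static class StreamFanOut
{
    // Hypothetical helper: reads `input` exactly once and writes
    // every chunk to each of the output streams in turn.
    public static void CopyToAll(Stream input, params Stream[] outputs)
    {
        var buffer = new byte[4096];
        int read;
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            foreach (var output in outputs)
            {
                // Each output sees the same bytes in the same order.
                output.Write(buffer, 0, read);
            }
        }
    }
}
```

Only one buffer's worth of data is held at a time, and no extra threads are needed, as long as every output can keep up with synchronous writes.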
Those output streams don't have to be normal streams, they could be special streams that for example just calculate the length. They would need to override only the Write() method, so a custom base Stream could be useful:
This stream can be used directly as the counting stream, or to easily implement the find MIME stream.
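A write-only base stream like the one described above might look roughly like the sketch below. The answer's own code is not shown, so the class names WriteOnlyStream and CountingStream and the BytesWritten property are assumptions, not the author's implementation.

```csharp
using System;
using System.IO;

// Hypothetical base class: a sink that accepts writes and rejects
// everything else, so subclasses only have to implement Write().
public abstract class WriteOnlyStream : Stream
{
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Flush() { }

    public override int Read(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();

    public override long Seek(long offset, SeekOrigin origin) =>
        throw new NotSupportedException();

    public override void SetLength(long value) =>
        throw new NotSupportedException();

    // Write(byte[], int, int) is left abstract for subclasses.
}

// Hypothetical counting sink: discards the data and remembers
// only how many bytes were written to it.
public sealed class CountingStream : WriteOnlyStream
{
    public long BytesWritten { get; private set; }

    public override void Write(byte[] buffer, int offset, int count)
    {
        BytesWritten += count;
    }
}
```

A MIME-detection sink could extend the same base class and keep only the first 256 bytes it receives, ignoring the rest.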