Stream Multicasting - read a stream once but proce

Published 2019-03-22 00:04

Question:

For scalability and to conserve resources, it is best to avoid reading an entire input stream into memory, but instead try to process it as a stream, reading small chunks at a time. This is easy to accomplish in .NET when you have one thing you want to do with the data, like read it from a web request and save it to a file. Easy example:

input.CopyTo(output); // reads fixed-size chunks (the default buffer size depends on the .NET version) and writes them to `output`

But when I want to do multiple things with that data, it's quite a bit trickier. For example, I want to:

  • Count the length of a stream that doesn't support the Length property. We can do this with a custom DummyStream that does nothing with the data written to it except keep track of the length.
  • Calculate the MD5 of the stream.
  • Figure out the MIME type of the data. FindMimeFromData needs only the first 256 bytes of the stream.
  • Save the whole stream to the database.

... but only do so with a single pass over the input stream, with minimal use of buffering.

I am sure that this is possible. I could probably coordinate multiple threads, with one thread doing the actual reading from the input stream and other threads for each of the "processing" tasks I want to perform. But that could easily get rather complicated and fragile if not done just right.

My question is this:

  • Do I have to use multiple threads? I would love some kind of co-routine solution where all of the processing is interleaved on one thread.
  • Are there ways that I could leverage C#'s async features, or Reactive Extensions to make this solution simpler?

I'm having trouble wrapping my brain around this one. I'm looking for guidance on the best (clean, maintainable, efficient use of computer resources) way to accomplish this, especially in light of newer technologies like the TPL, async, and Rx.


This is an example of the syntax I'm envisioning:

public static void Multicast(this Stream input, params Action<Stream>[] processingActions)
{
    // TODO: ??? complicated stream multicasting logic goes here. ???
    throw new NotImplementedException();
}

And you would use it like this:

long length;
byte[] md5;
string mimeType;
int uploadId;

input.Multicast(
    s => length = GetLength(s),
    s => md5 = CalculateMd5(s),
    s => mimeType = DetermineMimeType(s, filename, mimeTypeAsReportedByClient),
    s => uploadId = SaveToDatabase(s)
);

And here's an example of one of the processing actions:

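private static byte[] CalculateMd5(Stream input)
{
    // ComputeHash reads the stream to its end; dispose the algorithm afterwards.
    using (var md5 = MD5.Create())
    {
        return md5.ComputeHash(input);
    }
}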

Answer 1:

I decided to take a shot at an Rx implementation. Here's what I've got so far. It doesn't do any DB writes, but it does compute the length, MD5 hash, and MIME type with only one pass over the file and minimal buffering.

using System;
using System.Diagnostics;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Security.Cryptography;

namespace RxTest
{
    internal static class Program
    {
        private static void Main()
        {
            var expectedValues = ReadExpectedValuesDirectly("demo.txt");

            new FileInfo("demo.txt")
                .ReadAsObserveable(4096)
                .ToFileData()
                .Subscribe(observed => Compare(expectedValues, observed));
        }

        private static void Compare(FileData expected, FileData observed)
        {
            Console.WriteLine();
            WriteLine("expected", expected);
            WriteLine("observed", observed);
            Console.WriteLine();

            Debug.Assert(observed.Length == expected.Length);
            Debug.Assert(BitConverter.ToString(observed.Hash) == BitConverter.ToString(expected.Hash));
            Debug.Assert(observed.MimeType == expected.MimeType);
        }

        private static void WriteLine(string prefix, FileData observed)
        {
            Console.WriteLine("{0}: {1:N0}   {2}   {3}",
                prefix,
                observed.Length,
                observed.MimeType,
                BitConverter.ToString(observed.Hash).Replace("-", ""));
        }

        private static FileData ReadExpectedValuesDirectly(string fileName)
        {
            return new FileData
            {
                Length = new FileInfo(fileName).Length,
                Hash = MD5.Create().ComputeHash(File.ReadAllBytes(fileName)),
                MimeType = FileDataExtensions.FindMimeType(GetFirst256Bytes(fileName))
            };
        }

        private static byte[] GetFirst256Bytes(string path)
        {
            using (var stream = File.OpenRead(path))
            {
                var buffer = new byte[256];

                if (stream.Length >= 256)
                    stream.Read(buffer, 0, 256);
                else
                    stream.Read(buffer, 0, (int) stream.Length);

                return buffer;
            }
        }
    }

    public class FileData
    {
        public long Length { get; set; }
        public string MimeType { get; set; }
        public byte[] Hash { get; set; }
    }

    public static class FileDataExtensions
    {
        public static IObservable<byte[]> ReadAsObserveable(this FileInfo file, int bufferSize)
        {
            // Observable.Using ties the file handle to the subscription: the stream is
            // opened on subscribe and disposed when the sequence ends or is unsubscribed.
            return Observable.Using(
                () => file.OpenRead(),
                stream => stream.ReadAsObservable(bufferSize));
        }

        public static IObservable<byte[]> ReadAsObservable(this Stream stream, int bufferSize)
        {
            // TODO: Add scheduling/canceling
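            return Observable.Create<byte[]>(observer =>
            {
                var block = new byte[bufferSize];
                int bytesRead;

                try
                {
                    // Read may return fewer bytes than requested before the end of the
                    // stream, so a short read must not be treated as the last block.
                    while ((bytesRead = stream.Read(block, 0, bufferSize)) > 0)
                    {
                        // Emit a copy so subscribers never see the reused buffer change.
                        var chunk = new byte[bytesRead];
                        Array.Copy(block, chunk, bytesRead);
                        observer.OnNext(chunk);
                    }

                    // Complete exactly once, even for empty streams or lengths that are
                    // an exact multiple of bufferSize.
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                return Disposable.Empty;
            });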
        }

        public static IObservable<FileData> ToFileData(this IObservable<byte[]> file)
        {
            return Observable.Create<FileData>(observer =>
            {
                var counter = 0;
                var connectable = file
                    .Do(_ => Console.WriteLine())
                    .Do(_ => Console.Write(++counter))
                    .Publish();

                var combineSub = Observable.CombineLatest(
                    connectable.TotalLength(),
                    connectable.ComputeHash(MD5.Create()),
                    connectable.FindMimeType(),
                    (length, hash, mimeType) => new FileData
                    {
                        Hash = hash,
                        Length = length,
                        MimeType = mimeType
                    })
                    .Subscribe(observer);

                var connectSub = connectable.Connect();

                return new CompositeDisposable(combineSub, connectSub);
            });
        }

        public static IObservable<long> TotalLength(this IObservable<byte[]> file)
        {
            return file
                .Do(block => Console.Write("\tLength()"))
                .Select(block => block.LongLength)
                .Sum();
        }

        public static IObservable<byte[]> ComputeHash(this IObservable<byte[]> file, HashAlgorithm algorithm)
        {
            return Observable.Create<byte[]>(observer =>
                file
                    .Do(block => Console.Write("\tComputeHash()"))
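                    .Subscribe(
                        block => algorithm.TransformBlock(block, 0, block.Length, null, 0),
                        observer.OnError,   // forward upstream errors instead of rethrowing them
                        () =>
                        {
                            // An empty final block finalizes the hash without adding data.
                            algorithm.TransformFinalBlock(new byte[0], 0, 0);
                            observer.OnNext(algorithm.Hash);
                            observer.OnCompleted();
                        }));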
        }

        public static IObservable<string> FindMimeType(this IObservable<byte[]> file)
        {
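            // Files shorter than 256 bytes are zero-padded to 256 bytes. This assumes the
            // first block contains at least the first 256 bytes of any longer file.
            // An empty file produces no blocks and therefore no MIME type.
            return file
                .Do(block => Console.Write("\tFindMimeType()"))
                .Take(1)
                .Select(block =>
                {
                    var first256 = new byte[256];
                    Array.Copy(block, first256, Math.Min(block.Length, 256));
                    return FindMimeType(first256);
                });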
        }

        public static string FindMimeType(byte[] first256)
        {
            try
            {
                IntPtr pMimeType;
                FindMimeFromData(IntPtr.Zero, null, first256, 256, null, 0, out pMimeType, 0);
                // PtrToStringUni returns null for a null pointer (no MIME type found).
                var sMimeTypeFromFile = Marshal.PtrToStringUni(pMimeType);
                Marshal.FreeCoTaskMem(pMimeType);
                return sMimeTypeFromFile;
            }
            catch (Exception ex)
            {
                // not exactly robust exception handling
                Console.WriteLine(ex.ToString());
                return null;
            }
        }

        // Pointer-sized parameters are declared as IntPtr so the call also works in
        // 64-bit processes; the string parameters are wide strings in urlmon.dll.
        [DllImport(@"urlmon.dll", CharSet = CharSet.Unicode, ExactSpelling = true)]
        private static extern UInt32 FindMimeFromData(
            IntPtr pBC,
            [MarshalAs(UnmanagedType.LPWStr)] String pwzUrl,
            [MarshalAs(UnmanagedType.LPArray)] byte[] pBuffer,
            UInt32 cbSize,
            [MarshalAs(UnmanagedType.LPWStr)] String pwzMimeProposed,
            UInt32 dwMimeFlags,
            out IntPtr ppwzMimeOut,
            UInt32 dwReserved
            );
    }
}


Answer 2:

I think what you want to do is to have one input stream, multiple output streams and then copy the input to all the outputs, something like:

Stream input;
IList<Stream> outputs;

byte[] buffer = new byte[BufferSize];
int read;

while ((read = input.Read(buffer, 0, buffer.Length)) != 0)
{
    foreach (var output in outputs)
    {
        output.Write(buffer, 0, read);
    }
}

Those output streams don't have to be normal streams, they could be special streams that for example just calculate the length. They would need to override only the Write() method, so a custom base Stream could be useful:

public class OutputStreamBase : Stream
{
    private long length;   // long, so streams larger than 2 GB do not overflow the counter

    public override void Flush()
    {}

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        length += count;
    }

    public override bool CanRead
    {
        get { return false; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override long Length
    {
        get { return length; }
    }

    public override long Position
    {
        get { return length; }
        set { throw new NotSupportedException(); }
    }
}

This stream can be used directly as the counting stream, or as a base class that makes it easy to implement the stream that detects the MIME type.
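As a rough illustration of the fan-out loop above, here is a minimal single-threaded sketch that computes the length and the MD5 hash in one pass. The names CountingStream, StreamFanOut, CopyToAll and LengthAndMd5 are hypothetical and not part of the original answer. The sketch assumes that wrapping the hash algorithm in a CryptoStream over Stream.Null is an acceptable way to hash data as it is written; the MIME type and database sinks are left out.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;

// Hypothetical write-only sink that only counts the bytes written to it.
public sealed class CountingStream : Stream
{
    public long BytesWritten { get; private set; }

    public override void Write(byte[] buffer, int offset, int count) { BytesWritten += count; }
    public override void Flush() { }
    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { return BytesWritten; } }
    public override long Position
    {
        get { return BytesWritten; }
        set { throw new NotSupportedException(); }
    }
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}

public static class StreamFanOut
{
    // Reads `input` once and writes every chunk to each stream in `outputs`.
    public static void CopyToAll(this Stream input, int bufferSize, params Stream[] outputs)
    {
        var buffer = new byte[bufferSize];
        int read;
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            foreach (var output in outputs)
                output.Write(buffer, 0, read);
        }
    }

    // Computes the length and MD5 of `input` in one pass on the calling thread.
    public static Tuple<long, byte[]> LengthAndMd5(Stream input)
    {
        using (var md5 = MD5.Create())
        using (var counter = new CountingStream())
        using (var hasher = new CryptoStream(Stream.Null, md5, CryptoStreamMode.Write))
        {
            input.CopyToAll(4096, counter, hasher);
            hasher.FlushFinalBlock();   // finalizes the hash so md5.Hash can be read
            return Tuple.Create(counter.BytesWritten, md5.Hash);
        }
    }
}
```

Calling StreamFanOut.LengthAndMd5(input) returns both results after a single read of the input, and only one buffer of 4096 bytes is held in memory at a time. No extra threads are involved: each chunk is pushed to every sink in turn before the next chunk is read.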



Answer 3:

With Rx, you could use Observable.Create to build an observable that reads the stream, then use Publish to allow multiple subscriptions to the stream without starting it yet, and finally call Connect on the published stream to get everything running. You could use ObserveOn and SubscribeOn on each of the different "routes" the stream data takes to control when, where, and how each portion of the code runs. That means you could buffer the whole stream and submit it to the database all at once, do the same for the MD5, and count the stream using Scan or Aggregate, while also having a "route" that determines the MIME type and unsubscribes early. And if you need to bring these results back together, you could use CombineLatest.

This question is very interesting to me, and I wish I had the time right now to post some real code examples. Unfortunately, I do not. Hopefully this gives you an idea of what operators could be used in what configurations to accomplish what you're looking for.

Here's some pseudo-code for the non-stream reading parts...

var connectable = ReadStreamAsObservable(stream).Publish();

var mimeType = connectable.ReadMimeTypeAsObservable();
var md5      = connectable.ReadMD5AsObservable();
var record   = connectable.SubmitToDatabaseAsObservable(myDbConnection);
var length   = connectable.Aggregate(0, (acc, x) => acc + x.Bytes.Length);

var parts = Observable.CombineLatest(mimeType, md5, length, record,
  // lambda parameters are renamed so they do not clash with the locals above
  (m, h, l, r) => new {
    MimeType = m,
    MD5 = h,
    Length = l,
    Record = r
  });

var subscription = new CompositeDisposable(
  parts.Subscribe((x) => Console.WriteLine(x)),
  connectable.Connect()
  );