How to use Reactive Extensions to parse a stream o

Posted 2019-04-25 12:03

Question:

I need to parse a stream of serial data coming from a test instrument, and this seems to be an excellent application for Reactive Extensions.

The protocol is very simple: each "packet" is a single letter followed by numeric digits. The number of digits is fixed for each packet type but can vary between packet types, e.g.:

...A1234B123456C12...

I am trying to break this up into an Observable of Strings, e.g.

"A1234" "B123456" "C12" ...
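Independent of Rx, the framing rule can be written as a plain function over a complete buffer. This is a minimal sketch, assuming total packet lengths (letter included) of A = 5, B = 7 and C = 3, as implied by the example above; `PacketFraming` and `Split` are hypothetical names. A streaming decoder must also carry an incomplete trailing packet over to the next read, which this function simply leaves out.

```csharp
using System;
using System.Collections.Generic;

public static class PacketFraming
{
    // Assumed total lengths (letter + digits), inferred from "A1234B123456C12".
    public static readonly Dictionary<char, int> Lengths =
        new Dictionary<char, int> { { 'A', 5 }, { 'B', 7 }, { 'C', 3 } };

    // Splits a buffer into packets. Digits before the first letter belong to a
    // packet whose header was missed, so they are skipped; an incomplete
    // trailing packet is not included in the result.
    public static List<string> Split(string data)
    {
        var packets = new List<string>();
        int i = 0;
        while (i < data.Length && !char.IsLetter(data[i]))
            i++;

        while (i < data.Length)
        {
            if (!Lengths.TryGetValue(data[i], out int length))
                throw new ArgumentException($"Unknown packet code '{data[i]}'");
            if (i + length > data.Length)
                break; // trailing packet not fully received yet
            packets.Add(data.Substring(i, length));
            i += length;
        }
        return packets;
    }
}
```

For example, `Split("A1234B123456C12")` gives `"A1234"`, `"B123456"`, `"C12"`, while `Split("34A1234B12")` gives only `"A1234"`.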

I thought this would be simple, but I don't see an obvious way to approach it (I have some experience with LINQ, but I am new to Rx).

Here's the code I have so far, which produces an observable of chars from the serial port's DataReceived event:

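        using System.IO.Ports;      // SerialPort, SerialDataReceivedEventArgs
        using System.Reactive.Linq; // Observable (System.Reactive package)
        using System.Text;          // Encoding

        var serialData = Observable
            .FromEventPattern<SerialDataReceivedEventArgs>(SerialPortMain, "DataReceived")
            .SelectMany(_ =>
            {
                int dataLength = SerialPortMain.BytesToRead;
                byte[] data = new byte[dataLength];
                int nbrDataRead = SerialPortMain.Read(data, 0, dataLength);

                if (nbrDataRead == 0)
                    return new char[0];

                // Decode only the bytes actually read; Read may return fewer than requested.
                return Encoding.ASCII.GetChars(data, 0, nbrDataRead);
            });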

How can I transform serialData into an observable of strings, where each string is one packet?

Answer 1:

Here's a slightly shorter method, in the same style as James' first solution, with a similar helper method:

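public static bool IsCompletePacket(string s)
{
    // Digits received before the first letter belong to a packet whose
    // header was missed; they can never form a complete packet.
    if (!char.IsLetter(s[0]))
        return false;

    switch (s[0])
    {
        case 'A':
            return s.Length == 5;
        case 'B':
            return s.Length == 6;
        case 'C':
            return s.Length == 7;
        default:
            throw new ArgumentException("Unknown packet code");
    }
}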

The code is then:

var packets = chars
    .Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur)
    .Where(IsCompletePacket);

The Scan part builds up strings that start with a letter: each letter begins a new string and each digit is appended to the current one, e.g.:

A
A1
A12
A123
...

The Where then keeps only those that have the correct length. Essentially, it drops the Tuple from James' solution and uses the string length instead.
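The BCL's LINQ to Objects has no `Scan` operator, but a few lines of C# reproduce it, which makes the accumulator above easy to inspect without a serial port. This is a sketch assuming the same A = 5, B = 6, C = 7 lengths as `IsCompletePacket`; `ScanDemo`, `Scan`, `Partials` and `CompletePackets` are hypothetical helpers, not part of the BCL or Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanDemo
{
    // Like Enumerable.Aggregate, but yields every intermediate accumulator,
    // which is what Rx's Scan does for an observable.
    public static IEnumerable<TAcc> Scan<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = step(acc, item);
            yield return acc;
        }
    }

    // The accumulator from the answer: a letter starts a new string,
    // anything else is appended to the current one.
    public static IEnumerable<string> Partials(IEnumerable<char> chars) =>
        chars.Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur);

    // Assumed lengths, matching IsCompletePacket.
    static readonly Dictionary<char, int> Lengths =
        new Dictionary<char, int> { { 'A', 5 }, { 'B', 6 }, { 'C', 7 } };

    // Equivalent of .Where(IsCompletePacket): keep strings of the full length.
    // Strings that do not start with a known letter are simply skipped.
    public static IEnumerable<string> CompletePackets(IEnumerable<char> chars) =>
        Partials(chars).Where(s => Lengths.TryGetValue(s[0], out int n) && s.Length == n);
}
```

`Partials("A12B3")` yields `"A"`, `"A1"`, `"A12"`, `"B"`, `"B3"`, and `CompletePackets("34A1234B12345C12")` yields `"A1234"` and `"B12345"`: the leading fragment `"34"` is skipped and the unfinished `"C12"` is never long enough to match.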



Answer 2:

Surprisingly fiddly! I've solved this a couple of ways:

// helper method to get the packet length
public static int GetPacketLength(char c)
{
    switch(c)
    {
        case 'A':
            return 5;
        case 'B':
            return 6;
        case 'C':
            return 7;
        default:
            throw new ArgumentException("Unknown packet code");
    }
}

Then we can do this:

// chars is a test IObservable<char> 
string[] messages = { "A1234", "B12345", "C123456" };
var serialPort = Enumerable.Range(1, 10).ToObservable();
var chars = serialPort.SelectMany((_, i) => messages[i % 3]);

var packets = chars.Scan(
    Tuple.Create(string.Empty, -1),
    (acc, c) =>
        Char.IsLetter(c)
            ? Tuple.Create(c.ToString(), GetPacketLength(c) - 1)
            : Tuple.Create(acc.Item1 + c, acc.Item2 - 1))
    .Where(acc => acc.Item2 == 0)
    .Select(acc => acc.Item1)
    .Subscribe(Console.WriteLine);

What it does is this:

  • The Scan builds up each packet and pairs it with the number of characters remaining in the packet: e.g. ("A",4) ("A1",3) ("A12",2) ("A123",1) ("A1234",0) ("B",5) ...
  • The pairs with 0 characters remaining are the complete packets, so we use Where to filter out the rest and Select to extract the string from each pair.
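To see the pairs listed above, the same accumulator can be run over a plain string, with C# value tuples in place of `Tuple`. This is a sketch; `CountdownDemo`, `States` and `LengthOf` are hypothetical names, and `LengthOf` mirrors `GetPacketLength` (A = 5, B = 6, C = 7). It also shows why the seed is -1: digits received before the first letter start their count below zero and only move further away from it, so they are never selected.

```csharp
using System;
using System.Collections.Generic;

public static class CountdownDemo
{
    // Mirrors GetPacketLength from the answer.
    public static int LengthOf(char c) =>
        c == 'A' ? 5 : c == 'B' ? 6 : c == 'C' ? 7
        : throw new ArgumentException("Unknown packet code");

    // Yields every (packet so far, characters remaining) state, i.e. what the
    // answer's Scan produces before the Where/Select.
    public static IEnumerable<(string Packet, int Remaining)> States(IEnumerable<char> chars)
    {
        var state = (Packet: string.Empty, Remaining: -1);
        foreach (var c in chars)
        {
            state = char.IsLetter(c)
                ? (c.ToString(), LengthOf(c) - 1)
                : (state.Packet + c, state.Remaining - 1);
            yield return state;
        }
    }
}
```

For the input `"3A1234B1"` the states are ("3",-2) ("A",4) ("A1",3) ("A12",2) ("A123",1) ("A1234",0) ("B",5) ("B1",4); only ("A1234",0) survives the `Remaining == 0` filter.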

Alternative

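Here's another approach, in a less functional style. I like the code above from a style point of view, but the code below is a bit more memory-efficient, in the unlikely event that this makes a difference: it reuses a single fixed-size buffer and allocates a string only per packet, instead of a new string and Tuple for every character.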

public static class ObservableExtensions
{
    private const int MaxPacketLength = 7;
    private static Dictionary<char, int> PacketLengthTable =
        new Dictionary<char, int> { {'A', 5}, {'B', 6}, {'C', 7 } };

    public static IObservable<string> GetPackets(this IObservable<char> source)
    {
        return Observable.Create<string>(o =>
        {
            var currentPacketLength = 0;
            var buffer = new char[MaxPacketLength];
            var index = -1;
            return source.Subscribe(
                c => {
                    if (Char.IsLetter(c))
                    {
                        currentPacketLength = PacketLengthTable[c];
                        buffer[0] = c;
                        index = 0;
                    }
                    else if(index >= 0)
                    {
                        index++;
                        buffer[index] = c;
                    }
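                    // Only check for completion while a packet is in progress; otherwise
                    // digits before the first letter would emit an empty string.
                    if (index >= 0 && index == currentPacketLength - 1)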
                    {
                        o.OnNext(new string(buffer,0, currentPacketLength));
                        index = -1;
                    }
                },
                o.OnError,
                o.OnCompleted);
        });
    }
}

And it can be used like so:

// chars is a test IObservable<char> 
string[] messages = { "A1234", "B12345", "C123456" };
var serialPort = Enumerable.Range(1, 10).ToObservable();
var chars = serialPort.SelectMany((_, i) => messages[i % 3]);

var packets = chars.GetPackets().Subscribe(Console.WriteLine);
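The stateful part of `GetPackets` does not depend on Rx, so one option is to move it into a small class that the `Observable.Create` subscription simply feeds, which makes it testable without a serial port. This is a sketch; `PacketAssembler` and `Push` are hypothetical names, the length table is the one from `ObservableExtensions`, and it only reports a packet while one is in progress (`index >= 0`), so digits that arrive before the first letter are dropped.

```csharp
using System.Collections.Generic;

#nullable enable

public sealed class PacketAssembler
{
    private static readonly Dictionary<char, int> PacketLengthTable =
        new Dictionary<char, int> { { 'A', 5 }, { 'B', 6 }, { 'C', 7 } };

    private readonly char[] buffer = new char[7]; // longest packet in the table
    private int currentPacketLength;
    private int index = -1; // -1 means "no packet in progress"

    // Feeds one character; returns a completed packet, or null if none yet.
    public string? Push(char c)
    {
        if (char.IsLetter(c))
        {
            currentPacketLength = PacketLengthTable[c];
            buffer[0] = c;
            index = 0;
        }
        else if (index >= 0)
        {
            buffer[++index] = c;
        }

        if (index >= 0 && index == currentPacketLength - 1)
        {
            index = -1;
            return new string(buffer, 0, currentPacketLength);
        }
        return null;
    }
}
```

Inside `Observable.Create`, the `onNext` handler would then reduce to calling `Push` and passing any non-null result to `o.OnNext`.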


Answer 3:

Here's my take on the problem.

static IObservable<string> Packets(IObservable<char> source)
{
    return Observable.Create<string>(observer =>
    {
        var packet = new List<char>();
        Action emitPacket = () =>
        {
            if (packet.Count > 0)
            {
                observer.OnNext(new string(packet.ToArray()));
                packet.Clear();
            }
        };
        return source.Subscribe(
            c =>
            {
                if (char.IsLetter(c))
                {
                    emitPacket();
                }
                packet.Add(c);
            },
            observer.OnError,
            () =>
            {
                emitPacket();
                observer.OnCompleted();
            });
    });
}

If the input is the characters A425B90C2DX812, the output is A425, B90, C2, D, X812.

Notice you don't need to specify packet lengths or packet types (starting letters) up front.
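The letter-delimited rule can be checked against that example with a plain iterator over a string. This is a sketch; `LetterSplitter` and `SplitOnLetters` are hypothetical names, and the method follows the same rule as `Packets`: a letter closes the current packet and starts a new one, and whatever is left is flushed at the end of the input.

```csharp
using System.Collections.Generic;
using System.Text;

public static class LetterSplitter
{
    public static IEnumerable<string> SplitOnLetters(IEnumerable<char> source)
    {
        var packet = new StringBuilder();
        foreach (var c in source)
        {
            // A letter marks the start of the next packet, so emit the current one.
            if (char.IsLetter(c) && packet.Length > 0)
            {
                yield return packet.ToString();
                packet.Clear();
            }
            packet.Append(c);
        }
        // End of input: flush the last packet (the Rx version does this in OnCompleted).
        if (packet.Length > 0)
            yield return packet.ToString();
    }
}
```

Two consequences of this rule matter for a live serial stream: a packet is only emitted once the next packet's letter arrives (or the source completes), and digits received before the first letter come out as an item of their own, so `"12A3"` yields `"12"` and `"A3"`.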

You can implement the same method using a more general-purpose extension method:

// Declare inside a non-generic static class so it can be used as an extension method.
static IObservable<IList<T>> GroupSequential<T>(
    this IObservable<T> source, Predicate<T> isFirst)
{
    return Observable.Create<IList<T>>(observer =>
    {
        var group = new List<T>();
        Action emitGroup = () =>
        {
            if (group.Count > 0)
            {
                observer.OnNext(group.ToList());
                group.Clear();
            }
        };
        return source.Subscribe(
            item =>
            {
                if (isFirst(item))
                {
                    emitGroup();
                }
                group.Add(item);
            },
            observer.OnError,
            () =>
            {
                emitGroup();
                observer.OnCompleted();
            });
    });
}

The implementation of Packets is then just:

static IObservable<string> Packets(IObservable<char> source)
{
    return source
        .GroupSequential(char.IsLetter)
        .Select(x => new string(x.ToArray()));
}