I need to parse a stream of serial data coming from a test instrument, and this seems to be an excellent application for Reactive Extensions.
The protocol is very simple: each "packet" is a single letter followed by numeric digits. The number of digits is fixed for each packet type, but can vary between packet types, e.g.
...A1234B123456C12...
I am trying to break this up into an Observable of Strings, e.g.
"A1234" "B123456" "C12" ...
I thought this would be simple, but I don't see an obvious way to approach it (I have some experience with LINQ, but am new to Rx).
Here's the code I have so far, which produces an Observable of chars from the serial port's DataReceived event.
var serialData = Observable
    .FromEventPattern<SerialDataReceivedEventArgs>(SerialPortMain, "DataReceived")
    .SelectMany(_ =>
    {
        int dataLength = SerialPortMain.BytesToRead;
        byte[] data = new byte[dataLength];
        int nbrDataRead = SerialPortMain.Read(data, 0, dataLength);
        if (nbrDataRead == 0)
            return new char[0];
        // Decode only the bytes that were actually read
        var chars = Encoding.ASCII.GetChars(data, 0, nbrDataRead);
        return chars;
    });
How can I transform serialData to Observable of String, where each string is a packet?
Here's a slightly shorter method, in the same style as James' first solution, with a similar helper method:
public static bool IsCompletePacket(string s)
{
    switch (s[0])
    {
        case 'A':
            return s.Length == 5;
        case 'B':
            return s.Length == 6;
        case 'C':
            return s.Length == 7;
        default:
            throw new ArgumentException("Packet must begin with a letter");
    }
}
The code is then:
var packets = chars
    .Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur)
    .Where(IsCompletePacket);
The Scan part builds up a string that restarts at each letter, e.g.:
A
A1
A12
A123
...
The Where then just selects those that are the correct length. Essentially it just removes the Tuple from James' solution and uses the string length instead.
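To see the Scan/Where pipeline run end to end, here is a minimal console sketch, assuming the System.Reactive package is referenced. The class name ScanPacketDemo and the Parse helper are made up for illustration. Unlike the helper above, this variant of IsCompletePacket returns false for an unknown first character instead of throwing. That is my assumption about how a stream that starts mid-packet should be handled, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ScanPacketDemo
{
    // Assumed length table, as in the answer: A = 5, B = 6, C = 7 characters.
    // Assumption: unknown or non-letter prefixes are ignored rather than thrown on.
    public static bool IsCompletePacket(string s)
    {
        if (s.Length == 0) return false;
        switch (s[0])
        {
            case 'A': return s.Length == 5;
            case 'B': return s.Length == 6;
            case 'C': return s.Length == 7;
            default: return false;
        }
    }

    // Runs the Scan/Where pipeline over a fixed string and collects the packets.
    public static IList<string> Parse(string input)
    {
        return input.ToObservable()
            .Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur)
            .Where(IsCompletePacket)
            .ToList()   // gather all packets into one list
            .Wait();    // block until the finite test sequence completes
    }

    public static void Main()
    {
        foreach (var packet in Parse("A1234B12345C123456"))
            Console.WriteLine(packet);   // A1234, B12345, C123456 on separate lines
    }
}
```

With this variant, an input such as "34B12345C123456" yields only B12345 and C123456, because the leading "34" never forms a string that starts with a known letter.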
Surprisingly fiddly! I've solved this a couple of ways:
// helper method to get the packet length
public int GetPacketLength(char c)
{
    switch (c)
    {
        case 'A':
            return 5;
        case 'B':
            return 6;
        case 'C':
            return 7;
        default:
            throw new Exception("Unknown packet code");
    }
}
Then we can do this:
// chars is a test IObservable<char>
string[] messages = { "A1234", "B12345", "C123456" };
var serialPort = Enumerable.Range(1, 10).ToObservable();
var chars = serialPort.SelectMany((_, i) => messages[i % 3]);
// Subscribe returns an IDisposable subscription, not the packet sequence itself
var subscription = chars
    .Scan(
        Tuple.Create(string.Empty, -1),
        (acc, c) =>
            Char.IsLetter(c)
                ? Tuple.Create(c.ToString(), GetPacketLength(c) - 1)
                : Tuple.Create(acc.Item1 + c, acc.Item2 - 1))
    .Where(acc => acc.Item2 == 0)
    .Select(acc => acc.Item1)
    .Subscribe(Console.WriteLine);
What it does is this:
- The Scan builds up each packet and pairs it with the number of characters remaining in the packet, e.g. ("A",4) ("A1",3) ("A12",2) ("A123",1) ("A1234",0) ("B",5) ...
- We then know that the pairs with 0 characters remaining are the ones we need, so we use Where to filter the rest out and Select to pull the result out of the pair.
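The intermediate pairs described above can be reproduced with a small trace. This is only a sketch, assuming the System.Reactive package is referenced. ScanTraceDemo and Trace are hypothetical names, and the Where/Select steps are replaced by a Select that formats every pair so the whole accumulator sequence is visible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ScanTraceDemo
{
    // Same assumed length table as the helper above.
    static int GetPacketLength(char c)
    {
        switch (c)
        {
            case 'A': return 5;
            case 'B': return 6;
            case 'C': return 7;
            default: throw new ArgumentException("Unknown packet code");
        }
    }

    // Returns every (text, remaining) pair the Scan produces, formatted as text.
    public static IList<string> Trace(string input)
    {
        return input.ToObservable()
            .Scan(
                Tuple.Create(string.Empty, -1),
                (acc, c) => char.IsLetter(c)
                    ? Tuple.Create(c.ToString(), GetPacketLength(c) - 1)
                    : Tuple.Create(acc.Item1 + c, acc.Item2 - 1))
            .Select(acc => "(\"" + acc.Item1 + "\"," + acc.Item2 + ")")
            .ToList()
            .Wait();   // fine for a finite test input; do not block on a live port
    }

    public static void Main()
    {
        // Prints: ("A",4) ("A1",3) ("A12",2) ("A123",1) ("A1234",0) ("B",5)
        Console.WriteLine(string.Join(" ", Trace("A1234B")));
    }
}
```

One side effect of counting down is that digits arriving before the first letter only push the counter further below zero, so they never pass a filter on 0 remaining.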
Alternative
Here's another approach, in a less functional style. I like the code above from a style point of view, but the code below is a bit more efficient with memory - in the unlikely event that it's going to make a difference.
public static class ObservableExtensions
{
    private const int MaxPacketLength = 7;
    private static readonly Dictionary<char, int> PacketLengthTable =
        new Dictionary<char, int> { {'A', 5}, {'B', 6}, {'C', 7} };

    public static IObservable<string> GetPackets(this IObservable<char> source)
    {
        return Observable.Create<string>(o =>
        {
            var currentPacketLength = 0;
            var buffer = new char[MaxPacketLength];
            var index = -1; // -1 means we are not inside a packet
            return source.Subscribe(
                c =>
                {
                    if (Char.IsLetter(c))
                    {
                        // A letter starts a new packet (an unknown letter throws KeyNotFoundException)
                        currentPacketLength = PacketLengthTable[c];
                        buffer[0] = c;
                        index = 0;
                    }
                    else if (index >= 0)
                    {
                        index++;
                        buffer[index] = c;
                    }
                    // The index >= 0 guard stops an empty string being emitted
                    // when digits arrive before the first letter
                    if (index >= 0 && index == currentPacketLength - 1)
                    {
                        o.OnNext(new string(buffer, 0, currentPacketLength));
                        index = -1;
                    }
                },
                o.OnError,
                o.OnCompleted);
        });
    }
}
And it can be used like so:
// chars is a test IObservable<char>
string[] messages = { "A1234", "B12345", "C123456" };
var serialPort = Enumerable.Range(1, 10).ToObservable();
var chars = serialPort.SelectMany((_, i) => messages[i % 3]);
var subscription = chars.GetPackets().Subscribe(Console.WriteLine);
Here's my take on the problem.
static IObservable<string> Packets(IObservable<char> source)
{
    return Observable.Create<string>(observer =>
    {
        var packet = new List<char>();
        Action emitPacket = () =>
        {
            if (packet.Count > 0)
            {
                observer.OnNext(new string(packet.ToArray()));
                packet.Clear();
            }
        };
        return source.Subscribe(
            c =>
            {
                if (char.IsLetter(c))
                {
                    emitPacket();
                }
                packet.Add(c);
            },
            observer.OnError,
            () =>
            {
                emitPacket();
                observer.OnCompleted();
            });
    });
}
If the input is the characters A425B90C2DX812, the output is A425, B90, C2, D, X812.
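Notice you don't need to specify packet lengths or packet types (starting letters) up front. The trade-off is that a packet is only emitted once the next packet's letter arrives (or the source completes), because that is the only signal that the current packet has ended.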
You can implement the same method using a more general-purpose extension method:
static IObservable<IList<T>> GroupSequential<T>(
    this IObservable<T> source, Predicate<T> isFirst)
{
    // The created observable must be typed IList<T> to match the return type
    return Observable.Create<IList<T>>(observer =>
    {
        var group = new List<T>();
        Action emitGroup = () =>
        {
            if (group.Count > 0)
            {
                // Emit a copy, because group is cleared and reused
                observer.OnNext(group.ToList());
                group.Clear();
            }
        };
        return source.Subscribe(
            item =>
            {
                if (isFirst(item))
                {
                    emitGroup();
                }
                group.Add(item);
            },
            observer.OnError,
            () =>
            {
                emitGroup();
                observer.OnCompleted();
            });
    });
}
The implementation of Packets is then just:
static IObservable<string> Packets(IObservable<char> source)
{
    return source
        .GroupSequential(char.IsLetter)
        .Select(x => new string(x.ToArray()));
}
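To show when packets actually come out of this letter-delimited approach, here is a rough sketch that pushes characters by hand through a Subject<char>. It assumes the System.Reactive package is referenced. GroupSequentialDemo and Run are hypothetical names, and the class carries its own compact copy of GroupSequential (with Observable.Create typed as IList<T>) purely so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupSequentialDemo
{
    // Compact copy of the GroupSequential idea, included only for self-containment.
    public static IObservable<IList<T>> GroupSequential<T>(
        this IObservable<T> source, Predicate<T> isFirst)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var group = new List<T>();
            Action emitGroup = () =>
            {
                if (group.Count > 0)
                {
                    observer.OnNext(group.ToList());
                    group.Clear();
                }
            };
            return source.Subscribe(
                item =>
                {
                    if (isFirst(item)) emitGroup();
                    group.Add(item);
                },
                observer.OnError,
                () => { emitGroup(); observer.OnCompleted(); });
        });
    }

    // Feeds characters one at a time and records how many packets have arrived.
    public static List<string> Run()
    {
        var log = new List<string>();
        var received = new List<string>();
        var chars = new Subject<char>();
        chars.GroupSequential(char.IsLetter)
             .Select(g => new string(g.ToArray()))
             .Subscribe(p => received.Add(p));

        foreach (var c in "A425") chars.OnNext(c);
        log.Add("after A425: " + received.Count);   // still buffered

        chars.OnNext('B');
        log.Add("after B: " + received.Count);      // the letter B flushed A425

        chars.OnNext('9');
        chars.OnNext('0');
        chars.OnCompleted();                        // completion flushes B90
        log.Add("final: " + string.Join(",", received));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Run reports 0 packets after A425, 1 after the B arrives, and A425,B90 once the source completes. If the instrument pauses between packets, the last packet sits in the buffer until the next letter shows up, which may or may not matter for your application.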