I'm learning about async/await patterns in C#. Currently I'm trying to solve a problem like this:
There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.
The device only has a ReadAsync()
method to report a single packet at a time.
I need to buffer the packets and write them in the order they are generated to the file, only once a second.
Write operation should fail if the write process is not finished in time when the next batch of packets is ready to be written.
So far I have written something like below. It works but I am not sure if this is the best way to solve the problem. Any comments or suggestion? What is the best practice to approach this kind of Producer/Consumer problem where the consumer needs to aggregate the data received from the producer?
static async Task TestLogger(Device device, int seconds)
{
const int bufLength = 1000;
bool firstIteration = true;
Task writerTask = null;
using (var writer = new StreamWriter("test.log")))
{
do
{
var buffer = new byte[bufLength][];
for (int i = 0; i < bufLength; i++)
{
buffer[i] = await device.ReadAsync();
}
if (!firstIteration)
{
if (!writerTask.IsCompleted)
throw new Exception("Write Time Out!");
}
writerTask = Task.Run(() =>
{
foreach (var b in buffer)
writer.WriteLine(ToHexString(b));
});
firstIteration = false;
} while (--seconds > 0);
}
}
You could use the following idea, provided the criteria for flush is the number of packets (up to 1000). I did not test it. It makes use of Stephen Cleary's AsyncProducerConsumerQueue<T>
featured in this question.
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;
// producer
async Task ReceiveAsync(CancellationToken token)
{
while (true)
{
var list = new List<byte>();
while (true)
{
token.ThrowIfCancellationRequested(token);
var packet = await _device.ReadAsync(token);
list.Add(packet);
if (list.Count == 1000)
break;
}
// push next batch
await _queue.EnqueueAsync(list.ToArray(), token);
}
}
// consumer
async Task LogAsync(CancellationToken token)
{
Task previousFlush = Task.FromResult(0);
CancellationTokenSource cts = null;
while (true)
{
token.ThrowIfCancellationRequested(token);
// get next batch
var nextBatch = await _queue.DequeueAsync(token);
if (!previousFlush.IsCompleted)
{
cts.Cancel(); // cancel the previous flush if not ready
throw new Exception("failed to flush on time.");
}
await previousFlush; // it's completed, observe for any errors
// start flushing
cts = CancellationTokenSource.CreateLinkedTokenSource(token);
previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
}
}
If you don't want to fail the logger but rather prefer to cancel the flush and proceed to the next batch, you can do so with a minimal change to this code.
In response to @l3arnon comment:
- A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and
tested than .Net's TPL Dataflow. 4. You await previousFlush for errors
just after you throw an exception which makes that line redundant.
etc. In short: I think the possible added value doesn't justify this
very complicated solution.
- "A packet is not a byte, it's byte[]" - A packet is a byte, this is obvious from the OP's code:
buffer[i] = await device.ReadAsync()
. Then, a batch of packets is byte[]
.
"You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync
which natively accepts a cancellation token, instead of WriteLineAsync
which doesn't allow cancellation. It's trivial to use ToHexString
with Stream.WriteAsync
and still take advantage of cancellation support:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) +
Environment.NewLine);
_stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
"AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is a determined fact. However, if the OP is concerned about it, he can use regular BlockingCollection
, which doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. As opposed to this, your TPL Dataflow version carries one redundant CPU and lock intensive operation: moving data from producer pipeline to writer pipleline with logAction.Post(packet)
, byte by byte. My code doesn't do that.
"You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps, you're missing this point: previousFlush.IsCompleted
can be true
when previousFlush.IsFaulted
or previousFlush.IsCancelled
is also true
. So, await previousFlush
is relevant there to observe any errors on the completed tasks (e.g., a write failure), which otherwise will be lost.
A better approach IMHO would be to have 2 "workers", a producer and a consumer. The producer reads from the device and simply fills a list. The consumer "wakes up" every second and writes the batch to a file.
List<byte[]> _data = new List<byte[]>();
async Task Producer(Device device)
{
while (true)
{
_data.Add(await device.ReadAsync());
}
}
async Task Consumer(Device device)
{
using (var writer = new StreamWriter("test.log")))
{
while (true)
{
Stopwatch watch = Stopwatch.StartNew();
var batch = _data;
_data = new List<byte[]>();
foreach (var packet in batch)
{
writer.WriteLine(ToHexString(packet));
if (watch.Elapsed >= TimeSpan.FromSeconds(1))
{
throw new Exception("Write Time Out!");
}
}
await Task.Delay(TimeSpan.FromSeconds(1) - watch.Elapsed);
}
}
}
The while (true)
should probably be replaced by a system wide cancellation token.
Assuming you can batch by amount (1000) instead of time (1 second), the simplest solution is probably using TPL Dataflow
's BatchBlock
which automatically batches a flow of items by size:
async Task TestLogger(Device device, int seconds)
{
var writer = new StreamWriter("test.log");
var batch = new BatchBlock<byte[]>(1000);
var logAction = new ActionBlock<byte[]>(
packet =>
{
return writer.WriteLineAsync(ToHexString(packet));
});
ActionBlock<byte[]> transferAction;
transferAction = new ActionBlock<byte[][]>(
bytes =>
{
foreach (var packet in bytes)
{
if (transferAction.InputCount > 0)
{
return; // or throw new Exception("Write Time Out!");
}
logAction.Post(packet);
}
}
);
batch.LinkTo(transferAction);
logAction.Completion.ContinueWith(_ => writer.Dispose());
while (true)
{
batch.Post(await device.ReadAsync());
}
}