I have code which streams data down from SQL and writes it to a different store. The code is approximately this:
    using (var cmd = new SqlCommand("select * from MyTable", connection))
    {
        using (var reader = await cmd.ExecuteReaderAsync())
        {
            var list = new List<MyData>();
            while (await reader.ReadAsync())
            {
                var row = GetRow(reader);
                list.Add(row);
                if (list.Count == BatchSize)
                {
                    await WriteDataAsync(list);
                    list.Clear();
                }
            }
            if (list.Count > 0)
            {
                await WriteDataAsync(list);
            }
        }
    }
I would like to use Reactive extensions for this purpose instead. Ideally the code would look like this:
    await StreamDataFromSql()
        .Buffer(BatchSize)
        .ForEachAsync(async batch => await WriteDataAsync(batch));
However, it seems that the extension method ForEachAsync only accepts synchronous actions. Would it be possible to write an extension which would accept an async action?
Not directly.
Rx subscriptions are necessarily synchronous because Rx is a push-based system. When a data item arrives, it travels through your query until it hits the final subscription - which in this case is to execute an Action.
The await-able methods provided by Rx are awaiting the sequence itself - i.e., ForEachAsync is asynchronous in terms of the sequence (you are asynchronously waiting for the sequence to complete), but the subscription within ForEachAsync (the action taken for each element) must still be synchronous.
In order to do a sync-to-async transition in your data pipeline, you'll need to have a buffer. An Rx subscription can (synchronously) add to the buffer as a producer while an asynchronous consumer is retrieving items and processing them. So, you'd need a producer/consumer queue that supports both synchronous and asynchronous operations.
The various block types in TPL Dataflow can satisfy this need. Something like this should suffice:
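The answer's original snippet is not available here, so what follows is only a minimal sketch of the idea, assuming the System.Reactive package for Buffer and TPL Dataflow for ActionBlock and AsObserver. The MyData class, the in-memory StreamDataFromSql, and the delaying WriteDataAsync are hypothetical stand-ins so the example can run without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;             // Buffer, Range, Select (System.Reactive package)
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;  // ActionBlock, AsObserver

public class MyData { public int Id; }  // hypothetical row type

public static class DataflowDemo
{
    const int BatchSize = 3;
    public static readonly List<int> BatchSizes = new List<int>();

    // Hypothetical stand-in for the SQL-backed observable from the question.
    static IObservable<MyData> StreamDataFromSql() =>
        Observable.Range(1, 10).Select(i => new MyData { Id = i });

    // Hypothetical stand-in for the asynchronous write to the other store.
    static async Task WriteDataAsync(IList<MyData> batch)
    {
        await Task.Delay(10);
        BatchSizes.Add(batch.Count);
    }

    public static async Task Main()
    {
        // The block is the buffer: the Rx side adds batches synchronously,
        // and the block awaits WriteDataAsync for one batch at a time
        // (its default degree of parallelism is 1).
        var block = new ActionBlock<IList<MyData>>(batch => WriteDataAsync(batch));

        // AsObserver forwards OnNext to the block and completes the block
        // when the sequence completes.
        using (StreamDataFromSql().Buffer(BatchSize).Subscribe(block.AsObserver()))
        {
            await block.Completion;
        }

        Console.WriteLine(string.Join(",", BatchSizes)); // prints 3,3,3,1
    }
}
```

Awaiting block.Completion is what makes the whole pipeline awaitable: it finishes only after the last queued batch has been written.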
Note that there is no backpressure; as quickly as StreamDataFromSql can push data, it'll be buffered and stored in the incoming queue of the ActionBlock. Depending on the size and type of data, this can quickly use a lot of memory.
The correct thing to do is to use Reactive Extensions properly to get this done - so start from the point that you create the connection right up until you write your data.
Here's how:
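The snippet that belonged here is not available, so the following is a rough sketch of one way to keep the whole pipeline in Rx, not the answer's original code. It assumes the System.Reactive package. StreamRows is a hypothetical helper, and an in-memory DataTable reader stands in for the real connection, command, and ExecuteReader call so the example runs without a database. The writes are sequenced with Observable.FromAsync plus Concat, which is my substitution for passing an async lambda to Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Reactive.Linq;   // System.Reactive package
using System.Threading.Tasks;

public class MyData { public int Id; }  // hypothetical row type

public static class RxPipelineDemo
{
    const int BatchSize = 3;
    public static readonly List<int> WrittenBatchSizes = new List<int>();

    // Hypothetical helper: the observable owns the reader (Using disposes it
    // when the sequence ends) and emits one element per row. With a real
    // database, the connection and command could be wrapped in further
    // nested Observable.Using calls in the same way.
    static IObservable<T> StreamRows<T>(Func<DbDataReader> openReader, Func<IDataRecord, T> getRow) =>
        Observable.Using(openReader, reader =>
            Observable.While(
                () => reader.Read(),
                // Defer so getRow runs only after each successful Read().
                Observable.Defer(() => Observable.Return(getRow(reader)))));

    static MyData GetRow(IDataRecord record) => new MyData { Id = record.GetInt32(0) };

    // Hypothetical stand-in for the asynchronous write to the other store.
    static async Task WriteDataAsync(IList<MyData> batch)
    {
        await Task.Delay(10);
        WrittenBatchSizes.Add(batch.Count);
    }

    public static async Task Main()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        for (var i = 1; i <= 10; i++) table.Rows.Add(i);

        await StreamRows<MyData>(() => table.CreateDataReader(), GetRow)
            .Buffer(BatchSize)
            // One cold observable per batch; Concat subscribes to the next
            // only after the previous write has completed.
            .Select(batch => Observable.FromAsync(() => WriteDataAsync(batch)))
            .Concat()
            // Completes once the last write is done, even for an empty table.
            .LastOrDefaultAsync();

        Console.WriteLine(string.Join(",", WrittenBatchSizes)); // prints 3,3,3,1
    }
}
```

Concat queues the pending batches while a write is in flight, so this variant does not add backpressure either; a fast reader can still outrun a slow writer.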
I couldn't test the code, but it should work. This code assumes that WriteDataAsync can take an IList<MyData> too. If it doesn't, just drop in a .ToList().
Here is the source code for ForEachAsync and an article on the ToEnumerable and AsObservable method
We can make a wrapper around the ForEachAsync that will await a Task-returning function:
Example usage:
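The original snippets are not available, so this is a minimal sketch of such a wrapper together with a usage example, assuming the System.Reactive package. The name ForEachAwaitAsync is hypothetical, chosen to avoid clashing with Rx's own ForEachAsync overloads. It is built on ToEnumerable, which queues incoming items and blocks the consuming thread while it waits for the next one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;   // ToEnumerable, Buffer, Range (System.Reactive package)
using System.Threading.Tasks;

public static class ObservableExtensions
{
    // Hypothetical wrapper: awaits onNext for each element before taking the next.
    public static async Task ForEachAwaitAsync<T>(this IObservable<T> source, Func<T, Task> onNext)
    {
        // ToEnumerable subscribes to the source and buffers what it pushes;
        // MoveNext blocks until an item arrives or the sequence ends.
        foreach (var item in source.ToEnumerable())
        {
            await onNext(item);
        }
    }
}

public static class ForEachDemo
{
    public static readonly List<int> BatchSizes = new List<int>();

    public static async Task Main()
    {
        await Observable.Range(0, 10)
            .Buffer(3)
            .ForEachAwaitAsync(async batch =>
            {
                await Task.Delay(10);        // stands in for WriteDataAsync(batch)
                BatchSizes.Add(batch.Count);
            });

        Console.WriteLine(string.Join(",", BatchSizes)); // prints 3,3,3,1
    }
}
```

Because the enumerator blocks a thread while waiting and buffers without a bound, this is better suited to short or fast sequences than to long-running streams.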