We use IEnumerable to return huge datasets from the database:
```csharp
public IEnumerable<Data> Read(...)
{
    using (var connection = new SqlConnection(...))
    {
        // ...
        while (reader.Read())
        {
            // ...
            yield return item;
        }
    }
}
```
Now we want to use async methods to do the same. However, there is no async equivalent of IEnumerable, so we have to collect the data into a list until the entire dataset is loaded:
```csharp
public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using (var connection = new SqlConnection(...))
    {
        // ...
        while (await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}
```
This consumes a huge amount of resources on the server, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid storing all the data in memory while processing.
The easiest option is using TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel if you wish) and "send" the items into it one by one asynchronously. I would also suggest setting a BoundedCapacity, which will throttle the reader reading from the database when the processing can't keep up. You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.
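A minimal sketch of the ActionBlock approach, assuming the rows come from any DbDataReader (SqlDataReader derives from it) and that TPL Dataflow is available (depending on the target framework, it may need the System.Threading.Tasks.Dataflow NuGet package). DataflowReader, ReadIntoBlockAsync, map and process are hypothetical names, and the capacity and parallelism defaults are arbitrary:

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowReader
{
    // Hypothetical helper: reads rows one at a time and sends each mapped item
    // into a bounded ActionBlock. `map` converts the current row; `process` handles one item.
    // Returns the number of items handed to the block.
    public static async Task<int> ReadIntoBlockAsync<T>(
        DbDataReader reader,
        Func<IDataRecord, T> map,
        Func<T, Task> process,
        int boundedCapacity = 100,
        int maxDegreeOfParallelism = 4,
        CancellationToken cancellationToken = default)
    {
        var block = new ActionBlock<T>(process, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,               // at most this many items buffered
            MaxDegreeOfParallelism = maxDegreeOfParallelism, // items processed concurrently
            CancellationToken = cancellationToken
        });

        int sent = 0;
        while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // SendAsync completes only after the block accepts the item, so a full
            // block pauses this loop instead of letting rows pile up in memory.
            if (!await block.SendAsync(map(reader), cancellationToken).ConfigureAwait(false))
                break; // the block faulted or was cancelled and declined the item
            sent++;
        }

        block.Complete();                             // signal that no more input is coming
        await block.Completion.ConfigureAwait(false); // wait for in-flight items; rethrows processing errors
        return sent;
    }
}
```

Because SendAsync waits while the block is at capacity, BoundedCapacity is what turns a slow processor into back-pressure on the read loop.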
If you don't want to send all the data to the client at once, you may consider using Reactive Extensions (Rx) (on the client) and SignalR (on both client and server) to handle this. SignalR would allow you to send data to the client asynchronously. Rx would allow you to apply LINQ to the asynchronous sequence of data items as they arrive on the client. This would, however, change the whole code model of your client-server application.
Example (a blog post by Samuel Jack):
Related question (if not a duplicate):
I think Rx is definitely the way to go in this scenario, given that an observable sequence is the formal dual of an enumerable one.
As mentioned in a previous answer, you could rewrite your sequence as an observable from scratch, but there are also a couple of ways to keep writing your iterator blocks and just unwind them asynchronously.
1) Just convert the enumerable to an observable like so:
This will make your enumerable behave like an observable by pushing its notifications to any downstream observers. In this case, when Subscribe is called, it will block synchronously until all the data has been processed. If you want it to be fully asynchronous, you can run it on a different thread by using:
Now the unwinding of the enumerable will be done in a new thread and the subscribe method will return immediately.
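With Rx itself, option 1 is typically a call to the ToObservable extension method, optionally passing a scheduler such as NewThreadScheduler.Default to move the iteration off the subscribing thread. To show what that conversion does without depending on the System.Reactive package, here is a hypothetical hand-rolled version built only on the BCL IObservable<T> and IObserver<T> interfaces. EnumerableObservable and DelegateObserver are illustrative names, and the background option uses a thread-pool task rather than a dedicated new thread:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, minimal stand-in for Rx's ToObservable(): pushes each item of an
// IEnumerable<T> to the observer, either on the subscribing thread or in the background.
public sealed class EnumerableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _source;
    private readonly bool _runInBackground;

    public EnumerableObservable(IEnumerable<T> source, bool runInBackground)
    {
        _source = source;
        _runInBackground = runInBackground;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription();

        void Pump()
        {
            // Disposing the enumerator runs any finally blocks in the source iterator.
            using (var e = _source.GetEnumerator())
            {
                while (!subscription.IsDisposed)
                {
                    bool hasItem;
                    T item = default;
                    try
                    {
                        hasItem = e.MoveNext();
                        if (hasItem) item = e.Current;
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex); // errors thrown by the source sequence
                        return;
                    }
                    if (!hasItem) { observer.OnCompleted(); return; }
                    observer.OnNext(item);
                }
            }
        }

        if (_runInBackground)
            _ = Task.Run(() => Pump()); // Subscribe returns immediately; items arrive on a pool thread
        else
            Pump();                     // Subscribe blocks until the whole sequence has been pushed
        return subscription;
    }

    // Unsubscribing only sets a flag that the pump loop checks before each item.
    private sealed class Subscription : IDisposable
    {
        private volatile bool _disposed;
        public bool IsDisposed => _disposed;
        public void Dispose() => _disposed = true;
    }
}

// Hypothetical helper so lambdas can be used as an observer without Rx.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}
```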
2) Unwind the enumerable using another asynchronous event source:
In this case, I've set up a timer to fire every second, and whenever it fires it moves the iterator forward. The timer could easily be replaced by any event source to control exactly when the iterator moves forward.
I find myself enjoying the syntax and semantics of iterator blocks (e.g. what happens with try/finally blocks and disposal), so I occasionally use these designs even when building asynchronous operations.
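A minimal sketch of option 2, assuming a hypothetical helper, TriggeredIterator.DriveAsync, that advances an ordinary iterator block once each time an awaited trigger completes. Passing () => Task.Delay(1000) approximates the one-second timer; any other asynchronous event source can be plugged in instead. Countdown is a made-up iterator showing that its finally block still runs, because the driver disposes the enumerator:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TriggeredIterator
{
    // Hypothetical helper: moves `source` forward one item each time the task returned
    // by `waitForTrigger` completes. The trigger that finds no more items ends the loop.
    public static async Task DriveAsync<T>(IEnumerable<T> source, Func<Task> waitForTrigger, Action<T> onItem)
    {
        // `using` disposes the enumerator, which runs the iterator's finally blocks
        // whether the loop ends normally or onItem throws.
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                await waitForTrigger().ConfigureAwait(false);
                if (!e.MoveNext())
                    break;
                onItem(e.Current);
            }
        }
    }

    // Made-up example iterator: the finally block runs when the enumerator is disposed.
    public static IEnumerable<int> Countdown(int from, Action onCleanup)
    {
        try
        {
            for (int i = from; i > 0; i--)
                yield return i;
        }
        finally
        {
            onCleanup();
        }
    }
}
```

Keeping the trigger as a Func<Task> is what makes the timer replaceable: the iterator itself never knows what drives it.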
Most of the time when dealing with async/await methods, I find it easier to turn the problem around and use functions (Func<...>) or actions (Action<...>) instead of ad-hoc code, especially with IEnumerable and yield. In other words, when I think "async", I try to forget the old concept of a function "return value" that is otherwise so obvious and that we are so familiar with.
For example, if you change your initial sync code into this (processor is the code that will ultimately do whatever you do with one Data item):
Then, the async version is quite simple to write:
If you can change your code this way, you don't need any extension or extra library or IAsyncEnumerable stuff.
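A minimal sketch of the callback shape described in this answer, assuming the reader is passed in (rather than created from a SqlConnection inside the method) and that a hypothetical map delegate turns the current row into an item. CallbackReader is an illustrative name, and the exact delegate types are assumptions:

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

public static class CallbackReader
{
    // Synchronous version: instead of yield-returning each item, hand it to `processor`.
    public static void Read<T>(DbDataReader reader, Func<IDataRecord, T> map, Action<T> processor)
    {
        while (reader.Read())
            processor(map(reader));
    }

    // Async version: same shape, but it awaits both the reader and the processor,
    // so the next row is not read until the current item has been handled.
    public static async Task ReadAsync<T>(DbDataReader reader, Func<IDataRecord, T> map, Func<T, Task> processor)
    {
        while (await reader.ReadAsync().ConfigureAwait(false))
            await processor(map(reader)).ConfigureAwait(false);
    }
}
```

Neither version builds a list of all rows; each item goes to processor as soon as it is read.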
As some of the other posters have mentioned, this can be implemented with Rx. With Rx, the function will return an IObservable<Data>, which can be subscribed to and which pushes data to the subscriber as it becomes available. IObservable also supports LINQ, and Rx adds some extension methods of its own.
Update
I added a couple of generic helper methods to make the reader logic reusable, as well as support for cancellation.
The implementation of ReadData is now greatly simplified.
Usage
You can subscribe to the observable by giving it an IObserver, but there are also overloads that take lambdas. As data becomes available, the OnNext callback gets called. If there is an exception, the OnError callback gets called. Finally, if there is no more data, the OnCompleted callback gets called. If you want to cancel the observable, simply dispose of the subscription.
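A BCL-only sketch of a function that returns an observable over database rows with cancellation, assuming a hypothetical openReader delegate that opens the reader and a map delegate for the current row. With System.Reactive, Observable.Create (for example, its overload that takes an async function with a CancellationToken) would normally replace the custom ReaderObservable class; the class, DelegateObserver and the delegates are illustrative names, not this answer's original helpers:

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: each Subscribe starts its own async read loop;
// disposing the returned subscription cancels that loop.
public sealed class ReaderObservable<T> : IObservable<T>
{
    private readonly Func<CancellationToken, Task<DbDataReader>> _openReader;
    private readonly Func<IDataRecord, T> _map;

    public ReaderObservable(Func<CancellationToken, Task<DbDataReader>> openReader, Func<IDataRecord, T> map)
    {
        _openReader = openReader;
        _map = map;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var cts = new CancellationTokenSource();
        _ = PumpAsync(observer, cts.Token); // runs until completion, error or cancellation
        return new CancelOnDispose(cts);
    }

    private async Task PumpAsync(IObserver<T> observer, CancellationToken token)
    {
        try
        {
            using (var reader = await _openReader(token).ConfigureAwait(false))
            {
                while (await reader.ReadAsync(token).ConfigureAwait(false))
                {
                    token.ThrowIfCancellationRequested();
                    observer.OnNext(_map(reader));
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            return; // unsubscribed: stop without notifying the observer
        }
        catch (Exception ex)
        {
            observer.OnError(ex); // in this sketch, also catches exceptions thrown by OnNext
            return;
        }
        if (!token.IsCancellationRequested)
            observer.OnCompleted();
    }

    // Disposing the subscription requests cancellation of the read loop.
    private sealed class CancelOnDispose : IDisposable
    {
        private readonly CancellationTokenSource _cts;
        public CancelOnDispose(CancellationTokenSource cts) => _cts = cts;
        public void Dispose() => _cts.Cancel();
    }
}

// Hypothetical helper so lambdas can be used as an observer without Rx.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}
```

Disposing the subscription cancels the token, and the read loop stops quietly at its next cancellation check instead of calling OnError or OnCompleted.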