Async with huge data streams

Posted 2019-01-30 23:51

We use IEnumerable to return huge datasets from the database:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}

Now we want to do the same with async methods. However, there is no async counterpart to IEnumerable, so we have to collect the data into a list until the entire dataset is loaded:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

This consumes a huge amount of resources on the server, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid holding all the data in memory while processing.

5 Answers
小情绪 Triste *
#2 · 2019-01-31 00:23

The easiest option is TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel if you wish) and send the items into it one by one asynchronously.
I would also suggest setting a BoundedCapacity, which throttles the reader when the processing can't keep up with the rate at which rows are read from the database.

var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

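using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
        await block.SendAsync(item);
    }
}

// Signal that no more items are coming, then wait for processing to finish
block.Complete();
await block.Completion;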

You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.

我想做一个坏孩纸
#3 · 2019-01-31 00:34

This consumes a huge amount of resources on the server, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid holding all the data in memory while processing.

If you don't want to send all data to the client at once, you may consider using Reactive Extensions (Rx) (on the client) and SignalR (on both client and server) to handle this.

SignalR would let you send data to the client asynchronously. Rx would let you apply LINQ to the asynchronous sequence of data items as they arrive on the client. This would, however, change the whole code model of your client-server application.

Example (a blog post by Samuel Jack):

Related question (if not a duplicate):

够拽才男人
#4 · 2019-01-31 00:38

I think Rx is definitely the way to go in this scenario, given an observable sequence is the formal dual to an enumerable one.

As mentioned in a previous answer you could re-write your sequence as an observable from scratch, but there are also a couple of ways to keep writing your iterator blocks but then just unwind them asynchronously.

1) Just convert the enumerable to an observable like so:

using System.Reactive.Linq;
using System.Reactive.Concurrency;

var enumerable = Enumerable.Range(0, 10); // Range takes (start, count)
var observable = enumerable.ToObservable();
var subscription = observable.Subscribe(x => Console.WriteLine(x));

This will make your enumerable behave like an observable by pushing its notifications into any downstream observers. In this case, when Subscribe is called, it will synchronously block until all data has been processed. If you want it to be fully asynchronous, you can set it to a different thread, by using:

var observable = enumerable.ToObservable().SubscribeOn(NewThreadScheduler.Default);

Now the unwinding of the enumerable will be done in a new thread and the subscribe method will return immediately.

2) Unwind the enumerable using another asynchronous event source:

var enumerable = Enumerable.Range(0, 10); // Range takes (start, count)
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                           .Zip(enumerable, (t, x) => x);
var subscription = observable.Subscribe(x => Console.WriteLine(x));

In this case, I've set up a timer that fires every second, and each time it fires it moves the iterator forward. The timer could easily be replaced by any other event source to control exactly when the iterator moves forward.

I find myself enjoying the syntax and semantics of iterator blocks (e.g. what happens with try/finally blocks and dispose), so I use these designs occasionally even when designing asynchronous operations.

狗以群分
#5 · 2019-01-31 00:43

Most of the time when dealing with async/await methods, I find it easier to turn the problem around, and use functions (Func<...>) or actions (Action<...>) instead of ad-hoc code, especially with IEnumerable and yield.

In other words, when I think "async", I try to forget the old concept of function "return value" that is otherwise so obvious and that we are so familiar with.

For example, if you change your initial sync code into this (processor is the code that will ultimately do whatever you do with one Data item):

public void Read(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            processor(item);
        }
    }
}

Then, the async version is quite simple to write:

public async Task ReadAsync(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // note you can use connection.OpenAsync()
        // and command.ExecuteReaderAsync() here
        while(await reader.ReadAsync())
        {
            // ...
            processor(item);
        }
    }
}
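
If the per-item work is itself asynchronous (for example, writing each item to another store), the same idea can take a Func<Data, Task> instead of an Action<Data> and await it. The sketch below is only an illustration under stated assumptions: the Data class with an Id property, the CallbackReader name, the rowCount parameter, and the in-memory loop standing in for the SqlDataReader are all hypothetical, chosen so the example runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical item type standing in for the question's Data class.
public class Data
{
    public int Id { get; set; }
}

public static class CallbackReader
{
    // Hypothetical variant of ReadAsync: the callback returns a Task, so the
    // loop waits for each item to be processed before producing the next one.
    public static async Task ReadAsync(int rowCount, Func<Data, Task> processor)
    {
        // A real implementation would open the connection and loop on
        // reader.ReadAsync(); this loop only simulates that source.
        for (var i = 0; i < rowCount; i++)
        {
            await Task.Yield(); // stand-in for await reader.ReadAsync()
            var item = new Data { Id = i };
            await processor(item).ConfigureAwait(false);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var seen = new List<int>();
        await CallbackReader.ReadAsync(3, async data =>
        {
            await Task.Delay(10); // simulated asynchronous processing
            seen.Add(data.Id);
        });
        Console.WriteLine(string.Join(",", seen));
    }
}
```

Because each callback is awaited before the next item is produced, only one item is in flight at a time, and a slow processor naturally slows the reader down.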

If you can change your code this way, you don't need any extension or extra library or IAsyncEnumerable stuff.
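
For comparison, this is roughly what the IAsyncEnumerable approach looks like on C# 8 or later, where async iterators and await foreach are part of the language. This is a sketch, not code from the question: AsyncStreamSketch, rowCount, and the simulated loop are hypothetical, and a real version would loop on reader.ReadAsync() inside the using block instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamSketch
{
    // Async iterator (C# 8+): items are yielded one at a time, so the caller
    // never needs the whole dataset in memory.
    public static async IAsyncEnumerable<int> ReadAsync(int rowCount)
    {
        for (var i = 0; i < rowCount; i++)
        {
            await Task.Yield(); // stand-in for await reader.ReadAsync()
            yield return i;
        }
    }

    // Consumes the stream with await foreach and adds up the items.
    public static async Task<int> SumAsync(int rowCount)
    {
        var sum = 0;
        await foreach (var item in ReadAsync(rowCount))
        {
            sum += item;
        }
        return sum;
    }
}
```

The iterator keeps the yield return shape of the original synchronous Read method, which is the main reason to prefer it when the language version allows.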

走好不送
#6 · 2019-01-31 00:44

As some of the other posters have mentioned, this can be implemented with Rx. With Rx the function returns an IObservable<Data> that can be subscribed to, and it pushes data to the subscriber as it becomes available. IObservable also supports LINQ and adds some extension methods of its own.

Update

I added a couple of generic helper methods to make the reader code reusable and to add support for cancellation.

public static class ObservableEx
{
    public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc)
    {
        return CreateFromSqlCommand(connectionString, command, readDataFunc, CancellationToken.None);
    }

    public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc, CancellationToken cancellationToken)
    {
        // This Create overload supplies a token that is cancelled when the subscription is disposed
        return Observable.Create<T>(
            async (o, disposeToken) =>
            {
                try
                {
                    // Stop reading when either the caller's token or the subscription is cancelled
                    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, disposeToken))
                    using (var conn = new SqlConnection(connectionString))
                    using (var cmd = new SqlCommand(command, conn))
                    {
                        await conn.OpenAsync(cts.Token);
                        using (var reader = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection, cts.Token))
                        {
                            while (await reader.ReadAsync(cts.Token))
                            {
                                var data = await readDataFunc(reader);
                                o.OnNext(data);
                            }
                        }

                        o.OnCompleted();
                    }
                }
                catch (Exception ex)
                {
                    o.OnError(ex);
                }
            });
    }
}

The implementation of ReadData is now greatly simplified.

private static IObservable<Data> ReadData()
{
    return ObservableEx.CreateFromSqlCommand(connectionString, "select * from Data", async r =>
    {
        return await Task.FromResult(new Data()); // sample code to read from reader.
    });
}

Usage

You can subscribe to the Observable by giving it an IObserver, but there are also overloads that take lambdas. As data becomes available, the OnNext callback is called. If there is an exception, the OnError callback is called. Finally, when there is no more data, the OnCompleted callback is called.

If you want to cancel the observable, simply dispose of the subscription.

void Main()
{
    // This is an asynchronous call; it returns straight away
    var subscription = ReadData()
        .Skip(5)                        // Skip the first 5 entries (LINQ operator)
        .Delay(TimeSpan.FromSeconds(1)) // Rx operator to delay the sequence by 1 second
        .Subscribe(
            x =>
            {
                // Callback when a new Data is read
                // do something with x of type Data
            },
            e =>
            {
                // Optional callback for when an error occurs
            },
            () =>
            {
                // Optional callback for when the sequence is complete
            });

    // Keep the subscription alive until a key is pressed
    Console.ReadKey();

    // Dispose the subscription when finished; this cancels it if it is still running
    subscription.Dispose();
}