Asynchronous iterator Task>

2019-01-09 08:37发布

问题:

I’m trying to implement an asynchronous function that returns an iterator. The idea is the following:

    private async Task<IEnumerable<char>> TestAsync(string testString)
    {
        foreach (char c in testString.ToCharArray())
        {
            // do other work
            yield return c;
        }
    }

However, there is an error message that the function cannot be an iterator block because Task<IEnumerable<char>> is not an iterator interface type. Is there a solution?

回答1:

It sounds like what you may really be looking for is something like IObservable<T>, which is sort of like a push-based asynchronous IEnumerable<T>. See Reactive Extensions, a.k.a. Rx from Microsoft Open Technologies (code licensed under Apache-2.0) (no affiliation) for a huge host of methods that work with IObservable<T> to make it work like LINQ-to-Objects and more.

The problem with IEnumerable<T> is that there's nothing that really makes the enumeration itself asynchronous. If you don't want to add a dependency on Rx (which is really what makes IObservable<T> shine), this alternative might work for you:

public async Task<IEnumerable<char>> TestAsync(string testString)
{
    return GetChars(testString);
}

private static IEnumerable<char> GetChars(string testString)
{
    foreach (char c in testString.ToCharArray())
    {
        // do other work
        yield return c;
    }
}

though I'd like to point out that without knowing what's actually being done asynchronously, there may be a much better way to accomplish your goals. None of the code you posted will actually do anything asynchronously, and I don't really know if anything in // do other work is asynchronous (in which case, this isn't a solution to your underlying problem though it will make your code compile).



回答2:

To elaborate on previous answers, you can use Reactive Extensions' Observable.Create<TResult> family of methods to do exactly what you want.

Here's an example:

var observable = Observable.Create<char>(async (observer, cancel) =>
{
    for (var i = 0; !cancel.IsCancellationRequested && i < 100; i++)
    {
        observer.OnNext(await GetCharAsync());
    }
});

Here's how you can use it in LINQPad, for example:

// Create a disposable that keeps the query running.
// This is necessary, since the observable is 100% async.
var end = Util.KeepRunning();

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());


回答3:

A more "batteries-included" implementation of this kind of thing, including language support, is currently (at the time of writing) being considered for inclusion in C# 8.0.

As I understand it, this is subject to change at any time.