Using Rx Framework for async calls that use a void callback pattern

Posted 2019-01-26 22:21

I've seen tons of examples on how to use the Observable.FromAsyncPattern() in the Rx Framework to simplify Async calls, but I'm using an interface that doesn't use the standard Async pattern of IAsyncResult BeginXXX/EndXXX(IAsyncResult), so this doesn't work for me.

The library I'm working with exposes async functions with a callback pattern:

void GetAllObjects(Action<List<Object>> callback)

In an ideal world I'd like to turn this:

var isLoadingUsers = true;
var isLoadingSystems = true;
var isLoadingCustomers = true;
var isLoadingRules = true;

mClient.GetAllUsers(UsersCallback);
mClient.GetAllCustomers(CustomersCallback);
mClient.GetAllRules(RulesCallback);

// set the IsLoadingXXX variables to false in callbacks
// once all are false

mClient.GetAllSystems(SystemsCallback);

into something like this:

var o = Observable.ForkJoin(
                     Observable.Start(GetAllUsers()),
                     Observable.Start(GetAllCustomers()),
                     Observable.Start(GetAllRules())
                    ).Finally(() => GetAllSystems);

How would one go about turning that pattern into something that returns an IObservable?

3 Answers
一纸荒年 Trace。
Reply #2 · 2019-01-26 22:41
Func<IObservable<TRet>> FromListCallbackPattern<TRet>(Action<Action<List<TRet>>> function)
{
    return () => {
        // We use a ReplaySubject so that if people subscribe *after* the
        // real method finishes, they'll still get all the items
        var ret = new ReplaySubject<TRet>();

        function((list) => {
            // We're going to "rebroadcast" the list onto the Subject
            // This isn't the most Rx'iest way to do this, but it is the most
            // comprehensible :)
            foreach(var v in list) {
                ret.OnNext(v);
            }
            ret.OnCompleted();
        });

        return ret;
    };
}

Now, you can do something like:

var getAllUsers = FromListCallbackPattern(mClient.GetAllUsers);
getAllUsers().Subscribe(x => /* ... */);
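To see why the ReplaySubject matters, here is a minimal sketch. It assumes a hypothetical FakeClient whose GetAllUsers invokes its callback synchronously; the client, its data, and the Demo class are made up for illustration and are not part of the library in the question. The subscription happens only after the callback has already fired, yet the subscriber should still receive every item because the subject buffers them.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical stand-in for the real client; it calls back immediately.
public class FakeClient
{
    public void GetAllUsers(Action<List<string>> callback)
    {
        callback(new List<string> { "alice", "bob" });
    }
}

public static class Demo
{
    public static Func<IObservable<TRet>> FromListCallbackPattern<TRet>(
        Action<Action<List<TRet>>> function)
    {
        return () =>
        {
            // Buffers every item so late subscribers still see them.
            var ret = new ReplaySubject<TRet>();
            function(list =>
            {
                foreach (var v in list) ret.OnNext(v);
                ret.OnCompleted();
            });
            return ret;
        };
    }

    // Subscribes after the callback has run and collects what is replayed.
    public static List<string> Collect()
    {
        var client = new FakeClient();
        // The type argument is given explicitly; it is not inferred here.
        var getAllUsers = FromListCallbackPattern<string>(client.GetAllUsers);

        var seen = new List<string>();
        getAllUsers().Subscribe(x => seen.Add(x));
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Collect()));
    }
}
```

With a plain Subject instead of a ReplaySubject, a subscriber that arrives after a synchronous callback would see nothing, which is the case the comment in the answer is guarding against.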
叛逆
Reply #3 · 2019-01-26 22:47

Try Observable.Create(), perhaps something like this:

public IObservable<Object> ObserveAllObjects()
{
    return Observable.Create<Object>(
        observer =>
            () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o))));
}
时光不老,我们不散
Reply #4 · 2019-01-26 22:51

I like Observable.Create for this, but @dahlbyk's answer is incorrect (it misses completion and performs the action in the unsubscribe handler). It should be something like this:

    IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable
            .Create<List<T>>(observer =>
            {
                var subscribed = true;
                listGetter(list =>
                {
                    if (!subscribed) return;
                    observer.OnNext(list);
                    observer.OnCompleted();
                });
                return () =>
                {
                    subscribed = false;
                };
            });
    }

Also, since the originating API returns the entire list at once, I don't see a reason to transform it into an observable of individual items too early. Let the resulting observable return a list as well; if a caller needs to flatten it, they can use .SelectMany.
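As a rough sketch of how a caller might use this wrapper, the example below flattens one list with SelectMany and uses Zip to wait for two lists before running a follow-up step, which is close to what the question asks for. The GetAllUsers and GetAllRules methods here are hypothetical synchronous stand-ins for mClient, and Zip is swapped in for the ForkJoin call from the question; treat it as an illustration under those assumptions rather than a definitive implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CallbackDemo
{
    // Same wrapper as in the answer: emits one list, then completes.
    public static IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable.Create<List<T>>(observer =>
        {
            var subscribed = true;
            listGetter(list =>
            {
                if (!subscribed) return;
                observer.OnNext(list);
                observer.OnCompleted();
            });
            return () => { subscribed = false; };
        });
    }

    // Hypothetical callback-style getters standing in for mClient.
    static void GetAllUsers(Action<List<string>> cb)
    {
        cb(new List<string> { "u1", "u2" });
    }

    static void GetAllRules(Action<List<int>> cb)
    {
        cb(new List<int> { 1, 2, 3 });
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var users = FromListCallbackPattern<string>(GetAllUsers);
        var rules = FromListCallbackPattern<int>(GetAllRules);

        // Flatten the single list into individual items.
        users.SelectMany(list => list)
             .Subscribe(u => log.Add("user " + u));

        // Pair the two lists once both have arrived, then run the follow-up.
        users.Zip(rules, (u, r) => u.Count + r.Count)
             .Subscribe(total => log.Add("loaded " + total));

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join("; ", Run()));
    }
}
```

Because Observable.Create produces a cold observable, each Subscribe call above invokes the underlying getter again; callers who want a single shared call would need to publish or cache the result.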
