Buffer data from database cursor while keeping UI responsive

Posted 2019-02-27 20:23

Question:

I have a populated database catalog and a cursor that can be used to retrieve its objects. The catalog can be very large, so I'd like to use ReactiveUI to buffer the data in while keeping the UI data-bound and responsive. I followed the steps here to turn my IEnumerable into an IObservable:

public class CatalogService
{
   ...

   public IObservable<DbObject> DataSource
   {
        get
        {
            return Observable.Create<DbObject>(obs =>
            {
                var cursor = Database.Instance.GetAllObjects();
                var status = cursor.MoveToFirst();

                while (status == DbStatus.OK)
                {
                    var dbObject= Db.Create(cursor);
                    obs.OnNext(dbObject);

                    status = cursor.MoveToNext();
                }

                obs.OnCompleted();

                return Disposable.Empty;
            });
        }
    }
}

In my view class (specifically, the Loaded event), I am subscribing to the data-source and using the buffer method in hopes of keeping the UI responsive.

    public ObservableCollection<DbObject> DbObjects { get; set; }

    private async void OnLoad(object sender, RoutedEventArgs e)
    {
        var observableData = CatalogService.Instance.DataSource.Publish();
        var chunked = observableData.Buffer(TimeSpan.FromMilliseconds(100));
        var dispatcherObs = chunked.ObserveOnDispatcher(DispatcherPriority.Background);
        dispatcherObs.Subscribe(dbObjects =>
        {
            foreach (var dbObject in dbObjects)
            {
                DbObjects.Add(dbObject);
            }
        });

        await Task.Run(() => observableData.Connect());
        await dispatcherObs.ToTask();
    }

The result is unfortunately quite the opposite. When my view control (which contains a simple ListBox data-bound to the DbObjects property) loads, it does not show any data until the entire catalog has been enumerated. Only then does the UI refresh.

I am new to ReactiveUI, but I am sure it is capable of the task at hand. Does anyone have suggestions or pointers if I am using it incorrectly?

Answer 1:

Pending further information, my guess is that you are getting several zero-length buffers (how many depends on how long the DB query takes), followed by exactly one non-empty buffer containing all the results. You're probably better off limiting the buffer by length as well as time.
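
One way to cap a buffer by both time and count is the Buffer overload that takes a TimeSpan and an int. The following is a minimal sketch, assuming System.Reactive is referenced; the names BufferSketch and Chunk are hypothetical and not part of the original code:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferSketch
{
    // Hypothetical helper: emits a batch when either `interval` has elapsed
    // or `maxItems` elements have arrived, whichever happens first.
    // Empty batches produced by quiet intervals are dropped.
    public static IObservable<IList<T>> Chunk<T>(
        IObservable<T> source, TimeSpan interval, int maxItems)
    {
        return source
            .Buffer(interval, maxItems)
            .Where(batch => batch.Count > 0);
    }
}
```

In the question's OnLoad this could stand in for the time-only Buffer call, for example BufferSketch.Chunk(observableData, TimeSpan.FromMilliseconds(100), 50). The value 50 is an arbitrary starting point to tune, not a recommendation.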

EDIT - I wanted to analyse the threads involved in the original implementation. I don't agree with Paul's analysis: I do not believe the UI thread is blocked by the DB query. I believe it is blocked because large numbers of results are being buffered.

Charlie - please, can you time the DB query in code (not with the debugger) and dump the buffer lengths you are getting too.
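
A rough way to collect those numbers without the debugger is to log each buffer's size and arrival time with the Do operator and a Stopwatch. This is only a sketch, assuming System.Reactive is referenced; BufferDiagnostics and LogBatches are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;

public static class BufferDiagnostics
{
    // Hypothetical helper: passes batches through unchanged while logging
    // each batch's size and the time elapsed since subscription.
    public static IObservable<IList<T>> LogBatches<T>(
        IObservable<IList<T>> batches, Action<string> log)
    {
        // Defer so the stopwatch starts when someone subscribes,
        // not when the pipeline is built.
        return Observable.Defer(() =>
        {
            var clock = Stopwatch.StartNew();
            return batches.Do(
                batch => log($"{clock.ElapsedMilliseconds} ms: buffer of {batch.Count}"),
                () => log($"{clock.ElapsedMilliseconds} ms: completed"));
        });
    }
}
```

Wrapping the question's chunked sequence, for example BufferDiagnostics.LogBatches(chunked, s => Debug.WriteLine(s)), would show whether the items arrive as many small buffers or as one large one. The cursor loop inside DataSource could be timed the same way with its own Stopwatch.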

I will annotate the code to show the order of all three threads involved:

First of all, outside of the provided code, I am assuming a call is made to OnLoad via the Loaded event.

(1) - UI Thread calls OnLoad

public ObservableCollection<DbObject> DbObjects { get; set; }

private async void OnLoad(object sender, RoutedEventArgs e)
{
    // (2) UI Thread enters OnLoad

    var observableData = CatalogService.Instance.DataSource.Publish();

    var chunked = observableData
        // (6) Thread A OnNext passes into Buffer
        .Buffer(TimeSpan.FromMilliseconds(100));
        // (7) Thread B, threadpool thread used by Buffer to run timer 

    var dispatcherObs = chunked
        // (8) Thread B still
        .ObserveOnDispatcher(DispatcherPriority.Background);
        // (9) Non blocking OnNexts back to UI Thread

    dispatcherObs.Subscribe(dbObjects =>
    {
        // (10) UI Thread receives buffered dbObjects            
        foreach (var dbObject in dbObjects)
        {
            // (11) UI Thread hurting while all these images are
            // stuffed in the collection in one go - This is the issue I bet.
            DbObjects.Add(dbObject);
        }
    });

    await Task.Run(() =>
    {
        // (3) Thread A - a threadpool thread,
        // triggers subscription to DataSource
        // UI Thread is *NOT BLOCKED* due to await
        observableData.Connect();
    });
    // (13) UI Thread - Dispatcher call back here at end of Create call
    // BUT UI THREAD WAS NOT BLOCKED!!!

    // (14) UI Thread - This task will be already completed
    // It is causing a second subscription to the already completed published observable
    await dispatcherObs.ToTask();


}

public class CatalogService
{
   ...

   public IObservable<DbObject> DataSource
   {
        get
        {
            return Observable.Create<DbObject>(obs =>
            {
                // (4) Thread A runs Database query synchronously
                var cursor = Database.Instance.GetAllObjects();
                var status = cursor.MoveToFirst();

                while (status == DbStatus.OK)
                {
                    var dbObject= Db.Create(cursor);
                    // (5) Thread A call OnNext
                    obs.OnNext(dbObject);

                    status = cursor.MoveToNext();
                }

                obs.OnCompleted();
                // (12) Thread A finally completes subscription due to Connect()
                return Disposable.Empty;
            });
        }
    }
}

I think the issue is a large buffer unloading tons of results into the ObservableCollection in one go, creating a ton of work for the listbox.



Answer 2:

Your problem is here:

            while (status == DbStatus.OK)
            {
                var dbObject= Db.Create(cursor);
                obs.OnNext(dbObject);

                status = cursor.MoveToNext();
            }

That loop runs synchronously as soon as someone subscribes, in a blocking way. Since you're creating the subscription on the UI thread (at the time you call Connect), it will run the entire thing on the UI thread. Change it to:

return Observable.Create<DbObject>(obs =>
{
    Observable.Start(() => {
        var cursor = Database.Instance.GetAllObjects();
        var status = cursor.MoveToFirst();

        while (status == DbStatus.OK)
        {
            var dbObject= Db.Create(cursor);
            obs.OnNext(dbObject);

            status = cursor.MoveToNext();
        }

        obs.OnCompleted();
    }, RxApp.TaskpoolScheduler);

    return Disposable.Empty;
});
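
A different way to get the same effect, swapped in here for comparison, is to leave the loop synchronous and move the subscription itself onto the thread pool with SubscribeOn. This is a minimal sketch, assuming System.Reactive is referenced; BackgroundSource and FromBlocking are hypothetical names, and the read delegate stands in for the cursor loop:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class BackgroundSource
{
    // Hypothetical helper: wraps a blocking enumeration so that the loop
    // runs on a thread-pool thread rather than on the subscriber's thread.
    public static IObservable<T> FromBlocking<T>(Func<IEnumerable<T>> read)
    {
        return Observable.Create<T>(obs =>
        {
            // Runs on whichever thread performs the subscription.
            foreach (var item in read())
            {
                obs.OnNext(item);
            }
            obs.OnCompleted();
            // As in the original code, nothing is cancelled on dispose.
            return Disposable.Empty;
        })
        // SubscribeOn makes the delegate above execute on the thread pool.
        .SubscribeOn(TaskPoolScheduler.Default);
    }
}
```

With a source built this way, Connect() could presumably be called directly, without wrapping it in Task.Run. Like the code above, this sketch does not stop the loop early when the subscription is disposed.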