Throttle but discard results if they come too late

2019-07-11 07:53发布

问题:

I'm writing a UI where the user can type in a search term and a list get continuously updated offering suggestions.

My first though was that the Rx primitive Throttle was a perfect match but it gets me half there.

The suggestions take a while to fetch so I get them asynchronously on not on the UI thread.

The problem is that I want to discard/skip/throw away a result if the user types af the throttle time span again.

For example:

  • Time starts and the user presses a key : 0ms
  • The throttle is set to 100ms.
  • The fetch takes 200ms.
  • At 150ms the user pressed another key

Now with Throttle the first fetch will still go ahead an populate the gui suggestion list. What I like to learn is how can I cancel that first fetch as it is not relevant anymore? Only the second keypress should trigger an update to the gui.

Here is what I tried

(I use ReactiveUI but the Q is about Rx)

public IEnumerable<usp_getOpdrachtgevers_Result> Results { get; set; } // [Reactive] pu

public SearchOpdrachtgeverVM()
{

    this.WhenAnyValue(x => x.FirstName,
                      x => x.LastName
        )
        .Throttle(TimeSpan.FromMilliseconds(200))
        .Subscribe(async vm => Results = await PopulateGrid());
}

private async Task<IEnumerable<usp_getOpdrachtgevers_Result>> PopulateGrid()
{

    return await Task.Run(
             () => _opdrachtgeversCache
                         .Where(x =>
                                x.vNaam.Contains(FirstName)
                                && x.vLastName.Contains(LastName)
                         )

             );

}

回答1:

If you turn your async Task into an Observable, this looks like a classic use for Switch:

this.WhenAnyValue(x => x.FirstName,
                  x => x.LastName
    )
    .Throttle(TimeSpan.FromMilliseconds(100)) 
    .Select(l => PopulateGrid().ToObservable())
    .Switch()
    .Subscribe(vm => Results = vm);

Throttle should be used to suppress calls while the user is typing. So adjust that TimeSpan as you like.



回答2:

If I understand what you want correctly, this can be done in a quite straight forward manner and clean if you refactor your code slightly.

Firstly, make the first name and last name triggers into observables. In the code below, I have used subjects but it is better if you're able to use static Observable methods to 'convert' them into observables; e.g. Observable.FromEvent.

Then turn the code to fetch results into an observable. In the code below I have used Observable.Create to return a stream of IEnumerable<string>.

Finally, you can use the Switch operator to subscribe to each new GetResults call and cancel the previous call to GetResults.

Sounds complicated but the code is quite straight forward:

private Subject<string> _firstName = new Subject<string>();
private Subject<string> _lastName = new Subject<string>();

private Task<IEnumerable<string>> FetchResults(string firstName, string lastName, CancellationToken cancellationToken)
{
    // Fetch the results, respecting the cancellation token at the earliest opportunity
    return Task.FromResult(Enumerable.Empty<string>());
}

private IObservable<IEnumerable<string>> GetResults(string firstName, string lastName)
{
    return Observable.Create<IEnumerable<string>>(
        async observer =>
        {
            // Use a cancellation disposable to provide a cancellation token the the asynchronous method
            // When the subscription to this observable is disposed, the cancellation token will be cancelled.
            CancellationDisposable disposable = new CancellationDisposable();

            IEnumerable<string> results = await FetchResults(firstName, lastName, disposable.Token);

            if (!disposable.IsDisposed)
            {
                observer.OnNext(results);
                observer.OnCompleted();
            }

            return disposable;
        }
    );
}

private void UpdateGrid(IEnumerable<string> results)
{
    // Do whatever
}

private IDisposable ShouldUpdateGridWhenFirstOrLastNameChanges()
{
    return Observable
        // Whenever the first or last name changes, create a tuple of the first and last name
        .CombineLatest(_firstName, _lastName, (firstName, lastName) => new { FirstName = firstName, LastName = lastName })
        // Throttle these tuples so we only get a value after it has settled for 100ms
        .Throttle(TimeSpan.FromMilliseconds(100))
        // Select the results as an observable
        .Select(tuple => GetResults(tuple.FirstName, tuple.LastName))
        // Subscribe to the new results and cancel any previous subscription
        .Switch()
        // Use the new results to update the grid
        .Subscribe(UpdateGrid);
}

Quick tip: you should really pass an explicit scheduler into the Throttle so that you can effectively unit test this code using the TestScheduler.

Hope it helps.