Observable.Range being repeated?

Posted 2019-07-12 04:46

Question:

New to Rx -- I have a sequence that appears to be functioning correctly except for the fact that it appears to repeat.

I think I'm missing something around calls to Select() or SelectMany() that triggers the range to re-evaluate.

Explanation of Code & What I'm trying to Do

  • For all numbers, loop through a method that retrieves data (paged from a database).
  • Eventually, this data will be empty (I only want to keep processing while it retrieves data).
  • For each of those records retrieved, I only want to process ones that should be processed
  • Of those that should be processed, I'd like to process up to x of them in parallel (according to a setting).
  • I want to wait until the entire sequence is completed to exit the method (hence the wait call at the end).

Problem With the Code Below

  • I run the code through with a data set that I know only has 1 item.
    • So, page 0 returns 1 item, and page 1 returns 0 items.
  • My expectation is that the process runs once for the one item.
  • However, I see that both page 0 and 1 are called twice and the process thus runs twice.

I think this has something to do with a call that is causing the range to re-evaluate beginning from 0, but I can't figure out what it is.

The Code

var query = Observable.Range(0, int.MaxValue)
    .Select(pageNum =>
        {
            _etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
            return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
        })
    .TakeWhile(resProfList => resProfList.Any())
    .SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
    .Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
    .Merge(maxConcurrent: _processorSettings.ParallelProperties)
    .Do(async trackingRequests =>
    {
        await CreateRequests(trackingRequests.Result, createTrackingPayload);

        var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
        var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
        var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
            TrackingRecordRequestType.UpdateAssignmentType);

        _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
            numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
    });

var subscription = query.Subscribe(
trackingRequests =>
{
    //Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
    _etlLogger.Info("Finished! Woohoo!");
});

await query.Wait();

Answer 1:

This is because you subscribe to the sequence twice. Once at query.Subscribe(...) and again at query.Wait().
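A minimal sketch of that effect, assuming the System.Reactive package is referenced. The class name, the `CountPageFetches` helper, and the three-page range are stand-ins invented for illustration; they are not part of the code in the question.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdObservableDemo
{
    // Hypothetical helper: returns how many times the Select lambda ran.
    public static int CountPageFetches()
    {
        int fetches = 0;

        // Stand-in for the paged query: three "pages" instead of int.MaxValue.
        IObservable<int> query = Observable.Range(0, 3)
            .Select(pageNum =>
            {
                fetches++;      // side effect, playing the role of the database call
                return pageNum;
            });

        using (query.Subscribe(_ => { }))   // first subscription runs the range once
        {
            query.Wait();                    // second, independent subscription runs it again
        }

        return fetches;
    }

    public static void Main()
    {
        // With the default schedulers this should print 6: three pages, fetched twice.
        Console.WriteLine(CountPageFetches());
    }
}
```

Removing either the `Subscribe` call or the `Wait` call should bring the count down to 3, which matches the single run the question expects.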

Observable.Range(0, int.MaxValue) is a cold observable: every time you subscribe to it, it is evaluated again from the start. You could make the observable hot by publishing it with Publish(), then subscribing to it, then calling Connect(), and finally Wait(). This does carry a risk of getting an InvalidOperationException if you call Wait() after the last element has already been yielded, because the late subscriber then sees an empty sequence. A better alternative is LastOrDefaultAsync(), which returns the default value in that case instead of throwing.

That would get you something like this:

var connectable = query.Publish();
var subscription = connectable.Subscribe(...);
subscription = new CompositeDisposable(connectable.Connect(), subscription);
await connectable.LastOrDefaultAsync();

Or you can avoid await and return a task directly with ToTask() (do remove async from your method signature).

return connectable.LastOrDefaultAsync().ToTask();

Once converted to a task, you can synchronously wait for it with Wait() (do not confuse Task.Wait() with Observable.Wait()).

connectable.LastOrDefaultAsync().ToTask().Wait();

However, most likely you do not want to wait at all! Waiting in an async context makes little sense. What you should do is put the remainder of the code that needs to run after the sequence completes in the OnCompleted part of the subscription. If you have (clean-up) code that needs to run even when you unsubscribe (Dispose), consider Observable.Using or the Finally(...) method to ensure this code is run.
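As a rough illustration of that advice, the sketch below subscribes once, does its follow-up work in the completion callback, and registers clean-up with Finally(...). It assumes the System.Reactive package; the class name, the `Run` helper, and the logged strings are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CompletionCallbackDemo
{
    // Hypothetical helper: returns a log of what happened, in the order it happened.
    public static List<string> Run()
    {
        var log = new List<string>();

        IObservable<int> query = Observable.Range(0, 2)
            // Finally runs its action when the subscription ends, whether the
            // sequence completed, failed, or was disposed early.
            .Finally(() => log.Add("cleanup"));

        // Single subscription: per-item work in the first callback,
        // "after everything is done" work in the completion callback.
        IDisposable subscription = query.Subscribe(
            item => log.Add("item " + item),
            () => log.Add("completed"));

        subscription.Dispose(); // harmless here, the sequence has already finished
        return log;
    }

    public static void Main()
    {
        foreach (string entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

Because nothing blocks, this only works as a complete program here because the small range finishes synchronously inside Subscribe; a pipeline with background work needs the caller to stay alive until the completion callback fires.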



Answer 2:

As already mentioned the cause of the Observable.Range being repeated is the fact that you're subscribing twice - once with .Subscribe(...) and once with .Wait().

In this kind of circumstance I would go with a very simple blocking call to get the values. Just do this:

var results = query.ToArray().Wait();

The .ToArray() turns a multi-valued IObservable<T> into a single-valued IObservable<T[]>. The .Wait() then blocks until that value arrives and returns it as a T[]. It's the easy way to ensure only one subscription, blocking, and getting all of the values out.
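A small sketch of this pattern, again assuming the System.Reactive package. The class name, the `Run` helper, and the `pageNum * 10` projection are made up for illustration; the counter is only there to show that the source runs once.

```csharp
using System;
using System.Reactive.Linq;

public static class ToArrayWaitDemo
{
    // Hypothetical helper: returns the collected values and how often the source lambda ran.
    public static (int[] Results, int Fetches) Run()
    {
        int fetches = 0;

        IObservable<int> query = Observable.Range(0, 3)
            .Select(pageNum =>
            {
                fetches++;              // counts evaluations of the source
                return pageNum * 10;
            });

        // One subscription only: ToArray buffers every value, Wait blocks for the array.
        int[] results = query.ToArray().Wait();

        return (results, fetches);
    }

    public static void Main()
    {
        var (results, fetches) = Run();
        // Should print: 0,10,20 fetches=3
        Console.WriteLine(string.Join(",", results) + " fetches=" + fetches);
    }
}
```

Unlike calling Wait() directly on the query, this form also tolerates an empty sequence, since ToArray() then yields an empty array rather than no value at all.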

In your case you may not need all values, but I think this is a good habit to get into.