New to Rx -- I have a sequence that appears to be functioning correctly except for the fact that it appears to repeat.
I think I'm missing something around calls to Select() or SelectMany() that triggers the range to re-evaluate.
Explanation of Code & What I'm trying to Do
- For all numbers, loop through a method that retrieves data (paged from a database).
- Eventually, this data will be empty (I only want to keep processing while it retrieves data).
- For each of those records retrieved, I only want to process ones that should be processed
- Of those that should be processed, I'd like to process up to x of them in parallel (according to a setting).
- I want to wait until the entire sequence is completed to exit the method (hence the wait call at the end).
Problem With the Code Below
- I run the code through with a data set that I know only has 1 item.
- So, page 0 returns 1 item, and page 1 returns 0 items.
- My expectation is that the process runs once for the one item.
- However, I see that both page 0 and 1 are called twice and the process thus runs twice.
I think this has something to do with a call that is causing the range to re-evaluate beginning from 0, but I can't figure out what it is.
The Code
var query = Observable.Range(0, int.MaxValue)
    .Select(pageNum =>
    {
        _etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
        return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
    })
    .TakeWhile(resProfList => resProfList.Any())
    .SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
    .Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
    .Merge(maxConcurrent: _processorSettings.ParallelProperties)
    .Do(async trackingRequests =>
    {
        await CreateRequests(trackingRequests.Result, createTrackingPayload);
        var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
        var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
        var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
            TrackingRecordRequestType.UpdateAssignmentType);
        _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
            numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
    });

var subscription = query.Subscribe(
    trackingRequests =>
    {
        //Nothing really needs to happen here. Technically we're just doing something when it's done.
    },
    () =>
    {
        _etlLogger.Info("Finished! Woohoo!");
    });

await query.Wait();
As already mentioned, the cause of the Observable.Range being repeated is that you're subscribing twice: once with .Subscribe(...) and once with .Wait().
In this kind of circumstance I would go with a very simple blocking call to get the values. Just do this:
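A minimal sketch of such a single blocking call, assuming the System.Reactive package is referenced. The GetPage loader and its data are hypothetical stand-ins for the paged retriever in the question, and PageCalls exists only to show how often the pages are fetched:

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class SingleSubscriptionDemo
{
    // Counts how many times the hypothetical page loader is invoked.
    public static int PageCalls;

    // Hypothetical stand-in for the paged retriever:
    // page 0 holds one item, every later page is empty.
    static int[] GetPage(int pageNum)
    {
        PageCalls++;
        return pageNum == 0 ? new[] { 42 } : Array.Empty<int>();
    }

    // Same shape as the query in the question, minus the processing steps.
    public static IObservable<int> BuildQuery() =>
        Observable.Range(0, int.MaxValue)
            .Select(pageNum => GetPage(pageNum))
            .TakeWhile(page => page.Any())
            .SelectMany(page => page);

    public static int[] Run()
    {
        PageCalls = 0;
        // ToArray() yields IObservable<int[]>; Wait() makes the one and only
        // subscription, blocks until completion, and returns the int[].
        return BuildQuery().ToArray().Wait();
    }

    public static void Main()
    {
        int[] results = Run();
        Console.WriteLine($"{results.Length} item(s), {PageCalls} page call(s)");
    }
}
```

Because nothing else subscribes to the query, the loader in this sketch should be hit once per page (page 0 and the empty page 1) rather than twice.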
The .ToArray() turns a multi-valued IObservable<T> into a single-valued IObservable<T[]>. The .Wait() turns this into T[]. It's the easy way to ensure only one subscription, blocking, and getting all of the values out.
In your case you may not need all values, but I think this is a good habit to get into.
This is because you subscribe to the sequence twice: once at query.Subscribe(...) and again at query.Wait().
Observable.Range(0, int.MaxValue) is a cold observable. Every time you subscribe to it, it will be evaluated again. You could make the observable hot by publishing it with Publish(), then subscribe to it, then Connect(), and then Wait(). This does add the risk of an InvalidOperationException if you call Wait() after the last element has already been yielded. A better alternative is LastOrDefaultAsync().
That would get you something like this:
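A minimal sketch of the Publish() approach, assuming the System.Reactive package. The GetPage loader is a hypothetical stand-in for the retriever in the question, and the completion task is deliberately created before Connect() so that it cannot miss the end of the sequence (an assumption of this sketch, not necessarily how the original snippet was ordered):

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class PublishDemo
{
    // Counts how many times the hypothetical page loader is invoked.
    public static int PageCalls;

    // Hypothetical stand-in: page 0 holds one item, later pages are empty.
    static int[] GetPage(int pageNum)
    {
        PageCalls++;
        return pageNum == 0 ? new[] { 42 } : Array.Empty<int>();
    }

    public static async Task<int> RunAsync()
    {
        PageCalls = 0;
        var processed = 0;

        // Publish() shares one upstream subscription among all observers.
        var connectable = Observable.Range(0, int.MaxValue)
            .Select(pageNum => GetPage(pageNum))
            .TakeWhile(page => page.Any())
            .SelectMany(page => page)
            .Publish();

        // Attach every observer first; nothing runs until Connect().
        using var subscription = connectable.Subscribe(_ => processed++);
        Task<int> completion = connectable.LastOrDefaultAsync().ToTask();

        // Connect() makes the single subscription to the cold Range.
        using var connection = connectable.Connect();

        // LastOrDefaultAsync() does not throw on an empty sequence.
        await completion;
        return processed;
    }

    public static async Task Main()
    {
        int processed = await RunAsync();
        Console.WriteLine($"{processed} item(s), {PageCalls} page call(s)");
    }
}
```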
Or you can avoid await and return a task directly with ToTask() (do remove async from your method signature). Once converted to a task, you can synchronously wait for it with Wait() (do not confuse Task.Wait() with Observable.Wait()).
However, most likely you do not want to wait at all! Waiting in an async context makes little sense. What you should do is put the remainder of the code that needs to run after the sequence completes in the OnCompleted() part of the subscription. If you have (clean-up) code that needs to run even when you unsubscribe (Dispose), consider Observable.Using or the Finally(...) method to ensure this code is run.