New to Rx -- I have a sequence that appears to be functioning correctly except for the fact that it appears to repeat.
I think I'm missing something around calls to Select()
or SelectMany()
that triggers the range to re-evaluate.
Explanation of Code & What I'm trying to Do
- For all numbers, loop through a method that retrieves data (paged from a database).
- Eventually, this data will be empty (I only want to keep processing while it retrieves data
- For each of those records retrieved, I only want to process ones that should be processed
- Of those that should be processed, I'd like to process up to
x
of them in parallel (according to a setting). - I want to wait until the entire sequence is completed to exit the method (hence the wait call at the end).
Problem With the Code Below
- I run the code through with a data set that I know only has 1 item.
- So, page 0 returns 1 item, and page 1 return 0 items.
- My expectation is that the process runs once for the one item.
- However, I see that both page 0 and 1 are called twice and the process thus runs twice.
I think this has something to do with a call that is causing the range to re-evaluate beginning from 0, but I can't figure out what that it is.
The Code
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();