I'm trying to create an Observable where each item is produced via an asynchronous task. The next item should be produced via an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like this - except that Generate does not support async (nor does it support the delegate on the initial state.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
And then call .SelectMany(i => i)
on the IObservable<IEnumerable<BrokeredMessage>>
to turn it into a IObservable<BrokeredMessage>
Where _serviceBusReceiver is an instance of an interface as follows:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
If you are going to roll your own async
Generate
function I would recommend the use of recursive scheduling instead of wrapping a while loop.This has a couple of advantages. First, you are able to cancel this, with a simple while loop there is no way to cancel it directly, in fact you don't even return for the subscribe function until the observable has completed. Second, this lets you control the scheduling/asynchrony of each item (which makes testing a breeze), this also makes it a better overall fit for library
After doing a good bit of testing I think this does the job nicely using the built-in Rx operators.
I've tested this code with the following:
And running this sequence:
I got this result:
My test code also used the Reactive Extension team's Interactive Extensions - NuGet "Ix-Main".
Having a similar question myself and also agreeing with the following comment:
I believe that
IAsyncEnumerable
from Ix.NET is a better fit thanIObservable
for this scenario - both for the question here and any similar async unfolding function. The reason is that each time we iterate and then extract a result from aTask
, the flow control is with us, the caller, to pull the next item or choose not to if a certain condition is met. This is likeIAsyncEnumerable
and not likeIObservable
, which pushes items to us without us having control over the rate.Ix.NET doesn't have a suitable version of
AsyncEnumerable.Generate
so I wrote the following to solve this problem.Notes:
condition
parameter as, since it's a pull system, it's entirely up to the caller whether to call MoveNext or not and socondition
seems redundant. It essentially adds a call toTakeWhile
onto the result of the function. However I haven't looked deep enough into Ix.NET to know whether afalse
response fromMoveNext
is required in order todispose
theIAsyncEnumerator
, so for that reason I've included it.IAsyncEnumerable
can of course be converted toIObservable
if that specific type is required.I think this might be the correct answer:This is not a good answer. Do not use.
I created by own
Generate
that supports async/await on the initial state + iterate functions:Unfortunately this seems to have the side-effect that the production of messages racing far ahead of consumption. If the observer processes messages slowly then this will fetch millions of messages before we even process a handful of them. Not exactly what we want from a service bus.
I'm going to work through the above, maybe read some more, and will post a more specific question if needed.