Question:
I'm trying to create an Observable where each item is produced via an asynchronous task. The next item should be produced via an async call on the result of the previous item (co-recursion). In "Generate" parlance this would look something like the following, except that Generate does not support async (nor does it support a delegate for the initial state).
var ob = Observable.Generate(
    async () => await ProduceFirst(),        // Task<T> ProduceFirst()
    prev => Continue(prev),                  // bool Continue(T)
    async prev => await ProduceNext(prev),   // Task<T> ProduceNext(T)
    item => item
);
As a more concrete example, to peek all messages from a ServiceBus queue by fetching them 100 messages at a time, implement ProduceFirst, Continue and ProduceNext as follows:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
    const int batchSize = 100;
    // Peek one extra message starting at the last one already seen, then skip it.
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1);
}
And then call .SelectMany(i => i) on the IObservable<IEnumerable<BrokeredMessage>> to turn it into an IObservable<BrokeredMessage>.
Where _serviceBusReceiver is an instance of an interface as follows:
public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
And BrokeredMessage is from https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
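The flattening step mentioned above (SelectMany over an observable of batches) can be sketched in isolation. This is a minimal illustration assuming the System.Reactive NuGet package; the class name FlattenSketch, the helper Flatten and the integer batches are hypothetical stand-ins for the BrokeredMessage batches.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FlattenSketch
{
    // Hypothetical helper: turns an observable of batches into an observable of items.
    public static IObservable<T> Flatten<T>(IObservable<IEnumerable<T>> batches)
    {
        // The selector returns each batch unchanged; SelectMany emits its items in order.
        return batches.SelectMany(batch => batch);
    }

    public static void Main()
    {
        // Stand-in for the message batches: two non-empty batches and an empty one.
        var batches = new IEnumerable<int>[]
        {
            new[] { 1, 2, 3 },
            new[] { 4, 5 },
            new int[0]
        }.ToObservable();

        // Collect everything into one list and block until the source completes.
        IList<int> items = Flatten(batches).ToList().Wait();
        Console.WriteLine(string.Join(",", items)); // prints 1,2,3,4,5
    }
}
```

An empty batch simply contributes no items, so it does not need special handling in the flattening step.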
Answer 1:
If you are going to roll your own async Generate function, I would recommend the use of recursive scheduling instead of wrapping a while loop.
public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    var s = scheduler ?? Scheduler.Default;

    return Observable.Create<TResult>(async obs => {
        return s.Schedule(await initialState(), async (state, self) =>
        {
            if (!condition(state))
            {
                obs.OnCompleted();
                return;
            }

            obs.OnNext(resultSelector(state));

            // Schedule the next step with the asynchronously produced state.
            self(await iterate(state));
        });
    });
}
This has a couple of advantages. First, you are able to cancel it; with a simple while loop there is no way to cancel directly, and in fact the subscribe function does not even return until the observable has completed. Second, it lets you control the scheduling/asynchrony of each item (which makes testing a breeze), which also makes it a better overall fit for a library.
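The cancellation point can be illustrated with a short usage sketch. It assumes the System.Reactive NuGet package and repeats the Generate method so that it compiles on its own; the class names AsyncGenerate and Program and the integer sequence are hypothetical. The condition never fails, so the sequence is unbounded and only disposal (here triggered by Take) stops it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncGenerate
{
    // Same shape as the Generate above, repeated so the sketch is self-contained.
    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector,
        IScheduler scheduler = null)
    {
        var s = scheduler ?? Scheduler.Default;
        return Observable.Create<TResult>(async obs =>
        {
            return s.Schedule(await initialState(), async (state, self) =>
            {
                if (!condition(state)) { obs.OnCompleted(); return; }
                obs.OnNext(resultSelector(state));
                self(await iterate(state));
            });
        });
    }
}

public static class Program
{
    public static async Task Main()
    {
        // An unbounded sequence: the condition always returns true.
        var numbers = AsyncGenerate.Generate(
            () => Task.FromResult(1),
            _ => true,
            async n => { await Task.Delay(10); return n + 1; },
            n => n);

        // Take(3) disposes the subscription after three items, so no
        // further steps are scheduled and the await below can complete.
        IList<int> firstThree = await numbers.Take(3).ToList();
        Console.WriteLine(string.Join(",", firstThree));
    }
}
```

Note that iterate may still be invoked once more after the subscription is disposed, because the step that emitted the last item continues to its await; its result is simply never scheduled.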
Answer 2:
After doing a good bit of testing I think this does the job nicely using the built-in Rx operators.
public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                                .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}
I've tested this code with the following:
bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}
And running this sequence:
var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);
I got this result: [result image not preserved]
My test code also used the Reactive Extension team's Interactive Extensions - NuGet "Ix-Main".
Answer 3:
I originally thought this might be the correct answer, but it is not a good answer. Do not use.
I created my own Generate that supports async/await on the initial state + iterate functions:
public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector
    )
{
    return Observable.Create<TResult>(async obs =>
    {
        var state = await initialState();

        while (condition(state))
        {
            var result = resultSelector(state);
            obs.OnNext(result);
            state = await iterate(state);
        }

        obs.OnCompleted();

        return System.Reactive.Disposables.Disposable.Empty;
    });
}
Unfortunately this seems to have the side effect that the production of messages races far ahead of consumption. If the observer processes messages slowly, this will fetch millions of messages before we have processed even a handful of them. Not exactly what we want from a service bus.
I'm going to work through the above, maybe read some more, and will post a more specific question if needed.
Answer 4:
Having a similar question myself and also agreeing with the following comment:
I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future).
I believe that IAsyncEnumerable from Ix.NET is a better fit than IObservable for this scenario, both for the question here and for any similar async unfolding function. The reason is that each time we iterate and then extract a result from a Task, the flow control is with us, the caller, to pull the next item or choose not to if a certain condition is met. This is like IAsyncEnumerable and not like IObservable, which pushes items to us without us having control over the rate.
Ix.NET doesn't have a suitable version of AsyncEnumerable.Generate, so I wrote the following to solve this problem.
public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
{
    return AsyncEnumerable.CreateEnumerable(() =>
    {
        var started = false;
        var current = default(TState);
        return AsyncEnumerable.CreateEnumerator(async c =>
        {
            if (!started)
            {
                started = true;
                var conditionMet = !c.IsCancellationRequested && condition(initialState);
                if (conditionMet) current = initialState;
                return conditionMet;
            }
            {
                var newVal = await iterate(current).ConfigureAwait(false);
                var conditionMet = !c.IsCancellationRequested && condition(newVal);
                if (conditionMet) current = newVal;
                return conditionMet;
            }
        },
        () => current,
        () => { });
    });
}
Notes:
- Only very lightly tested.
- Does return the initial state.
- Does not return the first TState that fails the condition, even though it has done the work to get that result. Possibly a different version could include that.
- I would prefer to get rid of the condition parameter since, as it's a pull system, it's entirely up to the caller whether to call MoveNext or not, and so condition seems redundant. It essentially adds a call to TakeWhile onto the result of the function. However, I haven't looked deep enough into Ix.NET to know whether a false response from MoveNext is required in order to dispose the IAsyncEnumerator, so for that reason I've included it.
IAsyncEnumerable can of course be converted to IObservable if that specific type is required.
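As a point of comparison, the same pull-based unfold can be sketched with C# 8 async iterators, which use the IAsyncEnumerable interface built into .NET Core 3.0 and later rather than the Ix.NET CreateEnumerable/CreateEnumerator helpers used above. This is an alternative technique, not the answer's code: the class name AsyncUnfold, the async initial state and the integer example are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncUnfold
{
    // Hypothetical async-iterator variant: nothing runs until the caller pulls.
    public static async IAsyncEnumerable<TState> Generate<TState>(
        Func<Task<TState>> initialState,
        Func<TState, bool> condition,
        Func<TState, Task<TState>> iterate,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var state = await initialState().ConfigureAwait(false);
        while (!cancellationToken.IsCancellationRequested && condition(state))
        {
            yield return state;
            // Reached only when the consumer asks for the next item.
            state = await iterate(state).ConfigureAwait(false);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var iterateCalls = 0;
        var seen = new List<int>();

        await foreach (var item in AsyncUnfold.Generate(
            () => Task.FromResult(1),
            s => s <= 1000,
            s => { iterateCalls++; return Task.FromResult(s + 1); }))
        {
            seen.Add(item);
            if (item == 3) break; // the consumer decides when to stop pulling
        }

        Console.WriteLine(string.Join(",", seen)); // prints 1,2,3
        Console.WriteLine(iterateCalls);           // prints 2
    }
}
```

Because iterate is only awaited when the consumer requests another item, production cannot race ahead of consumption: breaking out after the third item leaves iterate called exactly twice.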