I'm struggling to get my head around why the following test does not work:
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using Xunit;

[Fact]
public void repro()
{
    var scheduler = new TestScheduler();
    var count = 0;

    // this observable is a simplification of the system under test
    // I've just included it directly in the test for clarity
    // in reality it is NOT accessible from the test code - it is
    // an implementation detail of the system under test
    // but by passing in a TestScheduler to the sut, the test code
    // can theoretically control the execution of the pipeline
    // but per this question, that doesn't work when using FromAsync
    Observable
        .Return(1)
        .Select(i => Observable.FromAsync(Whatever))
        .Concat()
        .ObserveOn(scheduler)
        .Subscribe(_ => Interlocked.Increment(ref count));

    Assert.Equal(0, count);

    // this call initiates the observable pipeline, but does not
    // wait until the entire pipeline has been executed before
    // returning control to the caller
    // the question is: why? Rx knows I'm instigating an async task
    // as part of the pipeline (that's the point of the FromAsync
    // method), so why can't it still treat the pipeline atomically
    // when I call Start() on the scheduler?
    scheduler.Start();

    // count is still zero at this point
    Assert.Equal(1, count);
}

private async Task<Unit> Whatever()
{
    await Task.Delay(100);
    return Unit.Default;
}
What I'm trying to do is run some asynchronous code (represented above by Whatever()) whenever an observable ticks. Importantly, I want those calls to be queued. More importantly, I want to be able to control the execution of the pipeline by using the TestScheduler.
It seems like the call to scheduler.Start() is instigating the execution of Whatever() but it isn't waiting until it completes. If I change Whatever() so that it is synchronous:
private async Task<Unit> Whatever()
{
    //await Task.Delay(100);
    return Unit.Default;
}
then the test passes, but of course that defeats the purpose of what I'm trying to achieve. I could imagine there being a StartAsync() method on the TestScheduler that I could await, but that does not exist.
Can anyone tell me whether there's a way for me to instigate the execution of the reactive pipeline and wait for its completion even when it contains asynchronous calls?
Noseratio's more elegant Rx way of writing this test. You can await observables to get their last value. Combine with Count() and it becomes trivial. Note that the TestScheduler isn't serving any purpose in this example.

As James mentions above, you can't mix concurrency models like you are. You remove the concurrency from Rx by using the TestScheduler, but never actually introduce concurrency via Rx. You do, however, introduce concurrency with the TPL (i.e. Task.Delay(100)). This will actually run asynchronously on a task pool thread, so your synchronous tests will complete before the task has completed. You could change to something like this:
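A minimal sketch of both ideas, assuming the System.Reactive and Microsoft.Reactive.Testing NuGet packages; the names Sketches, CountWithRealDelay, RealDelay and CountWithVirtualDelay are hypothetical. The first method awaits the pipeline in real time and uses Count() to get the number of results. The second keeps the delay inside Rx, as an Observable.Timer on the TestScheduler, so Start() runs it to completion synchronously in virtual time.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

// Hypothetical class and method names, for illustration only.
public static class Sketches
{
    // Real time, no TestScheduler: awaiting an IObservable<T> yields its last
    // value, and Count() turns that into "how many results were produced".
    public static async Task<int> CountWithRealDelay()
    {
        return await Observable
            .Return(1)
            .Select(_ => Observable.FromAsync(() => RealDelay()))
            .Concat()
            .Count();
    }

    private static async Task<Unit> RealDelay()
    {
        await Task.Delay(100); // TPL concurrency that no IScheduler can see
        return Unit.Default;
    }

    // Virtual time: the 100 ms of "work" is an Rx timer on the TestScheduler,
    // so Start() runs it to completion synchronously.
    public static (int Count, long Ticks) CountWithVirtualDelay()
    {
        var scheduler = new TestScheduler();
        var count = 0;

        Observable
            .Return(1)
            .Select(_ => Observable
                .Timer(TimeSpan.FromMilliseconds(100), scheduler)
                .Select(t => Unit.Default))
            .Concat()
            .Subscribe(_ => count++);

        scheduler.Start(); // returns once the scheduler's queue is empty
        return (count, scheduler.Clock);
    }
}
```

Because the second version never leaves the TestScheduler, the count is already 1 when Start() returns, and the virtual clock reads 100 ms without any real waiting.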
Alternatively, you could put the Whatever method behind an interface that you can mock out for testing. In that case you would just have your Stub/Mock/Double return the code from above, i.e.

    return await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler).Select(_ => Unit.Default).ToTask();
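A minimal sketch of that seam, assuming the System.Reactive and Microsoft.Reactive.Testing packages; IWorker, RealWorker and VirtualTimeWorker are hypothetical names. The production implementation awaits a real Task.Delay, while the test double builds its task from an Rx timer on an injected scheduler, so the task only completes once that scheduler's virtual clock reaches 100 ms.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical seam around the asynchronous work.
public interface IWorker
{
    Task<Unit> Whatever();
}

// Production implementation: a real TPL delay.
public sealed class RealWorker : IWorker
{
    public async Task<Unit> Whatever()
    {
        await Task.Delay(100);
        return Unit.Default;
    }
}

// Test double (hypothetical): the "delay" is an Rx timer on the injected
// scheduler, so it elapses only when that scheduler's clock is advanced.
public sealed class VirtualTimeWorker : IWorker
{
    private readonly IScheduler _scheduler;

    public VirtualTimeWorker(IScheduler scheduler) => _scheduler = scheduler;

    public async Task<Unit> Whatever()
    {
        return await Observable
            .Timer(TimeSpan.FromMilliseconds(100), _scheduler)
            .Select(_ => Unit.Default)
            .ToTask();
    }
}
```

If the pipeline still wraps this task with Observable.FromAsync, the task-to-observable conversion may resume off the TestScheduler thread, so a short real wait after Start() can still be needed.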
Let me boil down your question to its essentials:
I should warn you up front: there is no quick and easy answer here, and no convenient "trick" that can be deployed.
Asynchronous Calls and Schedulers
To answer this question I think we need to clarify some points. The term "asynchronous call" in the question above seems to be used specifically to refer to methods with a Task or Task<T> signature, i.e. methods that use the Task Parallel Library (TPL) to run asynchronously.

This is important to note because Reactive Extensions (Rx) takes a different approach to handling asynchronous operations.
In Rx the introduction of concurrency is managed via a scheduler, a type implementing the IScheduler interface. Any operation that introduces concurrency should make a scheduler parameter available so that the caller can decide on an appropriate scheduler. The core library slavishly adheres to this principle. So, for example, Delay allows specification of a scheduler but Where does not.

As you can see from the source, IScheduler provides a number of Schedule overloads. Operations requiring concurrency use these to schedule execution of work. Exactly how that work is executed is deferred completely to the scheduler. This is the power of the scheduler abstraction.

Rx operations introducing concurrency generally provide overloads that allow the scheduler to be omitted, and in that case select a sensible default. This is important to note, because if you want your code to be testable via the use of TestScheduler, you must use a TestScheduler for all operations that introduce concurrency. A rogue method that doesn't allow this could scupper your testing efforts.

TPL Scheduling Abstraction
The TPL has its own abstraction to handle concurrency: the TaskScheduler. The idea is very similar. You can read about it here.

There are two very important differences between the two abstractions:

1. Rx schedulers have their own notion of time, exposed via the Now property. TPL schedulers do not.
2. The TPL has no widespread convention of letting callers pass TaskSchedulers to a method introducing concurrency (returning a Task or Task<T>). The vast majority of Task returning methods assume use of the default TaskScheduler and give you no choice about where work is run.

Motivation for TestScheduler
The motivation to use a TestScheduler is generally two-fold:

The way this works depends entirely on the fact that schedulers have their own notion of time. Every time an operation is scheduled via an IScheduler, we specify when it must execute: either as soon as possible, or at a specific time in the future. The scheduler then queues work for execution and will execute it when the specified time (according to the scheduler itself) is reached.

When you call Start on the TestScheduler, it works by emptying the queue of all operations with execution times at or before its current notion of Now, then advancing its clock to the next scheduled work time and repeating until its queue is empty.

This allows neat tricks like being able to test that an operation will never result in an event! If using real time this would be a challenging task, but with virtual time it's easy: once the scheduler queue is completely empty, the TestScheduler concludes that no further events will ever happen, since if nothing is left on its queue, there is nothing there to schedule further tasks. In fact, Start returns at precisely this point. For this to work, clearly all concurrent operations to be measured must be scheduled on the TestScheduler.

A custom operator that carelessly makes its own choice of scheduler without allowing that choice to be overridden, or an operation that uses its own form of concurrency without a notion of time (such as TPL based calls), will make it difficult, if not impossible, to control execution via a TestScheduler.
If you have an asynchronous operation run by other means, judicious use of the AdvanceTo and AdvanceBy methods of the TestScheduler can allow you to coordinate with that foreign source of concurrency, but the extent to which this is achievable depends on the control afforded by that foreign source.

In the case of the TPL, you do know when a task is done, which does allow the use of waits and timeouts in tests, as ugly as these can be. Through the use of TaskCompletionSource<T> instances (TCS) you can mock tasks and use AdvanceTo to hit specific points and complete TCSs, but there is no one simple approach here. Often you just have to resort to ugly waits and timeouts because you don't have sufficient control over foreign concurrency.

Rx is generally free-threaded and tries to avoid introducing concurrency wherever possible. Conversely, it's quite possible that different operations within an Rx call chain will need different types of scheduler abstraction. It's not always possible to simulate a call chain with a single test scheduler. Certainly, I have had cause to use multiple TestSchedulers to simulate some complex scenarios; e.g. chains that use the DispatcherScheduler and TaskScheduler sometimes need complex coordination that means you can't simply serialize their operations on to one TestScheduler.

Some projects I have worked on have mandated the use of Rx for all concurrency specifically to avoid these problems. That is not always feasible, and even in these cases, some use of TPL is generally inevitable.
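The virtual-time mechanics described in this section can be sketched as follows, assuming the System.Reactive and Microsoft.Reactive.Testing packages; VirtualTimeSketches and its method names are hypothetical. NeverEmits relies on Start() returning once the queue is empty. TcsCompletedAtVirtualTime completes a TaskCompletionSource from work scheduled at virtual tick 100 and steps either side of it with AdvanceTo. TwoSchedulers uses one TestScheduler for a producer and another as the ObserveOn target, standing in for, say, a background scheduler and a dispatcher, so each has to be advanced separately.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

// Hypothetical class name; each method isolates one virtual-time behaviour.
public static class VirtualTimeSketches
{
    // Start() returns once nothing is left on the queue, which is how a test
    // can conclude that an event will never happen.
    public static int NeverEmits()
    {
        var scheduler = new TestScheduler();
        var hits = 0;

        Observable
            .Interval(TimeSpan.FromSeconds(1), scheduler)
            .Take(5)                    // bounded, so the queue eventually empties
            .Where(x => x > 100)        // never true for the values 0..4
            .Subscribe(_ => hits++);

        scheduler.Start();              // five virtual seconds, no real waiting
        return hits;
    }

    // A TaskCompletionSource stands in for a TPL operation. Completing it from
    // work scheduled at virtual tick 100 lets AdvanceTo step either side of it.
    public static (bool At50, bool At100) TcsCompletedAtVirtualTime()
    {
        var scheduler = new TestScheduler();
        var tcs = new TaskCompletionSource<Unit>();
        scheduler.Schedule(TimeSpan.FromTicks(100), () => tcs.SetResult(Unit.Default));

        scheduler.AdvanceTo(50);
        var at50 = tcs.Task.IsCompleted;    // work due at tick 100 has not run yet
        scheduler.AdvanceTo(100);
        var at100 = tcs.Task.IsCompleted;   // AdvanceTo runs work due at or before 100
        return (at50, at100);
    }

    // Two TestSchedulers, each with its own clock, driven separately.
    public static (int AfterProducer, int AfterConsumer) TwoSchedulers()
    {
        var producer = new TestScheduler();
        var consumer = new TestScheduler();
        var received = new List<long>();

        Observable
            .Timer(TimeSpan.FromTicks(10), producer)
            .ObserveOn(consumer)
            .Subscribe(received.Add);

        producer.AdvanceBy(10);             // timer fires; delivery is queued on consumer
        var afterProducer = received.Count;
        consumer.Start();                   // drains the queued notifications
        return (afterProducer, received.Count);
    }
}
```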
One particular pain point
One particular pain point of Rx that leaves many testers scratching their heads is the fact that the TPL -> Rx family of conversions introduce concurrency: e.g. ToObservable, SelectMany's overload accepting Task<T>, etc. don't provide overloads with a scheduler and insidiously force you off the TestScheduler thread, even if mocking with TCS. For all the pain this causes in testing alone, I consider this a bug. You can read all about this here; dig through and you'll find Dave Sexton's proposed fix, which provides an overload for specifying a scheduler and is under consideration for inclusion. You may want to look into that pull request.

A Potential Workaround
If you can edit your code to use it, the following helper method might be of use. It converts a task to an observable that will run on the TestScheduler and complete at the correct virtual time.
It schedules work on the TestScheduler that is responsible for collecting the task result, at the virtual time we state the task should complete. The work itself blocks until the task result is available, allowing the TPL task to run for however long it takes, or until a specified amount of real time has passed, in which case a TimeoutException is thrown.

The effect of blocking the work is that the TestScheduler won't advance its virtual time past the expected virtual completion time of the task until the task has actually completed. This way, the rest of the Rx chain can run in full-speed virtual time and we only wait on the TPL task, pausing the rest of the chain at the task completion virtual time whilst this happens.

Crucially, other concurrent Rx operations scheduled to run in between the start virtual time of the Task based operation and the stated end virtual time of the Task are not blocked, and their virtual completion time will be unaffected.
So set duration to the length of virtual time you want the task to appear to have taken. The result will then be collected at whatever the virtual time is when the task is started, plus the duration specified.

Set timeout to the actual time you will allow the task to take. If it takes longer, a timeout exception is thrown.

Usage in your code would be like this, and your assert will pass:
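A minimal sketch of such a helper together with the question's pipeline, assuming the System.Reactive and Microsoft.Reactive.Testing packages. TestSchedulerTaskExtensions, ToTestScheduledObservable, HelperDemo and the 10-second default timeout are hypothetical choices made for illustration; duration is virtual time and timeout is real time, as described above.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

public static class TestSchedulerTaskExtensions
{
    // Hypothetical helper: surfaces a TPL task's outcome on the TestScheduler at
    // (virtual time now + duration), blocking for at most `timeout` of real time.
    public static IObservable<T> ToTestScheduledObservable<T>(
        this Task<T> task,
        TestScheduler scheduler,
        TimeSpan duration,
        TimeSpan? timeout = null)
    {
        var realTimeout = timeout ?? TimeSpan.FromSeconds(10); // assumed default
        var subject = new AsyncSubject<T>(); // replays the single result to late subscribers

        // While this work item blocks, the virtual clock cannot move past it;
        // work scheduled earlier in virtual time has already run by then.
        scheduler.Schedule(duration, () =>
        {
            try
            {
                if (!task.Wait(realTimeout))
                {
                    subject.OnError(new TimeoutException("Task did not finish within the real-time timeout."));
                    return;
                }
            }
            catch (AggregateException ex)
            {
                // Faulted or cancelled task: forward the underlying exception.
                subject.OnError(ex.InnerException ?? ex);
                return;
            }

            subject.OnNext(task.Result);
            subject.OnCompleted();
        });

        return subject.AsObservable();
    }
}

// Hypothetical class mirroring the question's test.
public static class HelperDemo
{
    public static int Demo()
    {
        var scheduler = new TestScheduler();
        var count = 0;

        Observable
            .Return(1)
            .Select(i => Whatever().ToTestScheduledObservable(scheduler, TimeSpan.FromSeconds(1)))
            .Concat()
            .ObserveOn(scheduler)
            .Subscribe(_ => Interlocked.Increment(ref count));

        scheduler.Start(); // blocks at virtual t = 1 s until Whatever() has really finished
        return count;
    }

    private static async Task<Unit> Whatever()
    {
        await Task.Delay(100);
        return Unit.Default;
    }
}
```

Because the collecting work blocks inside Start(), the test thread waits roughly as long as Whatever() really takes, while the result is still emitted at virtual time 1 s. Since task.Wait blocks the scheduler's thread, this approach would deadlock if the task's continuation needed that same thread (for example under a single-threaded synchronization context).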
Conclusion
In summary, you haven't missed any convenient trick. You need to think about how Rx works and how the TPL works, and decide whether to:

1. avoid the TestScheduler altogether, or
2. use the TestScheduler to bring some modicum of control over your tests.