I have the following function that I wish to test:
    /// Items are processed asynchronously via fn as they arrive. However
    /// if an item arrives before the last asynchronous operation has
    /// completed then the cancellation token passed to fn will be
    /// triggered enabling the task to be canceled in a best effort
    /// way.
    public static IObservable<U> SelectWithCancellation<T, U>
        ( this IObservable<T> This
        , Func<CancellationToken, T, Task<U>> fn
        )
    {
        return This
            .Select(v => Observable.FromAsync(token => fn(token, v)))
            .Switch();
    }
The best working test I have been able to come up with is below. First, I create a long-running task that can be canceled:
    public Task<string> JobTask
        ( CancellationToken token
        , string input
        )
    {
        return Task.Factory.StartNew(() =>
            {
                if ( input == "C" || input == "E" )
                {
                    while ( !token.IsCancellationRequested ) ;
                }
                return input;
            }
        );
    }
and then I test that it really works:
    public class SelectWithCancelationSpec : ReactiveTest
    {
        TestScheduler _Scheduler = new TestScheduler();

        [Fact]
        public void ShouldWork()
        {
            var o = _Scheduler.CreateHotObservable
                ( OnNext(100, "A")
                , OnNext(200, "B")
                , OnNext(300, "C")
                , OnNext(400, "D")
                , OnNext(500, "E")
                , OnNext(500, "F")
                );

            List<string> actual = new List<string>();
            o
                .SelectWithCancellation(JobTask)
                .Subscribe(v => actual.Add(v));

            var delay = 100;
            _Scheduler.AdvanceTo(150);
            Thread.Sleep(delay);
            _Scheduler.AdvanceTo(250);
            Thread.Sleep(delay);
            _Scheduler.AdvanceTo(350);
            Thread.Sleep(delay);
            _Scheduler.AdvanceTo(450);
            Thread.Sleep(delay);
            _Scheduler.AdvanceTo(550);
            Thread.Sleep(delay);
            _Scheduler.AdvanceTo(650);

            var expected = new[] { "A", "B", "D", "F" };
            actual
                .ShouldBeEquivalentTo(expected);
        }
    }
The problem is that I have had to introduce real time into the test. This is because my simulated JobTask runs on a real thread from the thread pool and doesn't respect the virtual time of the test scheduler. If I don't put the delays between the AdvanceTo calls, I drop more messages than I expect in the test, because the JobTask takes too long to process.
The question is: how can I create a JobTask that respects virtual time and lets me test whether the intended messages are successfully dropped?
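To make the goal concrete, here is a minimal sketch of the kind of job that involves no real thread at all. It is an illustration under assumptions, not a verified solution: the class name ThreadFreeJobs is hypothetical, and unlike the JobTask above, the "C" and "E" jobs end up in the canceled state instead of returning their input once the token fires.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original test.
public static class ThreadFreeJobs
{
    // Same signature as JobTask, but it never touches the thread pool.
    public static Task<string> JobTask(CancellationToken token, string input)
    {
        if (input == "C" || input == "E")
        {
            // Simulates a job that is still running when the next item arrives:
            // it never finishes on its own and only ends when the token fires.
            var tcs = new TaskCompletionSource<string>();
            token.Register(() => tcs.TrySetCanceled());
            return tcs.Task;
        }

        // Every other input completes immediately with its own value.
        return Task.FromResult(input);
    }
}
```

Whether this removes the need for the Thread.Sleep calls depends on Observable.FromAsync delivering an already-completed task synchronously on the calling thread, which is an assumption here. The sketch also only models "finished" versus "not finished"; a job that takes a given number of virtual ticks would need a timer driven by the TestScheduler.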