I've got a very simple class that I am using to poll a directory for new files. It's got the location, a time to start monitoring that location, and an interval (in hours) for when to check again:
public class Thing
{
public string Name {get; set;}
public Uri Uri { get; set;}
public DateTimeOffset StartTime {get; set;}
public double Interval {get; set;}
}
I am new to Reactive Extensions, but I think it is exactly the right tool for the job here. At the start time, and on every subsequent interval, I simply want to call a web service that does all the heavy lifting - we'll use the ever inventive public bool DoWork(Uri uri)
to represent that.
edit: DoWork is a call to a web service that will check for new files and move them if necessary, so its execution should be async. It returns a true if it completed, false if not.
Things get complicated if I have a whole collection of these Thing
s. I can't wrap my head around how to create the Observable.Timer()
for each one, and have them all call the same method.
edit2: Observable.Timer(DateTimeOffset, Timespan) seems perfect to create an IObservable for what I'm trying to do here. Thoughts?
This is the clearest method I can think of:
The following one is more functional:
The First SelectMany creates a stream of things at their scheduled times. The second SelectMany takes this stream and creates a new one that repeats every interval. This needs to be joined to the
Observable.Return
which produces a value immediately asObservable.Interval
's first value is delayed.*Note, the first solution requires c#5 or else this will bite you.
Hmmm. Does
DoWork
produce a result that you need to do something with? I'll assume so. You didn't say, but I'll also assumeDoWork
is synchronous.Here's a version with a hypothetical
Task<bool> DoWorkAsync(Uri)
:This version assumes that
DoWorkAsync
will finish long before the interval expires and starts up a new instance and so does not guard against having concurrentDoWorkAsync
running for the sameThing
instance.Do you need there to be many timers? I assume if you have a collection of 20 things, then we will create 20 timers all to fire at the same point in time? On the same thread/scheduler?
Or perhaps you want to
DoWork
foreach thing at every period?i.e.
vs.
There are many ways that you can do work in the future.
So this now introduces another question. If you have your polling time as 60seconds and you do work function takes 5 seconds; should the next poll happen in 55 seconds or in 60 seconds? Here one answer indicates you want to use an Rx sequence, the other indicates that you probably want to use Periodic Scheudling.
Next question is, does DoWork return a value? Currently it looks like it does not*. In this case I think the most appropriate thing for you to do is to leverage the Periodic Schedulers (assuming Rx v2).
This now will schedule each thing to be processed periodically (without drift), on it's own interval and provide cancellation.
We can now upgrade this to a query if you like
The problem with these query is that we don't fulfil the
StartTime
requirement. Annoyingly theCcheduler.SchedulePeriodic
method does not provide an overload to also have a start offset.The
Observable.Timer
operator does however provide this. It will also internally leverage the non-drifting scheduling features. To reconstruct the query withObservable.Timer
we can do the following.So now you have a nice interface that should avoid drift. However, I think the work here is done in a serial manner (if many DoWork actions are called concurrently).
*ideally I would try to avoid side effect statements like this but I am not 100% sure what your requirements.
EDIT It appears that the calls to DoWork must be in parallel, so then you need to do a bit more. Ideally you make DoWork asnyc, but if you cant we can fake it till we make it.