I'm currently working on a project where we have the challenge of processing items in parallel. So far not a big deal ;) Now to the problem. We have a list of IDs, and we want to call a stored procedure for each ID periodically (every 2 seconds). The 2 seconds need to be checked for each item individually, because items are added and removed during runtime. In addition, we want to configure the maximum degree of parallelism, as the DB should not be flooded with 300 threads concurrently. An item that is being processed should not be rescheduled for processing until it has finished its previous execution. The reason is that we want to avoid queueing up a lot of items in case of delays on the DB.
Right now we are using a self-developed component that has a main thread, which periodically checks which items need to be scheduled for processing. Once it has the list, it drops those items onto a custom IOCP-based thread pool and then uses wait handles to wait for them to be processed. Then the next iteration starts. We chose IOCP because of the work stealing it provides.
I would like to replace this custom implementation with a TPL/.NET 4 version, and I would like to know how you would solve it (ideally simple and nicely readable/maintainable). I know about this article: http://msdn.microsoft.com/en-us/library/ee789351.aspx, but it only limits the number of threads being used. That still leaves work stealing, periodically executing the items, ...
Ideally it will become a generic component that can be used for all the tasks that need to be done periodically for a list of items.
Any input welcome. TIA, Martin
I don't think you actually need to get down and dirty with direct TPL Tasks for this. For starters, I would set up a BlockingCollection around a ConcurrentQueue (the default) with no BoundedCapacity set on the BlockingCollection to store the IDs that need to be processed.

From there I would just use Parallel::ForEach on the enumeration returned from BlockingCollection::GetConsumingEnumerable. In the ForEach call you will set up your ParallelOptions::MaxDegreeOfParallelism.
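A minimal sketch of this first design (blocking collection, throttled Parallel.ForEach, a one-shot timer per ID, CompleteAdding on shutdown), assuming integer IDs, .NET 4.5 or later, and a caller-supplied `process` delegate standing in for the stored procedure call. The class name `PerIdTimerProcessor` is hypothetical. One deviation from a plain GetConsumingEnumerable loop: the enumerable is wrapped with `Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering)`, because Parallel.ForEach otherwise pulls items in chunks, and a worker blocked while filling a chunk can hold IDs it already took without processing them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: BlockingCollection drained by a throttled Parallel.ForEach,
// with a one-shot System.Threading.Timer per ID that re-adds the ID after the interval.
public class PerIdTimerProcessor
{
    // Backed by a ConcurrentQueue (the default) with no bounded capacity.
    private readonly BlockingCollection<int> _pending =
        new BlockingCollection<int>(new ConcurrentQueue<int>());
    private readonly TimeSpan _interval;
    private readonly int _maxDop;
    private readonly Action<int> _process; // assumption: stands in for the stored procedure

    public PerIdTimerProcessor(TimeSpan interval, int maxDop, Action<int> process)
    {
        _interval = interval;
        _maxDop = maxDop;
        _process = process;
    }

    public void Add(int id) { _pending.Add(id); }

    // Blocks until Stop() has been called and the remaining IDs are drained.
    public void Run()
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = _maxDop };
        // NoBuffering (.NET 4.5+) hands out one ID at a time instead of chunks.
        var source = Partitioner.Create(
            _pending.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, options, id =>
        {
            _process(id); // e.g. execute the stored procedure for this ID

            // The ID is re-queued only after this execution finished, so a slow
            // DB call never piles up copies of the same ID. Capturing 'timer'
            // in its own callback keeps it reachable until it fires.
            Timer timer = null;
            timer = new Timer(_ =>
            {
                if (timer != null) timer.Dispose();
                try { _pending.Add(id); }
                catch (InvalidOperationException) { } // CompleteAdding was already called
            }, null, _interval, Timeout.InfiniteTimeSpan);
        });
    }

    // Call on shutdown, e.g. from a Windows service's OnStop.
    public void Stop() { _pending.CompleteAdding(); }
}
```

Run would typically be called on a dedicated thread; after Stop, GetConsumingEnumerable yields whatever is still queued and then completes, so Run returns once those IDs are processed.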
Inside the body of the ForEach you will execute your stored procedure. Now, once the stored procedure execution completes, you're saying you don't want to reschedule the execution for at least two seconds. No problem: schedule a System.Threading.Timer whose callback simply adds the ID back to the BlockingCollection.

Finally, when the process is shutting down, you would call BlockingCollection::CompleteAdding so that the enumerable being processed will stop blocking and complete, and the Parallel::ForEach will exit. If this were a Windows service, for example, you would do this in OnStop.

Update
You raised a valid concern in your comment that you might be processing a large number of IDs at any given point and fear that there would be too much overhead in a timer per ID. I would absolutely agree with that. So in the case that you are dealing with a large list of IDs concurrently, I would change from using a timer per ID to using another queue to hold the "sleeping" IDs, which is monitored by a single short-interval timer instead. First you'll need a ConcurrentQueue onto which to place the IDs that are asleep, where each entry pairs the ID with a DateTime recording when it was put on the queue.

A two-part Tuple works for that entry for illustration purposes, but you may want to create a more strongly typed struct for it (or at least alias it with a using alias directive) for better readability. Next you'll want to set up a single timer that monitors this queue and moves every ID whose two seconds are up back onto the BlockingCollection.
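A sketch of this update, assuming integer IDs, .NET 4.5 or later, a caller-supplied `process` delegate for the stored procedure, and a poll interval chosen by the caller (all assumptions; `SleepingQueueProcessor` is a hypothetical name). Workers enqueue a Tuple of the ID and `DateTime.UtcNow` when they finish, and one monitor timer moves every ID that has slept long enough back onto the BlockingCollection. The timer is one-shot and re-armed at the end of each callback, so two callbacks never run at once; with overlapping callbacks, the TryPeek/TryDequeue pair could dequeue an ID that is not yet due.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the "sleeping queue + single timer" variant.
public class SleepingQueueProcessor
{
    private readonly BlockingCollection<int> _pending = new BlockingCollection<int>();
    // Item1 = ID, Item2 = UTC time the ID was put to sleep.
    private readonly ConcurrentQueue<Tuple<int, DateTime>> _sleeping =
        new ConcurrentQueue<Tuple<int, DateTime>>();
    private readonly TimeSpan _sleepTime;
    private readonly TimeSpan _pollInterval;
    private readonly int _maxDop;
    private readonly Action<int> _process; // assumption: stands in for the stored procedure
    private readonly Timer _monitor;

    public SleepingQueueProcessor(TimeSpan sleepTime, TimeSpan pollInterval, int maxDop, Action<int> process)
    {
        _sleepTime = sleepTime;
        _pollInterval = pollInterval;
        _maxDop = maxDop;
        _process = process;
        // Create disarmed, then arm once the field is assigned.
        _monitor = new Timer(WakeSleepers, null, Timeout.Infinite, Timeout.Infinite);
        _monitor.Change(_pollInterval, Timeout.InfiniteTimeSpan);
    }

    public void Add(int id) { _pending.Add(id); }

    public void Run()
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = _maxDop };
        var source = Partitioner.Create(
            _pending.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
        Parallel.ForEach(source, options, id =>
        {
            _process(id);
            // Instead of one timer per ID: record when the ID went to sleep.
            _sleeping.Enqueue(Tuple.Create(id, DateTime.UtcNow));
        });
    }

    public void Stop()
    {
        _pending.CompleteAdding();
        _monitor.Dispose();
    }

    private void WakeSleepers(object state)
    {
        DateTime now = DateTime.UtcNow;
        Tuple<int, DateTime> sleeper;
        // The queue is roughly ordered by sleep time, so stop at the first ID not yet due.
        while (_sleeping.TryPeek(out sleeper) && now - sleeper.Item2 >= _sleepTime)
        {
            _sleeping.TryDequeue(out sleeper); // single consumer: same item as peeked
            try { _pending.Add(sleeper.Item1); }
            catch (InvalidOperationException) { return; } // CompleteAdding was called
        }
        try { _monitor.Change(_pollInterval, Timeout.InfiniteTimeSpan); } // re-arm
        catch (ObjectDisposedException) { } // Stop() disposed the timer
    }
}
```

Only one timer exists no matter how many IDs are sleeping; the cost is that an ID may wake up to one poll interval late.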
Then you would simply change the Parallel::ForEach body so that, instead of setting up a timer for each ID, it enqueues the ID together with the current time onto the sleeping queue once the stored procedure completes.

This is pretty similar to the approach you said you already had in your question, but does so with TPL tasks. A task just adds itself back to a list of things to schedule when it's done.
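A rough sketch of this task-based approach, assuming integer IDs, a caller-supplied `process` delegate, a 10 ms polling granularity, and a SemaphoreSlim to cap concurrency. The throttling mechanism, the class name `TaskScheduledProcessor`, and the Start/Stop methods are assumptions of this sketch, not something the text specifies; only the name SchedulingLoop comes from the text. Each finished task puts its ID back on a plain List guarded by a lock, and a long-running SchedulingLoop starts every ID whose interval has elapsed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: tasks re-add their own ID to a locked List when done;
// SchedulingLoop starts each ID once its interval since the last finish has passed.
public class TaskScheduledProcessor
{
    private readonly object _gate = new object();
    // ID plus the UTC time it last finished; guarded by _gate.
    private readonly List<Tuple<int, DateTime>> _idle = new List<Tuple<int, DateTime>>();
    private readonly SemaphoreSlim _throttle; // assumption: caps concurrent executions
    private readonly TimeSpan _interval;
    private readonly Action<int> _process;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _loop;

    public TaskScheduledProcessor(TimeSpan interval, int maxDop, Action<int> process)
    {
        _interval = interval;
        _throttle = new SemaphoreSlim(maxDop, maxDop);
        _process = process;
    }

    // DateTime.MinValue makes a newly added ID due immediately.
    public void Add(int id)
    {
        lock (_gate) _idle.Add(Tuple.Create(id, DateTime.MinValue));
    }

    public void Start()
    {
        _loop = Task.Factory.StartNew(() => SchedulingLoop(), TaskCreationOptions.LongRunning);
    }

    // Stops scheduling; tasks already running are allowed to finish on their own.
    public void Stop()
    {
        _cts.Cancel();
        _loop.Wait();
    }

    private void SchedulingLoop()
    {
        while (!_cts.IsCancellationRequested)
        {
            var due = new List<int>();
            DateTime now = DateTime.UtcNow;
            lock (_gate)
            {
                for (int i = _idle.Count - 1; i >= 0; i--)
                {
                    if (now - _idle[i].Item2 >= _interval)
                    {
                        due.Add(_idle[i].Item1);
                        _idle.RemoveAt(i); // running IDs are not on the list
                    }
                }
            }
            foreach (int id in due)
            {
                _throttle.Wait(); // blocks while maxDop executions are in flight
                Task.Factory.StartNew(() =>
                {
                    try { _process(id); }
                    finally
                    {
                        _throttle.Release();
                        // The task puts its ID back on the list when it is done.
                        lock (_gate) _idle.Add(Tuple.Create(id, DateTime.UtcNow));
                    }
                });
            }
            Thread.Sleep(10); // assumption: polling granularity
        }
    }
}
```

Because an ID is removed from the list while its task runs, it cannot be scheduled twice, which matches the requirement not to queue up work during DB delays.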
The use of locking on a plain list is fairly ugly in this example; you would probably want a better collection to hold the list of things to schedule.
Starting the component kicks off the SchedulingLoop, which does the actual checking of whether it has been two seconds since an item last ran.