I want to use something like a ThreadPoolExecutor to manage running a bunch of tasks on available threads. These tasks are all of the same type but deal with different accounts. New tasks for these accounts can be added at regular intervals, and I want the executor to check for, and hold back, a new task until the previous tasks for the same account have completed. What's the best way to do this?
Example:
- Task for account "234" is started (via ThreadPoolExecutor.execute())
- Task for account "238" is started (via ThreadPoolExecutor.execute())
- New task for account "234" is created but not passed to execute() because the first "234" task is not complete (best way to check this?)
- Task for account "238" completes
- New task for account "238" starts (via ThreadPoolExecutor.execute()) because none is currently running for that account
What's the best way to do this? Should the new task simply wait/sleep() until some flag in the Runnable for "234"'s first task says it has finished? Or is there a better solution?
I have no doubt someone with more experience with this part of the API will have a better idea, but here are my thoughts on the subject...
Basically, I'd start with a "running" and a "waiting" queue. The "running" queue keeps track of what's currently running; the "waiting" queue keeps track of the tasks that you are holding back. These queues will need to be keyed by some kind of "group identifier" (for example, your account number) to make lookups easy, i.e. Map<String, List<Runnable>>.
I'd look at overriding the execute method. In here I'd compare the incoming task against the "running" queue to determine whether any related tasks are currently running. If there are, I'd drop the new task into the "waiting" queue.
I'd then override the beforeExecute method. Here I would register the task in the "running" queue.
I'd override the afterExecute method. Here I would remove the completed task from the "running" queue, look up the queue of waiting tasks (via the group identifier of the completed task) and pass the first task in that queue to the executor via the execute method.
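A minimal sketch of that idea, assuming a hypothetical `AccountTask` interface (a `Runnable` that also exposes `accountId()`) and a hypothetical `PerAccountExecutor` class with a fixed-size pool. One deliberate change: the account is registered in `execute()` under a lock rather than in `beforeExecute()`. `beforeExecute()` only runs once a worker thread picks the task up, so two tasks for the same account submitted back to back could both pass the check in `execute()` before either is registered. Tasks must be passed to `execute()`, not `submit()`, because `submit()` wraps them in a `FutureTask` and `afterExecute()` would no longer see the account id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: each task carries the account it belongs to.
interface AccountTask extends Runnable {
    String accountId();
}

// Hypothetical executor that runs at most one task per account at a time.
class PerAccountExecutor extends ThreadPoolExecutor {
    private final Object lock = new Object();
    private final Set<String> running = new HashSet<>();                  // accounts with a task in the pool
    private final Map<String, Deque<Runnable>> waiting = new HashMap<>();  // held-back tasks per account

    PerAccountExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable command) {
        if (command instanceof AccountTask) {
            String id = ((AccountTask) command).accountId();
            synchronized (lock) {
                if (!running.add(id)) {
                    // A task for this account is already queued or running: hold this one back.
                    waiting.computeIfAbsent(id, k -> new ArrayDeque<>()).addLast(command);
                    return;
                }
            }
        }
        super.execute(command);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (!(r instanceof AccountTask)) {
            return;
        }
        String id = ((AccountTask) r).accountId();
        Runnable next;
        synchronized (lock) {
            Deque<Runnable> queue = waiting.get(id);
            next = (queue == null) ? null : queue.pollFirst();
            if (queue != null && queue.isEmpty()) {
                waiting.remove(id);
            }
            if (next == null) {
                running.remove(id); // nothing waiting: the account is free again
            }
        }
        if (next != null) {
            super.execute(next); // the account stays marked as running on behalf of 'next'
        }
    }
}
```

Held-back tasks for an account run one at a time in the order they were submitted. In this sketch, calling `shutdown()` while tasks are still held back would cause those tasks to be rejected when `afterExecute()` tries to hand them to the pool, so you would want to drain the waiting queues first.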
Or you could do as Louis suggests :P
One simple possibility, perhaps overly simple: create 10 single-threaded executors (Executors.newSingleThreadExecutor()). For each task:
- "Hash" the accountID by taking accountID mod 10 to find the appropriate single-threaded executor. (In practice, accountID may not be an int; if it's a String, take its hashCode() mod 10, using Math.floorMod() because hashCode() can be negative.)
- Submit the task to that executor.
This may not be ideal, as processing of account 238 will be forced to wait until 358 is complete (both map to executor 8), but at least you are sure that a specific account, say 234, will never have two tasks running at the same time. Whether that is acceptable depends on how much latency you can allow. Obviously, you could play with the number of executors and the simplistic "hashing" algorithm I described.
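A rough sketch of the striped approach, assuming String account ids; `StripedExecutor`, `stripeFor` and the stripe count are hypothetical names chosen for illustration. Every task for a given account is sent to the same single-threaded executor, so those tasks run sequentially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical wrapper: N single-threaded executors ("stripes"); an account
// always maps to the same stripe, so its tasks never overlap.
class StripedExecutor {
    private final List<ExecutorService> stripes = new ArrayList<>();

    StripedExecutor(int stripeCount) {
        for (int i = 0; i < stripeCount; i++) {
            stripes.add(Executors.newSingleThreadExecutor());
        }
    }

    // floorMod keeps the index in [0, size) even when hashCode() is negative.
    int stripeFor(String accountId) {
        return Math.floorMod(accountId.hashCode(), stripes.size());
    }

    Future<?> submit(String accountId, Runnable task) {
        return stripes.get(stripeFor(accountId)).submit(task);
    }

    void shutdown() {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
    }
}
```

The trade-off is the one described above: unrelated accounts that land on the same stripe also wait for each other, and more stripes reduce that at the cost of more threads.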
I faced the same issue. My solution was to use a HashSet.
```java
import java.util.HashSet;
import java.util.Set;

// Shared by all task instances: account ids that currently have a task running.
private static final Set<Integer> runningTasks = new HashSet<>();

public void run() {
    boolean isAlreadyRunning;
    synchronized (runningTasks) {
        // Set.add() returns false if the account id was already present.
        isAlreadyRunning = !runningTasks.add(this.accountId);
    }
    if (isAlreadyRunning) {
        // Schedule this task to run later here.
        // What I did was to reinsert this task into the task queue 5 seconds later.
        return;
    }
    try {
        // Do your work here.
    } finally {
        // Always release the account, even if the work throws.
        synchronized (runningTasks) {
            runningTasks.remove(this.accountId);
        }
    }
}
```
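One way to fill in the "reinsert this task later" part is a sketch like the following, assuming a `ScheduledExecutorService` runs the tasks. `RetryingAccountTask` and its constructor parameters are hypothetical, and `ConcurrentHashMap.newKeySet()` stands in for the synchronized `HashSet`; the retry delay is a parameter so you can use 5 seconds or anything else.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical task: if another task for the same account is running,
// it re-schedules itself after a delay instead of running now.
class RetryingAccountTask implements Runnable {
    // Thread-safe set of accounts that currently have a task running.
    private static final Set<Integer> RUNNING = ConcurrentHashMap.newKeySet();

    private final int accountId;
    private final Runnable work;                 // the actual per-account job
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMillis;

    RetryingAccountTask(int accountId, Runnable work,
                        ScheduledExecutorService scheduler, long retryDelayMillis) {
        this.accountId = accountId;
        this.work = work;
        this.scheduler = scheduler;
        this.retryDelayMillis = retryDelayMillis;
    }

    @Override
    public void run() {
        // add() is atomic and returns false if the account is already marked as running.
        if (!RUNNING.add(accountId)) {
            scheduler.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
            return;
        }
        try {
            work.run();
        } finally {
            RUNNING.remove(accountId); // always release, even if the work throws
        }
    }
}
```

Unlike a per-account waiting queue, retrying does not preserve submission order: a retried task can be overtaken by a newer task for the same account. It also occupies a scheduler slot for each retry while the account is busy.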