Best way to wait on tasks to complete before addin

Posted 2019-06-10 02:31

Question:

I want to use something like a ThreadPoolExecutor to manage running a bunch of tasks on available threads. These tasks are all of the same type but deal with different accounts. New tasks for these accounts can be added at regular intervals and I want it to check and not allow the new tasks to start until the old tasks for the same account have already completed. What's the best way to do this?

EXAMPLE

  1. Task for account "234" is started (via ThreadPoolExecutor.execute())

  2. Task for account "238" is started (via ThreadPoolExecutor.execute())

  3. New Task for account "234" created but not added to execute because first "234" task not complete (best way to check this?)

  4. Task for account "238" completes

  5. New Task for account "238" starts (via ThreadPoolExecutor.execute()) because none currently running for that account

What's the best way to do this? Simply have it check with a wait/sleep() for some check variable in the Runnable for "234"'s first task to finish? Or is there a better solution?

Answer 1:

I have no doubt someone with more experience with this part of the API will have a better idea, but here are my thoughts on the subject...

Basically, I'd start with a "running" queue and a "waiting" queue. The "running" queue keeps track of what's currently running; the "waiting" queue keeps track of the tasks that you are holding back. These queues will need to be keyed by some kind of "group identifier" to make lookups easier (i.e. Map<String, List<Runnable>>), for example your account number.

I'd look at overriding the execute method. In here I'd compare the incoming task against the running queue to determine if any related tasks are currently running. If there are, I'd drop the new task into the waiting queue.

I'd then override the beforeExecute method. Here I would register the task in the "running" queue.

I'd override the afterExecute method. Here I would remove the completed task from the "running" queue, look up the queue of waiting tasks (via the group identifier of the completed task) and add the first task in that queue to the executor via the execute method.
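The steps above could be sketched roughly as follows. This is only a minimal sketch under some assumptions: AccountTask and PerAccountExecutor are hypothetical names made up for illustration, and the account is marked as running inside execute() rather than in beforeExecute(), because a task that is queued but not yet started would otherwise let a second task for the same account slip through.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a Runnable tagged with the account it belongs to.
final class AccountTask implements Runnable {
    final String accountId;
    private final Runnable body;

    AccountTask(String accountId, Runnable body) {
        this.accountId = accountId;
        this.body = body;
    }

    @Override
    public void run() {
        body.run();
    }
}

// Hypothetical executor that keeps at most one task per account in flight.
class PerAccountExecutor extends ThreadPoolExecutor {
    private final Object lock = new Object();
    // Accounts that currently have a task queued or running in the pool.
    private final Set<String> running = new HashSet<>();
    // Held-back tasks, keyed by account, in arrival order.
    private final Map<String, Queue<AccountTask>> waiting = new HashMap<>();

    PerAccountExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable command) {
        if (command instanceof AccountTask) {
            AccountTask task = (AccountTask) command;
            synchronized (lock) {
                // add() returns false when the account is already busy.
                if (!running.add(task.accountId)) {
                    waiting.computeIfAbsent(task.accountId, k -> new ArrayDeque<>()).add(task);
                    return;
                }
            }
        }
        super.execute(command);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (!(r instanceof AccountTask)) {
            return;
        }
        String id = ((AccountTask) r).accountId;
        AccountTask next;
        synchronized (lock) {
            Queue<AccountTask> queue = waiting.get(id);
            next = (queue == null) ? null : queue.poll();
            if (next == null) {
                // Nothing is waiting, so the account becomes free again.
                running.remove(id);
                waiting.remove(id);
            }
        }
        if (next != null) {
            // The account stays marked as running while its next task is handed over.
            super.execute(next);
        }
    }
}
```

Note that this only covers tasks passed to execute() directly; submit() wraps the task in a FutureTask, so the instanceof check would not match. The sketch also does nothing about tasks still sitting in the waiting map when the pool is shut down.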

Or you could do as Louis suggests :P



Answer 2:

One simple possibility. Perhaps overly simple. Create 10 single-threaded executors (e.g. via Executors.newSingleThreadExecutor()). For each task:

  1. "hash" the accountID by taking accountID mod 10 to find the appropriate single-threaded executor. (In practice, accountID may not be an int, e.g. if it's a String take its hashCode() mod 10; since hashCode() can be negative, Math.floorMod is safer than % here.)
  2. Submit the task to that SingleThreadedExecutor.

This may not be ideal, as processing of account 238 will be forced to wait until 358 is complete, but at least you are sure that a specific account, say 234, will never have two tasks running at the same time. It depends on how much latency you can allow. Obviously, you could play with the number of executors and the simplistic "hashing" algorithm I described.
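A minimal sketch of this idea might look like the following. StripedExecutor and its method names are hypothetical and made up for illustration. It uses Math.floorMod instead of % because hashCode() can be negative, which would otherwise give a negative array index.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: routes every task for an account to the same single-threaded executor.
class StripedExecutor {
    private final ExecutorService[] stripes;

    StripedExecutor(int stripeCount) {
        stripes = new ExecutorService[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Maps an account id (Integer, String, ...) to a stripe index in [0, stripeCount).
    int stripeFor(Object accountId) {
        return Math.floorMod(accountId.hashCode(), stripes.length);
    }

    // Tasks for the same account land on the same thread, so they run one at a time, in order.
    void execute(Object accountId, Runnable task) {
        stripes[stripeFor(accountId)].execute(task);
    }

    void shutdown() {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
    }
}
```

With 10 stripes and Integer account ids, 238 and 358 both map to stripe 8, which is exactly the collision described above, while 234 maps to stripe 4 and is unaffected by either.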



Answer 3:

I faced the same issue. My solution was to use a HashSet.

// requires: import java.util.HashSet; import java.util.Set;
private static final Set<Integer> runningTasks = new HashSet<>();

public void run() {
  boolean isAlreadyRunning;
  synchronized (runningTasks) {
    // add() returns false if this account is already in the set
    isAlreadyRunning = !runningTasks.add(this.accountId);
  }
  if (isAlreadyRunning) {
    // schedule this task to run later here
    // what I did was to reinsert this task into the task queue 5 seconds later
    return;
  }
  try {
    // do your stuff here
  } finally {
    // always release the account, even if the work above throws
    synchronized (runningTasks) {
      runningTasks.remove(this.accountId);
    }
  }
}
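The "schedule this task to run later" step in the snippet above is left open. One possible way to fill it in, sketched here under assumptions, is to hand the task back to a ScheduledExecutorService with a delay. RetryingAccountTask, its constructor parameters and the retry delay are all hypothetical names chosen for illustration, and ConcurrentHashMap.newKeySet() is swapped in for the synchronized HashSet.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical task that re-queues itself while another task for the same account is running.
class RetryingAccountTask implements Runnable {
    // Thread-safe set of account ids that currently have a task running.
    private static final Set<Integer> RUNNING = ConcurrentHashMap.newKeySet();

    private final int accountId;
    private final Runnable work;
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMillis;

    RetryingAccountTask(int accountId, Runnable work,
                        ScheduledExecutorService scheduler, long retryDelayMillis) {
        this.accountId = accountId;
        this.work = work;
        this.scheduler = scheduler;
        this.retryDelayMillis = retryDelayMillis;
    }

    @Override
    public void run() {
        // add() returns false if the account is already being processed.
        if (!RUNNING.add(accountId)) {
            // Try again later instead of blocking a pool thread.
            scheduler.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
            return;
        }
        try {
            work.run();
        } finally {
            // Release the account even if the work throws.
            RUNNING.remove(accountId);
        }
    }
}
```

One trade-off of this retry style: if several tasks for the same account are waiting, whichever retry fires first after the account is released wins, so arrival order is not guaranteed.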