可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.
I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.
Consider this scenario where the tasks with *
need to be processed in the order they were pushed in. The other tasks can be processed in any order.
push task1
push task2
push task3 *
push task4 *
push task5
push task6 *
....
and so on
In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.
I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue where a single thread looks. It does sounds a bit clumsy.
So, the real question in this long story: how would you solve this ? How would you ensure those tasks are ordered?
EDIT
As a more general problem, suppose the scenario above becomes
push task1
push task2 **
push task3 *
push task4 *
push task5
push task6 *
push task7 **
push task8 *
push task9
....
and so on
What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7
for example.
One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).
Thank you for your time.
回答1:
Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary, also have parallel tasks, but do this as tasks are received i.e. you do not need to know about the entire sequence up-front, execution order is maintained dynamically.
internal class TaskQueue
{
private readonly object _syncObj = new object();
private readonly Queue<QTask> _tasks = new Queue<QTask>();
private int _runningTaskCount;
public void Queue(bool isParallel, Action task)
{
lock (_syncObj)
{
_tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
}
ProcessTaskQueue();
}
public int Count
{
get{lock (_syncObj){return _tasks.Count;}}
}
private void ProcessTaskQueue()
{
lock (_syncObj)
{
if (_runningTaskCount != 0) return;
while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
{
QTask parallelTask = _tasks.Dequeue();
QueueUserWorkItem(parallelTask);
}
if (_tasks.Count > 0 && _runningTaskCount == 0)
{
QTask serialTask = _tasks.Dequeue();
QueueUserWorkItem(serialTask);
}
}
}
private void QueueUserWorkItem(QTask qTask)
{
Action completionTask = () =>
{
qTask.Task();
OnTaskCompleted();
};
_runningTaskCount++;
ThreadPool.QueueUserWorkItem(_ => completionTask());
}
private void OnTaskCompleted()
{
lock (_syncObj)
{
if (--_runningTaskCount == 0)
{
ProcessTaskQueue();
}
}
}
private class QTask
{
public Action Task { get; set; }
public bool IsParallel { get; set; }
}
}
Update
To handle task groups with serial and parallel task mixes, a GroupedTaskQueue
can manage a TaskQueue
for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.
internal class GroupedTaskQueue
{
private readonly object _syncObj = new object();
private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
private readonly string _defaultGroup = Guid.NewGuid().ToString();
public void Queue(bool isParallel, Action task)
{
Queue(_defaultGroup, isParallel, task);
}
public void Queue(string group, bool isParallel, Action task)
{
TaskQueue queue;
lock (_syncObj)
{
if (!_queues.TryGetValue(group, out queue))
{
queue = new TaskQueue();
_queues.Add(group, queue);
}
}
Action completionTask = () =>
{
task();
OnTaskCompleted(group, queue);
};
queue.Queue(isParallel, completionTask);
}
private void OnTaskCompleted(string group, TaskQueue queue)
{
lock (_syncObj)
{
if (queue.Count == 0)
{
_queues.Remove(group);
}
}
}
}
回答2:
Thread pools are good for cases where the relative order of the tasks doesn't matter, provided they all get done. In particular, it must be OK for them all to be done in parallel.
If your tasks must be done in a specific order, then they are not suitable for parallelism, so a thread pool is not appropriate.
If you want to move these serial tasks off the main thread, then a single background thread with a task queue would be appropriate for those tasks. You can continue to use a thread pool for the remaining tasks which are suitable for parallelism.
Yes, it means you have to decide where to submit the task depending on whether it is an in-order task or a "may be parallelized" task, but this is not a big deal.
If you have groups that must be serialized, but which can run in parallel with other tasks then you have multiple choices:
- Create a single task for each group, which does the relevant group tasks in order, and post this task to the thread pool.
- Have each task in a group explicitly wait for the previous task in the group, and post them to the thread pool. This requires that your thread pool can handle the case where a thread is waiting for a not-yet-scheduled task without deadlocking.
- Have a dedicated thread for each group, and post group tasks on the appropriate message queue.
回答3:
To do what you want to do with a threadpool, you might have to create some kind of scheduler.
Something like that:
TaskQueue -> Scheduler -> Queue -> ThreadPool
Scheduler runs in its own thread, keeping tracks of dependencies between jobs. When a job is ready to be done, the scheduler just pushes it in the queue for the threadpool.
The ThreadPool might have to send signals to the Scheduler to indicate when a job is done so the scheduler can put jobs depending on that job into the Queue.
In your case, the dependencies could probably be stored in a linked list.
Let's say you have the following dependencies:
3 -> 4 -> 6 -> 8
Job 3 is running on the threadpool, you still have no ideas that job 8 exists.
Job 3 ends. You remove the 3 from the linked list, you put job 4 on the queue to the threadpool.
Job 8 arrives. You put it at the end of the linked list.
The only constructs that have to be fully synchronized are the Queues before and after the scheduler.
回答4:
Basically, there are a number of pending tasks. Some of the tasks can only be performed when one or more other pending tasks have finished executing.
The pending tasks can be modeled in a dependency graph:
- "task 1 -> task2" means "task 2 can be executed only after task 1 is finished." the arrows point in the direction of execution order.
- the indegree of a task (the number of tasks pointing to it) determines whether the task is ready for execution. If the indegree is 0, it can be executed.
- sometimes a task must wait for multiple tasks to finish, the indegree is then >1.
- if a task doesn't have to wait for other tasks to finish anymore (its indegree is zero), it can be submitted to the thread pool with worker threads, or the queue with tasks waiting to be picked up by a worker thread. You know the submitted task will not cause deadlock, because the task isn't waiting for anything. As an optimization, you can use a priority queue, e.g. in which tasks that more tasks in the dependency graph depend on will be executed first. This also can't provoke deadlock, because all tasks in the thread pool can be executed. It can provoke starvation, however.
- If a task finishes execution, it can be removed from the dependency graph, possibly reducing the indegree of other tasks, which can in turn be submitted to the pool of working threads.
So there is (at least) one thread used to add/remove pending tasks, and there is a thread pool of working threads.
When a task is added to the dependency graph, you must check:
- how the task is connected in the dependency graph: what tasks must it wait for to finish and what tasks must wait for it to finish? Draw connections from and to the new task accordingly.
- once the connections are drawn: did the new connections cause any cycles in the dependency graph? If so, there is a deadlock situation.
Performance:
- this pattern is slower than sequential execution if parallel execution is in fact rarely possible, because you need extra administration to do everything almost sequentially anyway.
- this pattern is fast if many tasks can be performed simultaneously in practice.
Assumptions:
As you may have read between the lines, you must design the tasks so that they don't interfere with other tasks. Also, there must be a way to determine the priority of the tasks. The task priority should include the data handled by each task. Two tasks may not alter the same object simultaneously; one of the tasks should get priority over the other one instead, or the performed operations on the object must be thread-safe.
回答5:
If I understand the problem correctly, the jdk executors don't have this capability but it's easy to roll your own. You basically need
- a pool of worker threads, each of which has a dedicated queue
- some abstraction over those queues to which you offer work (c.f. the
ExecutorService
)
- some algorithm that deterministically selects a specific queue for each piece of work
- each piece of work then gets offers to the right queue and hence gets processed in the right order
The difference to the jdk executors is that they have 1 queue with n threads but you want n queues and m threads (where n may or may not equal m)
* edit after reading that each task has a key *
In a bit more detail
- write some code that transforms a key into an index (an int) in a given range (0-n where n is the number of threads you want), this could be as simple as
key.hashCode() % n
or it could be some static mapping of known key values to threads or whatever you want
- at startup
- create n queues, put them in an indexed structure (array, list whatever)
- start n threads, each thread just does a blocking take from the queue
- when it receives some work, it knows how to execute work specific to that task/event (you can obviously have some mapping of tasks to actions if you have heterogenous events)
- store this behind some facade that accepts the work items
- when a task arrives, hand it to the facade
- the facade finds the right queue for the task based on the key, offers it to that queue
it's easier enough to add auto restarting worker threads to this scheme, you just then need the worker thread to register with some manager to state "I own this queue" and then some housekeeping around that + detection of errors in the thread (which means it unregisters the ownership of that queue returning the queue to a free pool of queues which is a trigger to start a new thread up)
回答6:
I think thread pool can be effectively used in this situation. The idea is to use separate strand
object for each group of dependent tasks. You add tasks to your queue with or w/o strand
object. You use the same strand
object with dependent tasks. Your scheduler checks if the next task has a strand
and if this strand
is locked. If not - lock this strand
and run this task. If strand
is already locked - keep this task in queue until next scheduling event. When task is done unlock its strand
.
In result you need single queue, you don't need any additional threads, no complicated groups etc. strand
object can be very simple with two methods lock
and unlock
.
I often meet the same design problem, e.g. for an asynchronous network server that handles multiple simultaneous sessions. Sessions are independent (this maps them to your independent tasks and groups of dependent tasks) when tasks inside sessions are dependent (this maps session internal tasks to your dependent tasks inside a group). Using described approach I avoid explicit synchronization inside session completely. Every session has own strand
object.
And what is more, I use existing (great) implementation of this idea: Boost Asio library (C++). I just used their term strand
. Implementation is elegant: I wrap my async tasks into corresponding strand
object before scheduling them.
回答7:
Option 1 - The complex one
Since you have sequential jobs, you can gather up those jobs in a chain and let the jobs themselves resubmit to the thread pool once they are done. Suppose we have a list of jobs:
[Task1, ..., Task6]
like in your example. We have a sequential dependency, such that [Task3, Task4, Task6]
is a dependency chain. We now make a job (Erlang pseudo-code):
Task4Job = fun() ->
Task4(), % Exec the Task4 job
push_job(Task6Job)
end.
Task3Job = fun() ->
Task3(), % Execute the Task3 Job
push_job(Task4Job)
end.
push_job(Task3Job).
That is, we alter the Task3
job by wrapping it into a job which as a continuation pushes the next job in the queue to the thread pool. There are strong similarities to a general continuation passing style here also seen in systems like Node.js
or Pythons Twisted
framework.
Generalizing, you make a system where you can define job chains which can defer
further work and resubmit the further work.
Option 2 - The simple one
Why do we even bother splitting up the jobs? I mean, since they are sequentially dependent, executing all of them on the same Thread won't be faster or slower than taking that chain and spreading it out over multiple threads. Assuming "enough" work load, any thread will always have work to anyway, so just bundling the jobs together is probably easiest:
Task = fun() ->
Task3(),
Task4(),
Task6() % Just build a new job, executing them in the order desired
end,
push_job(Task).
It is rather easy to do stuff like this if you have functions as first-class citizens so you can build them in your language at whim, like you can in, say, Any functional programming language, Python, Ruby-blocks - and so on.
I don't particularly like the idea of building a queue, or a continuation stack, like in "Option 1" though and I would definitely go with the second option. In Erlang, we even have a programs called jobs
written by Erlang Solutions and released as Open Source. jobs
is built to execute and load regulate job executions like these. I'd probably combine option 2 with jobs if I were to solve this problem.
回答8:
The answers suggesting not use a thread-pool is like hard-coding the knowledge of task dependencies/execution order. Instead, I would create a CompositeTask
that manges the start/end dependency between two tasks. By encapsulating the dependency behind the task interface, all tasks can be treated uniformly, and added to the pool. This hides the execution details and allows the task dependencies to change without affecting whether or not you use a thread pool.
The question doesn't specify a language - I'll use Java, which I hope is readable for most.
class CompositeTask implements Task
{
Task firstTask;
Task secondTask;
public void run() {
firstTask.run();
secondTask.run();
}
}
This executes tasks sequentially and on the same thread. You can chain many CompositeTask
s together to create a sequence of as many sequential tasks as needed.
The downside here is that this ties up the thread for the duration of all tasks executing sequentially. You may have other tasks that you would prefer to execute inbetween the first and second tasks. So, rather than execute the second task directly, have the composite task schedule execution of the second task:
class CompositeTask implements Runnable
{
Task firstTask;
Task secondTask;
ExecutorService executor;
public void run() {
firstTask.run();
executor.submit(secondTask);
}
}
This ensures that the second task doesn't run until after the first task is complete and also allows the pool to execute other (possibly more urgent) tasks. Note that the first and second tasks may execute on separate threads, so although they do not execute concurrently, any shared data used by the tasks must be made visible to other threads (e.g. by making the variables volatile
.)
This is a simple, yet powerful and flexible approach, and allows the tasks themselves to define execution constraints, rather than doing it by using different thread pools.
回答9:
Use two Active Objects. In two words: active object pattern consists from priority queue and 1 or many working threads those can get tasks from queue and process its.
So use one active object with one working thread: all tasks those would be places to queue would be processed sequentially. Use second active object with number of working thread more then 1. In this case working threads would get and process tasks from queue in any order.
Luck.
回答10:
I think you're mixing concepts. Threadpool is ok when you want to distribute some work among threads but if you start mixing dependencies between threads then it isn't such a good idea.
My advice, simply don't use the threadpool for those tasks. Just create a dedicated thread and keep a simple queue of sequential items that must be processed by that thread alone. Then you can keep pushing tasks to the thread pool when you don't have a sequential requirement and use the dedicated thread when you have.
A clarification: Using common sense, a queue of serial tasks shall be executed by a single thread processing each task one after another :)
回答11:
This is achievable, well, as far as I understand your scenario. Basically what you need is do something smart to coordinate your tasks in main thread. Java API your need are ExecutorCompletionService and Callable
First, implement your callable task:
public interface MyAsyncTask extends Callable<MyAsyncTask> {
// tells if I am a normal or dependent task
private boolean isDependent;
public MyAsyncTask call() {
// do your job here.
return this;
}
}
Then in your main thread, use CompletionService coordinate the dependent task execution (i.e. a wait mechanism):
ExecutorCompletionService<MyAsyncTask> completionExecutor = new
ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
if (task.isNormal()) {
// if it is a normal task, submit it immediately.
completionExecutor.submit(task);
} else {
if (dependentFutureTask == null) {
// submit the first dependent task, get a reference
// of this dependent task for later use.
dependentFutureTask = completionExecutor.submit(task);
} else {
// wait for last one completed, before submit a new one.
dependentFutureTask.get();
dependentFutureTask = completionExecutor.submit(task);
}
}
}
By doing this, you use a single executor (threadpool size 5) execute both normal and dependent tasks, the normal task are executed immediately as soon as submitted, the dependent tasks are executed one by one (wait are performed in main thread by calling get() on Future before submitting new dependent task), so at any point of time, you always have a number of normal tasks and a single dependent task (if exists) running in a single threadpool.
This is just a head start, by using ExecutorCompletionService, FutureTask and Semaphore, you can implement more complex thread coordination scenario.
回答12:
How would you ensure those tasks are ordered?
push task1
push task2
push task346
push task5
In response to the edit:
push task1
push task27 **
push task3468 *
push task5
push task9
回答13:
You have two different kind of tasks. Mixing them up in a single queue feels rather odd. Instead of having one queue have two. For the sake of simplicity you could even use a ThreadPoolExecutor for both. For the serial tasks just give it a fixed size of 1, for the tasks that can be executed concurrently give it more. I don't see why that would be clumsy at all. Keep it simple and stupid. You have two different tasks so treat them accordingly.
回答14:
Since you only need to wait for a single task to complete before starting the dependent task, it can be easily done if you can schedule the dependent task in the first task. So in your second example:
at the end of task 2, schedule task 7
and
at the end of task 3, schedule task 4 and so on for 4->6 and 6->8.
In the beginning, just schedule tasks 1,2,5,9... and the rest should follow.
An even more general problem is when you have to wait for multiple tasks before a dependent task can start. Handling that efficiently is a non-trivial exercise.
回答15:
There is a java framework specifically for this purpose called dexecutor (disclaimer: I am the owner)
DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("task1", "task2");
executor.addDependency("task4", "task6");
executor.addDependency("task6", "task8");
executor.addIndependent("task3");
executor.addIndependent("task5");
executor.addIndependent("task7");
executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);
task1, task3, task5,task7 runs in parallel (Depending upon thread pool size), once task1 finishes, task2 runs, once task2 finishes task4 runs, once task4 finishes task6 runs and finally once task6 finishes task8 runs.
回答16:
There have been a lot of answers, and obviously one has been accepted. But why not use continuations?
If you have a known "serial" condition, then when you enqueue the first task with this condition, hold the Task; and for further tasks invoke Task.ContinueWith().
public class PoolsTasks
{
private readonly object syncLock = new object();
private Task serialTask = Task.CompletedTask;
private bool isSerialTask(Action task) {
// However you determine what is serial ...
return true;
}
public void RunMyTask(Action myTask) {
if (isSerialTask(myTask)) {
lock (syncLock)
serialTask = serialTask.ContinueWith(_ => myTask());
} else
Task.Run(myTask);
}
}
回答17:
Thread Pool with ordered and unordered execute methods:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OrderedExecutor {
private ExecutorService multiThreadExecutor;
// for single Thread Executor
private ThreadLocal<ExecutorService> threadLocal = new ThreadLocal<>();
public OrderedExecutor(int nThreads) {
this.multiThreadExecutor = Executors.newFixedThreadPool(nThreads);
}
public void executeUnordered(Runnable task) {
multiThreadExecutor.submit(task);
}
public void executeOrdered(Runnable task) {
multiThreadExecutor.submit(() -> {
ExecutorService singleThreadExecutor = threadLocal.get();
if (singleThreadExecutor == null) {
singleThreadExecutor = Executors.newSingleThreadExecutor();
threadLocal.set(singleThreadExecutor);
}
singleThreadExecutor.submit(task);
});
}
public void clearThreadLocal() {
threadLocal.remove();
}
}
After filling all queues the threadLocal should be cleared.
The only drawback is that singleThreadExecutor will be created each time the method
executeOrdered(Runnable task)
invoked in separate thread