I realized my ThreadPoolExecutor with PriorityBlockingQueue like in this example:
https://stackoverflow.com/a/12722648/2206775
and wrote a test:
PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(16);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Thread.sleep(1000);
System.out.println("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Thread.sleep(1000);
System.out.println("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 3);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Thread.sleep(1000);
System.out.println("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 2);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Thread.sleep(1000);
System.out.println("5");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 5);
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Thread.sleep(1000);
System.out.println("4");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 4);
executorService.shutdown();
try {
executorService.awaitTermination(30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
But in the end, I don't get 1 2 3 4 5, I get a random order of those numbers. Is there a problem with the test, or something else? And if first, how can it be tested correctly?
The priority is only taken into account if the pool is fully busy and you submit several new tasks. If you define your pool with only one thread, you should get the expected output. In your example, all tasks get executed concurrently and which one finishes first is somewhat random.
By the way the linked implementation has a problem and throws an exception if your queue is full and you submit new tasks.
See below a working example of what you are trying to achieve (I have overriden newTaskFor
in a simplistic way, just to make it work - you might want to improve that part).
It prints: 1 2 3 4 5
.
public class Test {
public static void main(String[] args) {
PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
executorService.submit(getRunnable("1"), 1);
executorService.submit(getRunnable("3"), 3);
executorService.submit(getRunnable("2"), 2);
executorService.submit(getRunnable("5"), 5);
executorService.submit(getRunnable("4"), 4);
executorService.shutdown();
try {
executorService.awaitTermination(30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Runnable getRunnable(final String id) {
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(id);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
static class PriorityExecutor extends ThreadPoolExecutor {
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return (RunnableFuture<T>) callable;
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return (RunnableFuture<T>) runnable;
}
}
static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {
volatile int priority = 0;
public ComparableFutureTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
@Override
public int compareTo(ComparableFutureTask<T> o) {
return Integer.valueOf(priority).compareTo(o.priority);
}
}
}
You have 16 threads and only 5 tasks, meaning all of them are being executed concurrently and the priority is actually irrelevant.
The priority only matters when there are tasks waiting to be executed.
To show this, if you set your example to only use 1 thread, you will get your expected output.