Implementing a PriorityQueue on ThreadPoolExecutor

Posted 2019-07-14 06:08

I have been struggling with this for over two days now.

I implemented the answer I saw here: Specify task order execution in Java

public class PriorityExecutor extends ThreadPoolExecutor {

public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utility method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new PriorityExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
    return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}
}

public class ComparableFutureTask<T> extends FutureTask<T>
    implements
    Comparable<ComparableFutureTask<T>> {

volatile int priority = 0;

public ComparableFutureTask(Runnable runnable, T result, int priority) {
    super(runnable, result);
    this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
    super(callable);
    this.priority = priority;
}

@Override
public int compareTo(ComparableFutureTask<T> o) {
    return Integer.valueOf(priority).compareTo(o.priority);
}
}

The Runnable I use: MyTask

public class MyTask implements Runnable {

    private final File _file;
    private final Context context;
    private final int requestId;

    public MyTask(File file, Context context, int requestId) {
        this._file = file;
        this.context = context;
        this.requestId = requestId;
    }

    @Override
    public void run() {
        try {
            // some work
        } catch (IOException e) {
            Log.e("Callable try", post.toString());
        }
    }
}

My service: MediaDownloadService

public class MediaDownloadService extends Service {

private DBHelper helper;
Notification notification;
HashMap<Integer,Future> futureTasks = new HashMap<Integer, Future>();
final int _notificationId=1;
File file;

@Override
public IBinder onBind(Intent intent) {
    return sharonsBinder;
}


@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    helper = new DBHelper(getApplicationContext());
    PriorityExecutor executor = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(3);
    Log.e("requestsExists", helper.requestsExists() + "");
   if(helper.requestsExists()){
        // map of the index of the request and the string of the absolute path of the request
        Map<Integer,String> requestMap = helper.getRequestsToExcute(0);
        Set<Integer> keySet = requestMap.keySet();
        Iterator<Integer> iterator = keySet.iterator();
        Log.e("MAP",requestMap.toString());
        //checks if the DB requests exists
        if(!requestMap.isEmpty()){
            //execute them and delete the DB entry
            while(iterator.hasNext()){
                int iteratorNext = iterator.next();
                Log.e("ITREATOR", iteratorNext + "");
                file = new File(requestMap.get(iteratorNext));
                Log.e("file", file.toString());
                Log.e("thread Opened", "Thread" + iteratorNext);
                Future future = executor.submit(new MyTask(file, this, iteratorNext),10);
                futureTasks.put(iteratorNext, future);
                helper.requestTaken(iteratorNext);
            }
            Log.e("The priority queue",executor.getQueue().toString());
        }else{

            Log.e("stopself", "stop self after this");
            this.stopSelf();
        }
    }
    return START_STICKY;
}

I keep getting an error on this line: Future future = executor.submit(new MyTask(file, this, iteratorNext),10);

Even though executor.submit() is supposed to return a Future object, I keep getting:

Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
        at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:318)
        at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:450)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1331)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:81)
        at com.vit.infibond.test.PriorityExecutor.submit(PriorityExecutor.java:26)
        at com.vit.infibond.test.MediaDownloadService.onStartCommand(MediaDownloadService.java:65)

Can anyone save me from this nightmare?

I also tried what this answer suggests: Testing PriorityBlockingQueue in ThreadPoolExecutor

I added the newTaskFor override, only to get a cast exception again, this time for RunnableFuture.

I understand that I am missing something basic here and would appreciate an in-depth explanation...

2 Answers
做个烂人
#2 · 2019-07-14 06:48

Looking at the source code of java.util.concurrent.ThreadPoolExecutor, getting this to work when submitting futures seems to be a real pain. You have to override protected methods that feel internal and do some nasty casts.
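For reference, the protected hook in question is newTaskFor. As the stack trace in the question shows, submit goes through AbstractExecutorService.submit, which wraps whatever it is given in a plain FutureTask before calling execute, and that wrapper is what PriorityBlockingQueue fails to cast to Comparable. Here is a minimal sketch of overriding the hook, assuming each task can report its own priority. The Prioritized interface, OrderedFutureTask, NewTaskForExecutor and RecordingTask are made-up names for illustration, not part of the original code, and sending unprioritized tasks to the back of the queue is my own assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/** Hypothetical interface (not in the original post): a task that knows its own priority. */
interface Prioritized {
    int getPriority();
}

/** FutureTask that the queue can order; a lower value runs first. */
class OrderedFutureTask<T> extends FutureTask<T> implements Comparable<OrderedFutureTask<?>> {
    final int priority;

    OrderedFutureTask(Runnable runnable, T result, int priority) {
        super(runnable, result);
        this.priority = priority;
    }

    @Override
    public int compareTo(OrderedFutureTask<?> other) {
        return Integer.compare(priority, other.priority);
    }
}

/** Simple task that records its priority when it runs. */
class RecordingTask implements Runnable, Prioritized {
    private final int priority;
    private final List<Integer> log;

    RecordingTask(int priority, List<Integer> log) {
        this.priority = priority;
        this.log = log;
    }

    @Override public int getPriority() { return priority; }
    @Override public void run() { log.add(priority); }
}

class NewTaskForExecutor extends ThreadPoolExecutor {
    NewTaskForExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
    }

    // submit(Runnable) calls this hook to wrap the task; returning a Comparable
    // wrapper keeps PriorityBlockingQueue from ever seeing a plain FutureTask.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        int priority = (runnable instanceof Prioritized)
                ? ((Prioritized) runnable).getPriority()
                : Integer.MAX_VALUE; // assumption: unprioritized tasks run last
        return new OrderedFutureTask<>(runnable, value, priority);
    }

    /** Queues priorities 3, 1, 2 behind a blocked worker and returns the order they ran in. */
    static List<Integer> demo() {
        NewTaskForExecutor pool = new NewTaskForExecutor(1);
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            Runnable blocker = () -> {
                try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            };
            pool.submit(blocker); // handed straight to the only worker, never queued
            List<Future<?>> futures = new ArrayList<>();
            for (int p : new int[] {3, 1, 2}) {
                futures.add(pool.submit(new RecordingTask(p, order)));
            }
            gate.countDown(); // release the worker; it now drains the queue by priority
            for (Future<?> f : futures) f.get();
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this sketch only covers submit(Runnable) and submit(Runnable, T). submit(Callable) goes through the other newTaskFor overload, and a non-Comparable Runnable passed directly to execute would still hit the same ClassCastException once it has to be queued.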

I suggest you simply use the execute method instead. There is no wrapping of the Runnable going on there so it should work.

If you need to wait for the results of your jobs I suggest implementing that on your own to avoid having to mess around with the ThreadPoolExecutor internals.
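A rough sketch of that suggestion, assuming a Runnable that is itself Comparable and a CountDownLatch as the home-made way to wait for completion. PriorityJob and ExecuteOnlyDemo are illustrative names, not something from the question.

```java
import java.util.List;
import java.util.concurrent.*;

/** A Runnable that is itself Comparable, so execute() can queue it without any wrapper. */
class PriorityJob implements Runnable, Comparable<PriorityJob> {
    final int priority;
    private final Runnable body;
    private final CountDownLatch done; // home-made completion signal instead of a Future

    PriorityJob(int priority, Runnable body, CountDownLatch done) {
        this.priority = priority;
        this.body = body;
        this.done = done;
    }

    @Override
    public void run() {
        try {
            body.run();
        } finally {
            done.countDown(); // signal completion even if the body throws
        }
    }

    @Override
    public int compareTo(PriorityJob other) {
        return Integer.compare(priority, other.priority); // lower value runs first
    }
}

class ExecuteOnlyDemo {
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues priorities 3, 1, 2 behind a blocked worker and returns the order they ran in. */
    static List<Integer> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(3);
        try {
            // The first task goes straight to the worker thread, so it is never
            // compared and does not need to be Comparable.
            pool.execute(() -> awaitQuietly(gate));
            for (int p : new int[] {3, 1, 2}) {
                final int priority = p;
                pool.execute(new PriorityJob(priority, () -> order.add(priority), done));
            }
            gate.countDown();   // release the worker; the queue is drained by priority
            awaitQuietly(done); // wait for the three prioritized jobs
            return order;
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is that there is no Future to cancel or to read a result from; anything beyond "wait until done" has to be built by hand.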

一纸荒年 Trace。
#3 · 2019-07-14 06:52

What sharon gur suggested at the very bottom is to change

//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}

to

//execute with New comparable task
public ComparableFutureTask  execute(Runnable command, int priority) {
    ComparableFutureTask task = new ComparableFutureTask(command, null, priority);
    super.execute(task);
    return task;
}

Then in your caller:

CurrentTask currentTask = new CurrentTask(priority,queue)
RunnableFuture task = enhancedExecutor.execute(currentTask,priority.value)
task?.get()
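The caller above is Groovy. For anyone on plain Java, here is roughly the same idea as a self-contained sketch: the priority-aware overload builds the comparable task itself and passes it to execute, so AbstractExecutorService.submit never gets a chance to wrap it in a plain FutureTask. RankedFutureTask and ReturningExecutor are stand-in names I made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/** Stand-in for ComparableFutureTask; a lower priority value runs first. */
class RankedFutureTask<T> extends FutureTask<T> implements Comparable<RankedFutureTask<?>> {
    final int priority;

    RankedFutureTask(Runnable runnable, T result, int priority) {
        super(runnable, result);
        this.priority = priority;
    }

    @Override
    public int compareTo(RankedFutureTask<?> other) {
        return Integer.compare(priority, other.priority);
    }
}

class ReturningExecutor extends ThreadPoolExecutor {
    ReturningExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
    }

    /** Wraps the command once, queues that exact object, and hands it back as a Future. */
    public Future<?> execute(Runnable command, int priority) {
        RankedFutureTask<Void> task = new RankedFutureTask<>(command, null, priority);
        super.execute(task); // execute() does not re-wrap, so the queue sees a Comparable
        return task;
    }

    /** Queues priorities 4, 0, 2 behind a blocked worker and returns the order they ran in. */
    static List<Integer> demo() {
        ReturningExecutor pool = new ReturningExecutor(1);
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupies the single worker so the next three tasks have to be queued.
            pool.execute(() -> {
                try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }, 0);
            List<Future<?>> futures = new ArrayList<>();
            for (int p : new int[] {4, 0, 2}) {
                final int priority = p;
                futures.add(pool.execute(() -> order.add(priority), priority));
            }
            gate.countDown();
            for (Future<?> f : futures) f.get(); // the returned task works as a normal Future
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```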

I had a problem where

RunnableFuture task = myExecutor.submit(currentTask)
task?.get()

caused an issue: currentTask was converted to a FutureTask, which knew nothing about the objects inside my CurrentTask. With .execute alone everything was fine. The hack above appears to work, or near enough.

Called like this, it ran without errors, but no file was produced:

RunnableFuture task = myExecutor.execute(currentTask)
task?.get()

So this is how I made it work. The priority is handled twice, which doesn't feel right, but it works...

CurrentTask:

class CurrentTask implements Runnable {
    private Priority priority
    private MyQueue queue

    public int getPriority() {
        return priority.value
    }

    public CurrentTask(Priority priority,ReportsQueue queue){
        this.priority = priority
        this.queue=queue
    }

    @Override
    public void run() {
...
}
}

Priority:

public enum Priority {

    HIGHEST(0),
    HIGH(1),
    MEDIUM(2),
    LOW(3),
    LOWEST(4)

    int value

    Priority(int val) {
        this.value = val
    }

    public int getValue(){
        return value
    }
}

Then your executor:

public class YourExecutor extends ThreadPoolExecutor {

    public YourExecutor() {
        super(maxPoolSize,maxPoolSize,timeout,TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(1000,new ReverseComparator()))
    }
}

Before I changed to the new method, tasks sent through submit reached the comparator below already wrapped in a FutureTask, which does not understand .priority?.value. With the default .execute, the CurrentTask itself was what reached the comparator, and it all worked:

public int compare(final Runnable lhs, final Runnable rhs) {

    if(lhs instanceof Runnable && rhs instanceof Runnable){
      // Favour a higher priority
        println "${lhs} vs ${lhs.getClass()}"
      if(((Runnable)lhs)?.priority?.value<((Runnable)rhs)?.priority?.value){
 ...
}

}

With the hack above and the changes below, it appears to be working:

class ReverseComparator implements Comparator<ComparableFutureTask> {

    @Override
    public int compare(final ComparableFutureTask lhs, final ComparableFutureTask rhs) {

        if (lhs instanceof ComparableFutureTask && rhs instanceof ComparableFutureTask) {

            // run higher priority first (lower numbers before higher numbers)
            println "${lhs} vs ${lhs.getClass()} ::: ${lhs.priority}"
            if (((Runnable)lhs)?.priority < ((Runnable)rhs)?.priority) {
                println "-returning -1"
                return -1;
            } else if (((Runnable)lhs)?.priority > ((Runnable)rhs)?.priority) {
                println "-returning @@@1"
                return 1;
            }
        }
        println "-returning ==0 "
        return 0;
    }
}

This works simply because we now pass in our own ComparableFutureTask, which extends FutureTask and carries a priority.
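If you are on plain Java rather than Groovy, note that the queue is typed as PriorityBlockingQueue<Runnable>, so the comparator has to accept Runnable and do the cast itself. Below is a small sketch of that, with no thread pool involved, just the queue. TaggedTask, LowestValueFirst and ComparatorQueueDemo are hypothetical names, and sorting unknown runnables last is my own choice rather than anything from the code above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;

/** Stand-in for ComparableFutureTask: carries a priority but is not Comparable itself. */
class TaggedTask extends FutureTask<Void> {
    final int priority;

    TaggedTask(Runnable body, int priority) {
        super(body, null);
        this.priority = priority;
    }
}

/** Comparator over Runnable, because that is the element type of the executor's queue. */
class LowestValueFirst implements Comparator<Runnable> {
    static int priorityOf(Runnable r) {
        // Assumption: anything that is not a TaggedTask sorts last instead of failing.
        return (r instanceof TaggedTask) ? ((TaggedTask) r).priority : Integer.MAX_VALUE;
    }

    @Override
    public int compare(Runnable lhs, Runnable rhs) {
        return Integer.compare(priorityOf(lhs), priorityOf(rhs));
    }
}

class ComparatorQueueDemo {
    /** Offers tasks in arbitrary order and returns the priorities in the order poll() yields them. */
    static List<Integer> demo() {
        PriorityBlockingQueue<Runnable> queue =
                new PriorityBlockingQueue<>(11, new LowestValueFirst());
        Runnable noop = () -> { };
        for (int p : new int[] {3, 0, 4, 1}) {
            queue.offer(new TaggedTask(noop, p));
        }
        queue.offer(noop); // plain Runnable: no ClassCastException, it simply sorts last

        List<Integer> drained = new ArrayList<>();
        Runnable next;
        while ((next = queue.poll()) != null) {
            drained.add(LowestValueFirst.priorityOf(next));
        }
        return drained;
    }
}
```

With a comparator supplied, the queue never casts its elements to Comparable, which is why a plain FutureTask wrapper no longer blows up; it just ends up with whatever default priority the comparator assigns it.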

Hope it makes sense. I have been going round in circles on this for a day and a bit now :)
