How to create LIFO executor?

2019-02-03 08:59发布

问题:

I would like to create a thread pool which will execute the most recently submitted task. Any advice on how to accomplish this?

Thank you

回答1:

You could probably just implement your own BlockingQueue wrapper that maps offer/poll to a stack. Then use this as the BlockingQueue implementation you pass to a ThreadPoolExecutor. My suggestion would be to wrap one of the existing Deque implementations such as ArrayDeque.

  • This is not synchronized, so you'll need to wrap each of the BlockingQueue methods with a synchronizer (if not something more exotic).
  • You'll also need to introduce wait/notify conditions for the blocking operations.
  • Finally, you'll need to map one set of the BlockingQueue polarities (either the "put" or the "take" side) to the same end of the dequeue as the other (to treat it like a stack).

Note that there is some work (see Herlihy's book on The Art of Multiprocessor Programming) on faster concurrent stacks, but I don't think there are any implementations in the JDK and I'm not sure if Herlihy's implementations offer blocking flavors.

An Implementation atop Deque

I checked the Android documentation, which suggests that Deque is around for you, so here's an implementation. It's a fairly easy step to do a wrapper around a stack, too, but Deque is preferred.

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


public final class BlockingLifoQueue<T> implements BlockingQueue<T>
{
  // we add and remove only from the end of the queue
  private final BlockingDeque<T> deque; 

  public BlockingLifoQueue()
  { deque = new LinkedBlockingDeque<T>(); }

  public boolean add(T e) {
    deque.addLast(e);
    return true;
  }

  public boolean contains(Object o)
  { return deque.contains(o); }

  public int drainTo(Collection<? super T> c)
  { return deque.drainTo(c); }

  public int drainTo(Collection<? super T> c, int maxElements)
  { return deque.drainTo(c,maxElements); }

  public boolean offer(T e)
  { return deque.offerLast(e); }

  public boolean offer(T e, long timeout, TimeUnit unit)
      throws InterruptedException
  { return deque.offerLast(e,timeout,unit); }

  public T poll(long timeout, TimeUnit unit) throws InterruptedException
  { return deque.pollLast(timeout, unit); }

  public void put(T e) throws InterruptedException
  { deque.putLast(e); }

  public int remainingCapacity()
  { return deque.size(); }

  public boolean remove(Object o)
  { return deque.remove(o); }

  public T take() throws InterruptedException
  { return deque.takeLast(); }

  public T element()
  {
    if (deque.isEmpty()) { 
      throw new NoSuchElementException("empty stack");
    }

    return deque.pollLast();
  }

  public T peek()
  { return deque.peekLast(); }

  public T poll()
  { return deque.pollLast(); } // deque.peekLast(); } -- fixed typo.

  public T remove()
  {
    if (deque.isEmpty()) { 
      throw new NoSuchElementException("empty stack");
    }

    return deque.pollLast();
  }

  public boolean addAll(Collection<? extends T> c)
  { 
    for (T e : c) { deque.add(e); }
    return true;
  }

  public void clear()
  { deque.clear();}

  public boolean containsAll(Collection<?> c)
  { return deque.containsAll(c); }

  public boolean isEmpty()
  {  return deque.isEmpty(); }

  public Iterator<T> iterator()
  { return deque.descendingIterator(); }

  public boolean removeAll(Collection<?> c)
  { return deque.removeAll(c); }

  public boolean retainAll(Collection<?> c)
  { return deque.retainAll(c); }

  public int size()
  { return deque.size(); }

  public Object[] toArray()
  { return deque.toArray(); }

  public <T> T[] toArray(T[] a)
  { return deque.toArray(a); }
}


回答2:

Similar to what andersoj suggested, however there is the availability of a BlockingDeque.

You can extend the LinkedBlockingDeque class to always pop and push first when offer and removing.

public class FIFOBlockingDeque<T> extends LinkedBlockingDeque<T> {

 @Override
 public boolean offer(T t) {
  return super.offerFirst(t);
 }

 @Override
 public T remove() {
  return super.removeFirst();
 }
}

Then pass that in as a parameter to the ThreadPoolExecutor (BlockingDeque extends BlockingQueue)

Edit: To answer your comment question you can instead of inheriting from a Deque, you can use the java.util.Stack supplied. It is considered legacy, if you are confined to the Java library itself this would be best.

You would instead of offerFirst and removeFirst you can use push and pop. Of course you would have to implement BlockingQueue and complete that implementation.



回答3:

This would easily be accomplished by using a PriorityQueue or PriorityBlockingQueue in which the most recently queued items get highest priority.



回答4:

ThreadPoolExecutor constructors accept a BlockingQueue that is used to keep the tasks to execute. You will probably need to implement a customized version of BlockingQueue to support a LIFO policy.



回答5:

I think that there is a mistake in the @andersoj implementation of the LIFO, in a method of BlockingLifoQueue.

The remove method should be like this:

  public T remove()
  {
    if (deque.isEmpty()) { 
      throw new NoSuchElementException("empty stack");
    }

    return deque.pollFirst(); // instead of pollLast()
  }

Sorry if I am wrong but It doesn't make sense for me ... to poll the Last in a LIFO.