Concurrent Set Queue

2019-01-14 15:05发布

Maybe this is a silly question, but I cannot seem to find an obvious answer.

I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue simply ignores that value. Which, if not for the thread safety would be trivial. Is there a data structure in Java or maybe a code snipit on the interwebs that exhibits this behavior?

7条回答
霸刀☆藐视天下
2楼-- · 2019-01-14 15:25

I would use a synchronized LinkedHashSet until there was enough justification to consider alternatives. The primary benefit that a more concurrent solution could offer is lock splitting.

The simplest concurrent approach would be a a ConcurrentHashMap (acting as a set) and a ConcurrentLinkedQueue. The ordering of operations would provide the desired constraint. An offer() would first perform a CHM#putIfAbsent() and if successful insert into the CLQ. A poll() would take from the CLQ and then remove it from the CHM. This means that we consider an entry in our queue if it is in the map and the CLQ provides the ordering. The performance could then be adjusted by increasing the map's concurrencyLevel. If you are tolerant to additional racy-ness, then a cheap CHM#get() could act as a reasonable precondition (but it can suffer by being a slightly stale view).

查看更多
何必那么认真
3楼-- · 2019-01-14 15:29

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

All is not sunshine with this approach. We have no decent way to select a head element other than using the backing map's entrySet().iterator().next(), the result being that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.

Note: this code uses Guava in a few places.

查看更多
萌系小妹纸
4楼-- · 2019-01-14 15:35

What do you mean by a concurrent queue with Set semantics? If you mean a truly concurrent structure (as opposed to a thread-safe structure) then I would contend that you are asking for a pony.

What happens for instance if you call put(element) and detect that something is already there which immediately is removed? For instance, what does it mean in your case if offer(element) || queue.contains(element) returns false?

These kinds of things often need to thought about slightly differently in a concurrent world as often nothing is as it seems unless you stop the world (lock it down). Otherwise you are usually looking at something in the past. So, what are you actually trying to do?

查看更多
一夜七次
5楼-- · 2019-01-14 15:36

A java.util.concurrent.ConcurrentLinkedQueue gets you most of the way there.

Wrap the ConcurrentLinkedQueue with your own class that checks for the uniqueness of an add. Your code has to be thread safe.

查看更多
该账号已被封号
6楼-- · 2019-01-14 15:38

A simple answer for a queue of unique objects can be as follow:

import java.util.concurrent.ConcurrentLinkedQueue;

public class FinalQueue {

    class Bin {
        private int a;
        private int b;

        public Bin(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int hashCode() {
            return a * b;
        }

        public String toString() {
            return a + ":" + b;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Bin other = (Bin) obj;
            if ((a != other.a) || (b != other.b))
                return false;
            return true;
        }
    }

    private ConcurrentLinkedQueue<Bin> queue;

    public FinalQueue() {
        queue = new ConcurrentLinkedQueue<Bin>();
    }

    public synchronized void enqueue(Bin ipAddress) {
        if (!queue.contains(ipAddress))
            queue.add(ipAddress);
    }

    public Bin dequeue() {
        return queue.poll();
    }

    public String toString() {
        return "" + queue;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        FinalQueue queue = new FinalQueue();
        Bin a = queue.new Bin(2,6);

        queue.enqueue(a);
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(13, 3));
        queue.enqueue(queue.new Bin(14, 3));
        queue.enqueue(queue.new Bin(13, 9));
        queue.enqueue(queue.new Bin(18, 3));
        queue.enqueue(queue.new Bin(14, 7));
        Bin x= queue.dequeue();
        System.out.println(x.a);
        System.out.println(queue.toString());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println("Dequeue..." + queue.dequeue());
        System.out.println(queue.toString());
    }
}
查看更多
Summer. ? 凉城
7楼-- · 2019-01-14 15:43

Perhaps extend ArrayBlockingQueue. In order to get access to the (package-access) lock, I had to put my sub-class within the same package. Caveat: I haven't tested this.

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> {

    public DeDupingBlockingQueue(int capacity) {
        super(capacity);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair) {
        super(capacity, fair);
    }

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        super(capacity, fair, c);
    }

    @Override
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return false;
            return super.add(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void put(E e) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //Should this be lock.lock() instead?
        try {
            if (contains(e)) return;
            super.put(e); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (contains(e)) return true;
            return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock.
        } finally {
            lock.unlock();
        }
    }
}
查看更多
登录 后发表回答