Java - Ring Buffer

Posted 2019-01-02 17:47

Question:

I have a streaming time series of which I want to keep only the last 4 elements, so I need to be able to pop the first element and add to the end. Which Java Collection is best for this? Vector?

Answer 1:

Consider CircularFifoBuffer from Apache Commons Collections. Unlike with a Queue, you don't have to maintain the limited size of the underlying collection and wrap it yourself once you hit the limit.

Buffer buf = new CircularFifoBuffer(4);
buf.add("A");
buf.add("B");
buf.add("C");
buf.add("D"); //ABCD
buf.add("E"); //BCDE

CircularFifoBuffer will do this for you because of the following properties:

  • CircularFifoBuffer is a first in first out buffer with a fixed size that replaces its oldest element if full.
  • The removal order of a CircularFifoBuffer is based on the insertion order; elements are removed in the same order in which they were added. The iteration order is the same as the removal order.
  • The add(Object), BoundedFifoBuffer.remove() and BoundedFifoBuffer.get() operations all perform in constant time. All other operations perform in linear time or worse.

However, you should consider its limitations as well. For example, you can't add missing time-series values to this collection because it doesn't allow nulls.
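
One possible workaround for the null restriction is to store a sentinel instead of null. The following is only a sketch under assumptions: MissingSamples, encode, isMissing and countPresent are hypothetical names, the samples are assumed to be Double values, and a plain ArrayDeque (which also rejects null) stands in for the buffer.

```java
// Hypothetical helper: encodes a missing sample as NaN so it can be stored
// in a collection that rejects null elements.
final class MissingSamples {
    static final Double MISSING = Double.NaN;

    // Maps null (no reading) to the NaN sentinel; real readings pass through.
    static Double encode(Double sampleOrNull) {
        return sampleOrNull == null ? MISSING : sampleOrNull;
    }

    static boolean isMissing(Double stored) {
        return stored.isNaN();
    }

    // Counts the stored samples that carry a real value.
    static int countPresent(java.util.Collection<Double> window) {
        int present = 0;
        for (Double d : window) {
            if (!isMissing(d)) {
                present++;
            }
        }
        return present;
    }
}
```

This only works when NaN is not itself a legitimate reading in the series; otherwise a small wrapper type would be needed.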



Answer 2:

Since Guava 15.0 (released September 2013) there's EvictingQueue:

A non-blocking queue which automatically evicts elements from the head of the queue when attempting to add new elements onto the queue and it is full. An evicting queue must be configured with a maximum size. Each time an element is added to a full queue, the queue automatically removes its head element. This is different from conventional bounded queues, which either block or reject new elements when full.

This class is not thread-safe, and does not accept null elements.

Example use:

EvictingQueue<String> queue = EvictingQueue.create(2);
queue.add("a");
queue.add("b");
queue.add("c");
queue.add("d");
System.out.print(queue); //outputs [c, d]


Answer 3:

Since Java 1.6 there is ArrayDeque, which implements Queue. It seems to be faster and more memory-efficient than a LinkedList, and it doesn't have the thread-synchronization overhead of ArrayBlockingQueue. From the API docs: "This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue."

final Queue<Object> q = new ArrayDeque<Object>();
q.add(new Object()); //insert element
q.poll(); //remove element
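
ArrayDeque on its own is unbounded, so keeping only the last 4 elements takes a small wrapper. A minimal sketch, assuming a hypothetical class name LastN (not part of the JDK) that drops the head before adding once the capacity is reached:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper (not a JDK class): keeps only the most recent `capacity` elements.
final class LastN<E> {
    private final int capacity;
    private final Deque<E> deque;

    LastN(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.deque = new ArrayDeque<E>(capacity);
    }

    // Appends at the tail; evicts the oldest element first if the buffer is full.
    void add(E e) {
        if (deque.size() == capacity) {
            deque.pollFirst();
        }
        deque.addLast(e); // ArrayDeque rejects null with a NullPointerException
    }

    // Removes and returns the oldest element, or null if the buffer is empty.
    E pop() {
        return deque.pollFirst();
    }

    int size() {
        return deque.size();
    }

    @Override
    public String toString() {
        return deque.toString();
    }
}
```

With a capacity of 4, adding "A" through "E" leaves [B, C, D, E], and pop() then returns "B".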


Answer 4:

If you need

  • O(1) insertion and removal
  • O(1) indexing to interior elements
  • access from a single thread only
  • generic element type

then you can use this CircularArrayList for Java in this way (for example):

CircularArrayList<String> buf = new CircularArrayList<String>(4);

buf.add("A");
buf.add("B");
buf.add("C");
buf.add("D"); // ABCD

String pop = buf.remove(0); // A <- BCD
buf.add("E"); // BCDE

String interiorElement = buf.get(i); // i is any valid index, 0 <= i < buf.size()

All these methods run in O(1).
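
To illustrate why all three operations can be O(1), here is a minimal sketch of the underlying idea: an array plus a head index, with modulo arithmetic for wrap-around. It is not the linked CircularArrayList; the name IndexedRing and the choice to overwrite the oldest element when full are assumptions made for this example.

```java
import java.util.NoSuchElementException;

// Hypothetical sketch, not the CircularArrayList referred to above.
final class IndexedRing<E> {
    private final Object[] slots;
    private int head = 0; // slot of the oldest element
    private int size = 0;

    IndexedRing(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        slots = new Object[capacity];
    }

    // O(1): writes at the tail, overwriting the oldest element when full.
    void add(E e) {
        int tail = (head + size) % slots.length;
        slots[tail] = e;
        if (size == slots.length) {
            head = (head + 1) % slots.length; // the oldest element was overwritten
        } else {
            size++;
        }
    }

    // O(1): removes and returns the oldest element.
    E removeFirst() {
        if (size == 0) {
            throw new NoSuchElementException("ring is empty");
        }
        E e = get(0);
        slots[head] = null; // drop the reference so it can be garbage collected
        head = (head + 1) % slots.length;
        size--;
        return e;
    }

    // O(1): index 0 is the oldest element, size() - 1 the newest.
    @SuppressWarnings("unchecked")
    E get(int i) {
        if (i < 0 || i >= size) {
            throw new IndexOutOfBoundsException("Index: " + i + ", size: " + size);
        }
        return (E) slots[(head + i) % slots.length];
    }

    int size() {
        return size;
    }
}
```

No element is ever shifted: every operation touches one slot and updates at most two integers.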



Answer 5:

I had the same problem some time ago and was disappointed because I couldn't find any solution that suited my needs, so I wrote my own class. Honestly, I did find some code back then, but even that wasn't what I was looking for, so I adapted it, and now I'm sharing it, just like the author of that piece of code did.

EDIT: This is the original (although slightly different) code: CircularArrayList for java

I don't have the link to the source because it was some time ago, but here's the code:

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.RandomAccess;

public class CircularArrayList<E> extends AbstractList<E> implements RandomAccess {

private final int n; // buffer length
private final List<E> buf; // a List implementing RandomAccess
private int leader = 0;
private int size = 0;


public CircularArrayList(int capacity) {
    n = capacity + 1;
    buf = new ArrayList<E>(Collections.nCopies(n, (E) null));
}

public int capacity() {
    return n - 1;
}

private int wrapIndex(int i) {
    int m = i % n;
    if (m < 0) { // modulus can be negative
        m += n;
    }
    return m;
}

@Override
public int size() {
    return this.size;
}

@Override
public E get(int i) {
    // valid indices run from 0 (oldest) to size() - 1 (newest)
    if (i < 0 || i >= size()) throw new IndexOutOfBoundsException("Index: " + i + ", size: " + size());

    // the oldest element sits size() slots behind the leader, as in set()
    return buf.get(wrapIndex(leader - size + i));
}

@Override
public E set(int i, E e) {
    if (i < 0 || i >= n-1) {
        throw new IndexOutOfBoundsException();
    }
    if(i >= size()) // slots at or beyond the leader hold no element yet (should use insert(e))
        throw new IndexOutOfBoundsException("The size of the list is " + size() + " while the index was " + i
                +". Please use insert(e) method to fill the list.");
    return buf.set(wrapIndex(leader - size + i), e);
}

public void insert(E e)
{
    int s = size();     
    buf.set(wrapIndex(leader), e);
    leader = wrapIndex(++leader);
    buf.set(leader, null);
    if(s == n-1)
        return; // we have replaced the eldest element.
    this.size++;

}

@Override
public void clear()
{
    int cnt = wrapIndex(leader-size());
    for(; cnt != leader; cnt = wrapIndex(++cnt))
        this.buf.set(cnt, null);
    this.size = 0;      
}

public E removeOldest() {
    int i = wrapIndex(leader+1);

    for(;;i = wrapIndex(++i)) {
        if(buf.get(i) != null) break;
        if(i == leader)
            throw new IllegalStateException("Cannot remove element."
                    + " CircularArrayList is empty.");
    }

    this.size--;
    return buf.set(i, null);
}

@Override
public String toString()
{
    int i = wrapIndex(leader - size());
    StringBuilder str = new StringBuilder(size());

    for(; i != leader; i = wrapIndex(++i)){
        str.append(buf.get(i));
    }
    return str.toString();
}

public E getOldest(){
    int i = wrapIndex(leader+1);

    for(;;i = wrapIndex(++i)) {
        if(buf.get(i) != null) break;
        if(i == leader)
            throw new IllegalStateException("Cannot retrieve element."
                    + " CircularArrayList is empty.");
    }

    return buf.get(i);
}

public E getNewest(){
    int i = wrapIndex(leader-1);
    if(buf.get(i) == null)
        throw new IndexOutOfBoundsException("Error while retrieving the newest element. The Circular Array list is empty.");
    return buf.get(i);
}
}


Answer 6:

None of the previously given examples met my needs completely, so I wrote my own queue that provides the following functionality: iteration, index access, indexOf, lastIndexOf, get first, get last, offer, remaining capacity, expand capacity, dequeue last, dequeue first, enqueue / add element, dequeue / remove element, subQueueCopy, subArrayCopy, toArray, snapshot, and basics like size, remove or contains.

EjectingQueue

EjectingIntQueue



Answer 7:

A very interesting project is Disruptor. It has a ring buffer and, from what I know, is used in financial applications.

See here: code of ringbuffer

I checked both Guava's EvictingQueue and ArrayDeque.

ArrayDeque does not limit growth: when it is full, it enlarges its backing array, and hence it does not behave exactly like a ring buffer.

EvictingQueue does what it promises but internally uses a Deque to store things and just bounds memory.

Hence, if you care about memory being bounded, ArrayDeque does not fulfil that promise. If you care about object count, EvictingQueue uses internal composition (bigger object size).
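
The unbounded growth is easy to observe. A small sketch (ArrayDequeGrowthDemo and fill are hypothetical names used only for this illustration): the constructor argument is just an initial capacity hint, so adding more elements than that never fails.

```java
import java.util.ArrayDeque;

final class ArrayDequeGrowthDemo {
    // Adds `count` elements to a deque created with the given capacity hint
    // and returns how many elements it ends up holding.
    static int fill(int initialCapacityHint, int count) {
        ArrayDeque<Integer> deque = new ArrayDeque<Integer>(initialCapacityHint);
        for (int i = 0; i < count; i++) {
            deque.add(i); // never rejected: the backing array is enlarged as needed
        }
        return deque.size();
    }
}
```

Calling fill(4, 10) returns 10, not 4: nothing was evicted and nothing was rejected.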

A simple and memory-efficient one can be stolen from jmonkeyengine. Here is a copy:

import java.util.Iterator;
import java.util.NoSuchElementException;

public class RingBuffer<T> implements Iterable<T> {

  private T[] buffer;          // queue elements
  private int count = 0;          // number of elements on queue
  private int indexOut = 0;       // index of first element of queue
  private int indexIn = 0;       // index of next available slot

  // cast needed since no generic array creation in Java
  public RingBuffer(int capacity) {
    buffer = (T[]) new Object[capacity];
  }

  public boolean isEmpty() {
    return count == 0;
  }

  public int size() {
    return count;
  }

  public void push(T item) {
    if (count == buffer.length) {
        throw new RuntimeException("Ring buffer overflow");
    }
    buffer[indexIn] = item;
    indexIn = (indexIn + 1) % buffer.length;     // wrap-around
    count++;
  }

  public T pop() {
    if (isEmpty()) {
        throw new RuntimeException("Ring buffer underflow");
    }
    T item = buffer[indexOut];
    buffer[indexOut] = null;                  // to help with garbage collection
    count--;
    indexOut = (indexOut + 1) % buffer.length; // wrap-around
    return item;
  }

  public Iterator<T> iterator() {
    return new RingBufferIterator();
  }

  // an iterator, doesn't implement remove() since it's optional
  private class RingBufferIterator implements Iterator<T> {

    private int i = 0;

    public boolean hasNext() {
        return i < count;
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }

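    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // start at indexOut (the oldest element) and wrap around; indexing from
        // slot 0 would return wrong elements once pop() has advanced indexOut
        return buffer[(indexOut + i++) % buffer.length];
    }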
  }
}


Answer 8:

Use a Queue

Queue<String> qe=new LinkedList<String>();

qe.add("a");
qe.add("b");
qe.add("c");
qe.add("d");

System.out.println(qe.poll()); //returns a
System.out.println(qe.poll()); //returns b
System.out.println(qe.poll()); //returns c
System.out.println(qe.poll()); //returns d

A Queue has five simple methods:

  • element() -- Retrieves, but does not remove, the head of this queue.

  • offer(E o) -- Inserts the specified element into this queue, if possible.

  • peek() -- Retrieves, but does not remove, the head of this queue, returning null if this queue is empty.

  • poll() -- Retrieves and removes the head of this queue, or null if this queue is empty.

  • remove() -- Retrieves and removes the head of this queue.
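
The practical difference within the pairs element()/peek() and remove()/poll() shows up on an empty queue. A short sketch (QueueMethodsDemo and describe are hypothetical names used only for this illustration):

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;

final class QueueMethodsDemo {
    // Records how the five methods behave on an empty queue and after one offer().
    static String describe() {
        Queue<String> q = new LinkedList<String>();
        StringBuilder out = new StringBuilder();

        out.append("peek=").append(q.peek());  // null on an empty queue
        out.append(" poll=").append(q.poll()); // null on an empty queue
        try {
            q.element();                       // throws on an empty queue
        } catch (NoSuchElementException e) {
            out.append(" element=throws");
        }
        try {
            q.remove();                        // throws on an empty queue
        } catch (NoSuchElementException e) {
            out.append(" remove=throws");
        }

        out.append(" offer=").append(q.offer("a"));   // true: element accepted
        out.append(" element=").append(q.element());  // head, left in the queue
        out.append(" remove=").append(q.remove());    // head, taken out of the queue
        out.append(" size=").append(q.size());
        return out.toString();
    }
}
```

describe() returns "peek=null poll=null element=throws remove=throws offer=true element=a remove=a size=0".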

