In my application I use several Streams that provide Elements of the form (ID, value). An Element is defined by the following class:
    static final class Element<T> implements Comparable<Element<T>> {
        final long id;
        final T value;

        Element(long id, T value) {
            this.id = id;
            this.value = value;
        }

        @Override
        public int compareTo(Element<T> o) {
            return Long.compare(id, o.id);
        }
    }
My goal is to join two or more Streams by the Element's IDs (in each stream, the IDs are sorted and strictly monotonic), e.g.:
    Stream<Element> colour = Arrays.stream(new Element[]{new Element(1, "red"), new Element(2, "green"), new Element(4, "red"), new Element(6, "blue")});
    Stream<Element> length = Arrays.stream(new Element[]{new Element(2, 28), new Element(3, 9), new Element(4, 17), new Element(6, 11)});
    Stream<Element> mass = Arrays.stream(new Element[]{new Element(1, 87.9f), new Element(2, 21.0f), new Element(3, 107f)});
into a single Stream that contains Elements of the form (ID, [T1, T2, T3]):
    Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass);
by applying some method like this:
    public Stream<Element<Object[]>> joinStreams(Stream<Element>... streams) {
        return ...;
    }
The resulting Stream should deliver a FULL OUTER JOIN, i.e. for the above example:
    1, "red", null, 87.9
    2, "green", 28, 21.0
    3, null, 9, 107.0
    4, "red", 17, null
    6, "blue", 11, null
Since my experience with Java's Stream API is still quite basic, I normally use iterators for such tasks.
Is there an idiomatic (and efficient) way to perform this kind of join with Streams? Are there any utility libraries I could use?
Side note: The example is simplified. The application receives its data from something like a column-oriented data store (not a real DBMS) that is several gigabytes in size and does not fit easily into memory. The store also has no built-in support for this kind of join operation.
To implement a full outer join stream, I use two blocking queues, one per input stream. A Filler class (a Runnable implementation) reads the data from its stream and writes it to the associated queue; when it runs out of data, it writes an end-of-stream marker to the queue. I then build a spliterator by extending Spliterators.AbstractSpliterator. Its tryAdvance method takes a value from the left queue and from the right queue (unless one is still held from the previous call) and, depending on the result of comparing their IDs, emits the smaller one and holds the other, or merges both into one element when the IDs are equal. I use a variation of your Element class. See the following code:
    import java.util.ArrayList;
    import java.util.Collection;

    public final class Element<T> implements Comparable<Element<T>> {
        final long id;
        final Collection<T> value;

        public Element(long id, T value) {
            this.id = id;
            // ArrayList keeps the values in insertion (column) order
            this.value = new ArrayList<T>();
            this.value.add(value);
        }

        /** Combines two elements with the same id; either side may be null (missing). */
        Element(long id, Element<T> e1, Element<T> e2) {
            this.id = id;
            this.value = new ArrayList<T>();
            add(e1);
            add(e2);
        }

        private void add(Element<T> e) {
            if (e == null) {
                // A missing side contributes a single null. If that side is itself the result
                // of an earlier join, it should contribute one null per column instead.
                this.value.add(null);
            } else {
                this.value.addAll(e.value);
            }
        }

        /**
         * Used as end-of-stream marker
         */
        Element() {
            id = -1;
            value = null;
        }

        @Override
        public int compareTo(Element<T> o) {
            return Long.compare(id, o.id);
        }
    }
Join Implementation
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.function.Consumer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class OuterJoinSpliterator<T> extends Spliterators.AbstractSpliterator<Element<T>> {

        /** Copies one input stream into its queue, followed by the EOS marker. */
        private final class Filler implements Runnable {
            private final Stream<Element<T>> stream;
            private final BlockingQueue<Element<T>> queue;

            private Filler(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) {
                this.stream = stream;
                this.queue = queue;
            }

            @Override
            public void run() {
                try {
                    Iterator<Element<T>> it = stream.iterator();
                    while (it.hasNext()) {
                        queue.put(it.next()); // blocks while the queue is full
                    }
                    queue.put(EOS);
                } catch (final InterruptedException e) {
                    // Stop filling and restore the interrupt status
                    Thread.currentThread().interrupt();
                }
            }
        }

        public final Element<T> EOS = new Element<T>();

        private final int queueSize;
        private final BlockingQueue<Element<T>> leftQueue;
        private final BlockingQueue<Element<T>> rightQueue;
        protected Element<T> leftValue;
        protected Element<T> rightValue;

        private OuterJoinSpliterator(long estSize, int characteristics, int queueSize,
                Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) {
            super(estSize, characteristics);
            this.queueSize = queueSize;
            leftQueue = createQueue();
            rightQueue = createQueue();
            createFillerThread(leftStream, leftQueue).start();
            createFillerThread(rightStream, rightQueue).start();
        }

        private Element<T> acceptBoth(long id, Element<T> left, Element<T> right) {
            return new Element<T>(id, left, right);
        }

        private Element<T> acceptLeft(Element<T> left) {
            return acceptBoth(left.id, left, null);
        }

        private Element<T> acceptRight(Element<T> right) {
            return acceptBoth(right.id, null, right);
        }

        private Thread createFillerThread(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) {
            Thread thread = new Thread(new Filler(stream, queue));
            // Daemon threads do not keep the JVM alive if the joined stream is not fully consumed
            thread.setDaemon(true);
            return thread;
        }

        private ArrayBlockingQueue<Element<T>> createQueue() {
            return new ArrayBlockingQueue<>(queueSize);
        }

        @Override
        public Comparator<? super Element<T>> getComparator() {
            return null; // natural order (by id); only consulted if SORTED is declared
        }

        private boolean isFinished() {
            return leftValue == EOS && rightValue == EOS;
        }

        @Override
        public final boolean tryAdvance(Consumer<? super Element<T>> action) {
            try {
                updateLeft();
                updateRight();
                if (isFinished()) {
                    return false;
                }
                if (leftValue == EOS) {
                    action.accept(acceptRight(rightValue));
                    rightValue = null;
                } else if (rightValue == EOS) {
                    action.accept(acceptLeft(leftValue));
                    leftValue = null;
                } else {
                    // compareTo only guarantees the sign of its result, not exactly -1/0/1
                    int cmp = leftValue.compareTo(rightValue);
                    if (cmp < 0) {
                        action.accept(acceptLeft(leftValue));
                        leftValue = null;
                    } else if (cmp > 0) {
                        action.accept(acceptRight(rightValue));
                        rightValue = null;
                    } else {
                        action.accept(acceptBoth(leftValue.id, leftValue, rightValue));
                        leftValue = null;
                        rightValue = null;
                    }
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }

        private void updateLeft() throws InterruptedException {
            if (leftValue == null) {
                leftValue = leftQueue.take();
            }
        }

        private void updateRight() throws InterruptedException {
            if (rightValue == null) {
                rightValue = rightQueue.take();
            }
        }

        public static <T> Stream<Element<T>> join(long estSize, int characteristics, int queueSize,
                boolean parallel, Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) {
            Spliterator<Element<T>> spliterator = new OuterJoinSpliterator<>(estSize, characteristics,
                    queueSize, leftStream, rightStream);
            return StreamSupport.stream(spliterator, parallel);
        }
    }
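The Filler threads rely on a bounded producer/consumer hand-off. put() blocks while the queue is full, so each input buffers at most queueSize elements. An identity-compared end-of-stream sentinel tells the consumer when to stop. The sketch below shows that hand-off in isolation for a single stream. QueueHandOffDemo, drain and END are illustrative names and are not part of the code above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

public class QueueHandOffDemo {
    // Hypothetical sentinel, compared by identity like an EOS marker
    private static final Object END = new Object();

    /** Copies source through a bounded queue on a producer thread and collects it. */
    static List<Object> drain(Stream<?> source, int queueSize) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(queueSize);
        Thread producer = new Thread(() -> {
            try {
                Iterator<?> it = source.iterator();
                while (it.hasNext()) {
                    queue.put(it.next()); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true); // does not keep the JVM alive if the consumer gives up
        producer.start();

        List<Object> out = new ArrayList<>();
        try {
            for (Object x = queue.take(); x != END; x = queue.take()) {
                out.add(x);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return out;
    }

    public static void main(String[] args) {
        // A capacity of 2 forces the producer to block repeatedly
        System.out.println(drain(Stream.of(1, 2, 3, 4, 5), 2)); // [1, 2, 3, 4, 5]
    }
}
```

The queue capacity bounds memory use per input, which matters for the multi-gigabyte data described in the question. The price is one extra thread per input stream.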
You can use Long.MAX_VALUE as the estimated size when the size is unknown. See the Javadoc of the Spliterator interface for a description of the various characteristics (ORDERED, SORTED, NONNULL, ...), and the comments in Spliterators.AbstractSpliterator for additional information.
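The following sketch shows how the estimated size and the characteristics are passed to the Spliterators.AbstractSpliterator constructor. It uses a hypothetical spliterator, IdSpliterator (not part of the answer's code), that emits the ids 1..n. Because it declares SORTED, it must override getComparator. The stream framework calls that method for SORTED spliterators, and the default implementation in Spliterator throws IllegalStateException. Returning null signals natural ordering.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class IdSpliterator extends Spliterators.AbstractSpliterator<Long> {
    private final long last;
    private long next = 1;

    public IdSpliterator(long last) {
        // Long.MAX_VALUE: size unknown (or too expensive to compute).
        // The flags describe strictly increasing, non-null ids.
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.SORTED
                | Spliterator.DISTINCT | Spliterator.NONNULL);
        this.last = last;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Long> action) {
        if (next > last) {
            return false;
        }
        action.accept(next++);
        return true;
    }

    @Override
    public Comparator<? super Long> getComparator() {
        // With SORTED, null means "natural order"; the inherited default would throw
        return null;
    }

    public static void main(String[] args) {
        IdSpliterator ids = new IdSpliterator(3);
        System.out.println(ids.estimateSize() == Long.MAX_VALUE); // true
        List<Long> list = StreamSupport.stream(ids, false).sorted().collect(Collectors.toList());
        System.out.println(list); // [1, 2, 3]
    }
}
```

Declaring SORTED together with a null comparator also tells the pipeline that the elements are already in natural order. The implementation may then skip a redundant sorted() step such as the one in main.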
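The simplest solution is to write an Iterator, wrap it in a Spliterator (for example with Spliterators.spliteratorUnknownSize) and pass that to StreamSupport.stream to create the stream. Be aware, though, that you may run into performance problems if you use such a stream in parallel, since a spliterator of unknown size can only be split by buffering batches of elements.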
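Here is a minimal sketch of that approach, generalized to any number of input streams. It assumes, as the question states, that the ids within each stream are sorted and strictly increasing; this is not validated. The class FullOuterJoin, the helper pull and the stripped-down Element are illustrative names, not an existing library API. Each call to next() does three things:

1. Emits one row for the smallest id among the current heads.
2. Fills in the value of every input whose head has that id.
3. Advances only those inputs.

As a result, only one element per input is held in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FullOuterJoin {

    // Stripped-down version of the question's Element class (no Comparable needed here)
    static final class Element<T> {
        final long id;
        final T value;

        Element(long id, T value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Full outer join of any number of streams whose ids are strictly increasing. */
    @SafeVarargs
    static Stream<Element<Object[]>> joinStreams(Stream<? extends Element<?>>... streams) {
        List<Iterator<? extends Element<?>>> iterators = new ArrayList<>();
        for (Stream<? extends Element<?>> s : streams) {
            iterators.add(s.iterator());
        }
        // Current head of each input, or null once that input is exhausted
        Element<?>[] heads = new Element<?>[streams.length];
        for (int i = 0; i < heads.length; i++) {
            heads[i] = pull(iterators.get(i));
        }

        Iterator<Element<Object[]>> joined = new Iterator<Element<Object[]>>() {
            @Override
            public boolean hasNext() {
                return Arrays.stream(heads).anyMatch(h -> h != null);
            }

            @Override
            public Element<Object[]> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                // The smallest id among the heads becomes the id of the next row
                long min = Long.MAX_VALUE;
                for (Element<?> h : heads) {
                    if (h != null) {
                        min = Math.min(min, h.id);
                    }
                }
                Object[] row = new Object[heads.length]; // inputs lacking this id stay null
                for (int i = 0; i < heads.length; i++) {
                    if (heads[i] != null && heads[i].id == min) {
                        row[i] = heads[i].value;
                        heads[i] = pull(iterators.get(i)); // advance only the matching inputs
                    }
                }
                return new Element<>(min, row);
            }
        };

        Stream<Element<Object[]>> result = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(joined, Spliterator.ORDERED | Spliterator.NONNULL),
                false);
        for (Stream<?> s : streams) {
            result = result.onClose(s::close); // closing the joined stream closes the inputs
        }
        return result;
    }

    private static Element<?> pull(Iterator<? extends Element<?>> it) {
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        Stream<Element<String>> colour = Stream.of(new Element<>(1, "red"),
                new Element<>(2, "green"), new Element<>(4, "red"), new Element<>(6, "blue"));
        Stream<Element<Integer>> length = Stream.of(new Element<>(2, 28),
                new Element<>(3, 9), new Element<>(4, 17), new Element<>(6, 11));
        Stream<Element<Float>> mass = Stream.of(new Element<>(1, 87.9f),
                new Element<>(2, 21.0f), new Element<>(3, 107f));
        try (Stream<Element<Object[]>> all = joinStreams(colour, length, mass)) {
            // Prints: 1, [red, null, 87.9] / 2, [green, 28, 21.0] / 3, [null, 9, 107.0]
            //         4, [red, 17, null] / 6, [blue, 11, null]
            all.forEach(e -> System.out.println(e.id + ", " + Arrays.toString(e.value)));
        }
    }
}
```

Running this stream in parallel does not parallelize the merge itself. spliteratorUnknownSize splits by copying batches of elements into arrays, so only the downstream operations benefit. Finding the minimum scans all heads, which is fine for a handful of streams; for many inputs, a PriorityQueue keyed on id would be the usual alternative.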