It is often unclear how exactly a parallel stream splits its input into chunks and in which order the chunks are joined. Is there any way to visualize the whole procedure for any stream source to better understand what's going on? Suppose I created a stream like this:
Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();
I want to see some tree-like structure:
[0..99]
_____/ \_____
| |
[0..49] [50..99]
__/ \__ __/ \__
| | | |
[0..24] [25..49] [50..74] [75..99]
Which means that the whole input range [0..99] is split into the [0..49] and [50..99] ranges, which in turn are split further. Of course, such a diagram should reflect the real work of the Stream API, so if I perform some real operation with such a stream, the splitting should be performed in the same way.
I want to augment Tagir’s great answer with a solution to monitor the splitting at the source side or even at intermediate operations (with some restrictions imposed by the current stream API implementation):
public static <E> Stream<E> proxy(Stream<E> src) {
Class<Stream<E>> sClass=(Class)Stream.class;
Class<Spliterator<E>> spClass=(Class)Spliterator.class;
return proxy(src, sClass, spClass, StreamSupport::stream);
}
public static IntStream proxy(IntStream src) {
return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream);
}
public static LongStream proxy(LongStream src) {
return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream);
}
public static DoubleStream proxy(DoubleStream src) {
return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream);
}
static final Object EMPTY=new StringBuilder("empty");
static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy(
S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) {
final class Node<T> implements InvocationHandler,Runnable,
Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer {
final Class<? extends Spliterator> type;
Spliterator<T> src;
Object first=EMPTY, last=EMPTY;
Node<T> left, right;
Object currConsumer;
public Node(Spliterator<T> src, Class<? extends Spliterator> type) {
this.src = src;
this.type=type;
}
private void value(Object t) {
if(first==EMPTY) first=t;
last=t;
}
public void accept(Object t) {
value(t); ((Consumer)currConsumer).accept(t);
}
public void accept(int t) {
value(t); ((IntConsumer)currConsumer).accept(t);
}
public void accept(long t) {
value(t); ((LongConsumer)currConsumer).accept(t);
}
public void accept(double t) {
value(t); ((DoubleConsumer)currConsumer).accept(t);
}
public void run() {
System.out.println();
finish().forEach(System.out::println);
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Node<T> curr=this; while(curr.right!=null) curr=curr.right;
if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) {
curr.currConsumer=args[0];
args[0]=curr;
}
if(method.getName().equals("trySplit")) {
Spliterator s=curr.src.trySplit();
if(s==null) return null;
Node<T> pfx=new Node<>(s, type);
pfx.left=curr.left; curr.left=pfx;
curr.right=new Node<>(curr.src, type);
src=null;
return pfx.create();
}
return method.invoke(curr.src, args);
}
Object create() {
return Proxy.newProxyInstance(null, new Class<?>[]{type}, this);
}
String pad(String s, int left, int len) {
if (len == s.length())
return s;
char[] result = new char[len];
Arrays.fill(result, ' ');
s.getChars(0, s.length(), result, left);
return new String(result);
}
public List<String> finish() {
String cur = toString();
if (left == null) {
return Collections.singletonList(cur);
}
List<String> l = left.finish();
List<String> r = right.finish();
int len1 = l.get(0).length();
int len2 = r.get(0).length();
int totalLen = len1 + len2 + 1;
int leftAdd = 0;
if (cur.length() < totalLen) {
cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
} else {
leftAdd = (cur.length() - totalLen) / 2;
totalLen = cur.length();
}
List<String> result = new ArrayList<>();
result.add(cur);
char[] dashes = new char[totalLen];
Arrays.fill(dashes, ' ');
Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
+ leftAdd, '_');
int mid = totalLen / 2;
dashes[mid] = '/';
dashes[mid + 1] = '\\';
result.add(new String(dashes));
Arrays.fill(dashes, ' ');
dashes[len1 / 2 + leftAdd] = '|';
dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
result.add(new String(dashes));
int maxSize = Math.max(l.size(), r.size());
for (int i = 0; i < maxSize; i++) {
String lstr = l.size() > i ? l.get(i) : String.format("%"
+ len1 + "s", "");
String rstr = r.size() > i ? r.get(i) : String.format("%"
+ len2 + "s", "");
result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
}
return result;
}
private Object first() {
if(left==null) return first;
Object o=left.first();
if(o==EMPTY) o=right.first();
return o;
}
private Object last() {
if(right==null) return last;
Object o=right.last();
if(o==EMPTY) o=left.last();
return o;
}
public String toString() {
Object o=first(), p=last();
return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+']': "]");
}
}
Node<E> n=new Node<>(src.spliterator(), spc);
Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n);
return f.apply(sp, true).onClose(n);
}
It allows wrapping a spliterator with a proxy which monitors the split operations and the encountered objects. The logic of the chunk handling is similar to Tagir’s; in fact, I copied his result printing routine(s).
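For readers who find the dynamic proxy hard to follow, the same wrapping idea can be sketched with a plain delegating spliterator. This is only a minimal illustration, not the code above: the class name `LoggingSpliterator` and the helper `splitsOfRange` are made up for this example, and it only logs size estimates at each split instead of building a tree.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical helper (not part of the answer's code): logs every successful split.
class LoggingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> delegate;
    private final List<String> log; // shared by all splits, so it must be thread-safe

    LoggingSpliterator(Spliterator<T> delegate, List<String> log) {
        this.delegate = delegate;
        this.log = log;
    }

    @Override
    public Spliterator<T> trySplit() {
        long before = delegate.estimateSize();
        Spliterator<T> prefix = delegate.trySplit();
        if (prefix == null) return null; // the source refused to split
        log.add(before + " -> " + prefix.estimateSize() + " + " + delegate.estimateSize());
        return new LoggingSpliterator<>(prefix, log); // keep monitoring the prefix too
    }

    @Override public boolean tryAdvance(Consumer<? super T> action) { return delegate.tryAdvance(action); }
    @Override public void forEachRemaining(Consumer<? super T> action) { delegate.forEachRemaining(action); }
    @Override public long estimateSize() { return delegate.estimateSize(); }
    @Override public int characteristics() { return delegate.characteristics(); }
    @Override public Comparator<? super T> getComparator() { return delegate.getComparator(); }

    // Runs a parallel no-op traversal over 0..n-1 and returns the recorded splits.
    static List<String> splitsOfRange(int n) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Integer> data = IntStream.range(0, n).boxed().collect(Collectors.toList());
        StreamSupport.stream(new LoggingSpliterator<>(data.spliterator(), log), true)
                     .forEach(x -> {});
        return log;
    }

    public static void main(String[] args) {
        splitsOfRange(100).forEach(System.out::println);
    }
}
```

The order of the log lines depends on thread scheduling, which is one reason the proxy above collects the splits into a tree before printing.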
You may pass in the source of the stream or a stream with some operations already appended. (In the latter case, you should apply .parallel() to the stream as early as possible.) As Tagir explained, in most cases the split behavior depends on the source and the configured parallelism; thus, in most cases, monitoring intermediate states may change the values, but not the processed chunks:
try(IntStream is=proxy(IntStream.range(0, 100).parallel())) {
is.filter(i -> i/20%2==0)
.mapToObj(ix->"\""+ix+'"')
.forEach(s->{});
}
will print
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
whereas
try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0)
.mapToObj(ix->"\""+ix+'"'))) {
s.forEach(str->{});
}
will print
["0".."99"]
___________________________________________/\___________________________________________
| |
["0".."49"] ["50".."99"]
____________________/\______________________ ______________________/\___________________
| | | |
["0".."19"] ["40".."49"] ["50".."59"] ["80".."99"]
____________/\_________ ____________/\______ _______/\___________ ____________/\________
| | | | | | | |
["0".."11"] ["12".."19"] (empty) ["40".."49"] ["50".."59"] (empty) ["80".."86"] ["87".."99"]
_____/\___ _____/\_____ ___/\__ _____/\_____ _____/\_____ ___/\__ _____/\__ _____/\_____
| | | | | | | | | | | | | | | |
["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]
As we can see here, we are monitoring the result of .filter(…).mapToObj(…), but the chunks are clearly determined by the source, possibly producing empty chunks downstream depending on the filter’s condition.
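The effect can also be reproduced without any proxy by splitting the source spliterator by hand and applying the filter to each leaf. The following is a rough sketch under the assumption that four rounds of manual `trySplit()` calls mimic what the framework did in the diagrams above; the class and method names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical demo class, not part of the answer's code.
class EmptyChunks {
    // Splits s up to `depth` times per branch and collects the leaves in encounter order.
    static void leaves(Spliterator.OfInt s, int depth, List<Spliterator.OfInt> out) {
        Spliterator.OfInt prefix = depth > 0 ? s.trySplit() : null;
        if (prefix == null) { // depth exhausted or the source refused to split
            out.add(s);
            return;
        }
        leaves(prefix, depth - 1, out);
        leaves(s, depth - 1, out);
    }

    // For each leaf of IntStream.range(0, 100), counts the elements that pass the filter.
    static List<Long> survivorsPerLeaf() {
        List<Spliterator.OfInt> leaves = new ArrayList<>();
        leaves(IntStream.range(0, 100).spliterator(), 4, leaves);
        List<Long> counts = new ArrayList<>();
        for (Spliterator.OfInt leaf : leaves) {
            counts.add(StreamSupport.intStream(leaf, false)
                                    .filter(i -> i / 20 % 2 == 0)
                                    .count());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(survivorsPerLeaf());
    }
}
```

Leaves that fall entirely into a filtered-out range such as 20..39 report a count of zero, which corresponds to the (empty) nodes in the tree above.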
Note that we can combine the source monitoring with Tagir’s collector monitoring:
try(IntStream s=proxy(IntStream.range(0, 100))) {
s.parallel().filter(i -> i/20%2==0)
.boxed().collect(parallelVisualize())
.forEach(System.out::println);
}
This will print (note that the collect output is printed first):
[0..99]
________________________________/\_______________________________
| |
[0..49] [50..99]
________________/\______________ _______________/\_______________
| | | |
[0..19] [40..49] [50..59] [80..99]
________/\_____ ________/\______ _______/\_______ ________/\_____
| | | | | | | |
[0..11] [12..19] (empty) [40..49] [50..59] (empty) [80..86] [87..99]
___/\_ ___/\___ ___/\__ ___/\___ ___/\___ ___/\__ ___/\_ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99]
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
We can clearly see how the chunks of the processing match, but after the filtering, some chunks have fewer elements and some of them are entirely empty.
This is the place to demonstrate where the two ways of monitoring can make a significant difference:
try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) {
is.boxed()
.collect(parallelVisualize())
.forEach(System.out::println);
}
[0.0..99.0]
___________________________________________________/\________________________________________________
| |
[0.0..49.0] [50.0..99.0]
_________________________/\______________________ _________________________/\________________________
| | | |
[0.0..24.0] [25.0..49.0] [50.0..74.0] [75.0..99.0]
____________/\_________ ____________/\___________ ____________/\___________ ____________/\___________
| | | | | | | |
[0.0..11.0] [12.0..24.0] [25.0..36.0] [37.0..49.0] [50.0..61.0] [62.0..74.0] [75.0..86.0] [87.0..99.0]
_____/\___ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____ _____/\_____
| | | | | | | | | | | | | | | |
[0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0]
[0.0..10239.0]
_____________________________/\_____
| |
[0.0..1023.0] [1024.0..10239.0]
____________________/\_______
| |
[1024.0..3071.0] [3072.0..10239.0]
____________/\______
| |
[3072.0..6143.0] [6144.0..10239.0]
___/\_______
| |
[6144.0..10239.0] (empty)
This demonstrates what Tagir already explained: streams with an unknown size split poorly, and even though limit(…) provides the possibility for a good estimate (in fact, infinite + limit is theoretically predictable), the implementation does not take any advantage of it.
The source is split into chunks using a batch size of 1024, increased by 1024 after each split, creating chunks way outside the range imposed by limit. We can also see how a prefix is split off each time.
But when we look at the terminal split output, we can see that in between, these excess chunks have been dropped and another splitting of the first chunk has happened. Since this chunk is backed by an intermediate array that has been filled by the default implementation on the first split, we don’t notice it at the source, but we can see at the terminal action that this array has been split (unsurprisingly) in a well-balanced way.
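The growing batch sizes can be observed directly by calling `trySplit()` in a loop on an unknown-size spliterator. The sketch below uses a 7000-element iterator as a stand-in source (an assumption of this example, chosen so the loop terminates); the class name `BatchSizes` is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the answer's code.
class BatchSizes {
    // Splits an unknown-size iterator spliterator until it refuses,
    // recording the size of every prefix that was split off.
    static List<Long> prefixSizes(int elements) {
        Spliterator<Integer> source = Spliterators.spliteratorUnknownSize(
                IntStream.range(0, elements).iterator(), Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        for (Spliterator<Integer> prefix; (prefix = source.trySplit()) != null; ) {
            sizes.add(prefix.estimateSize()); // prefixes are array-backed, so this is exact
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(prefixSizes(7000));
    }
}
```

Each prefix is copied into an array before it is handed out, which is why the remaining source never gets smaller than "everything after the batches taken so far".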
So we need both ways of monitoring to get the full picture here…
The current Stream API implementation uses the collector's combiner to combine the intermediate results in exactly the same way as they were previously split. Also, the splitting strategy depends on the source and the common pool parallelism level, but does not depend on the exact reduction operation used (it is the same for reduce, collect, forEach, count, etc.). Relying on this, it's not very difficult to create a visualizing collector:
public static Collector<Object, ?, List<String>> parallelVisualize() {
class Range {
private String first, last;
private Range left, right;
void accept(Object obj) {
if (first == null)
first = obj.toString();
else
last = obj.toString();
}
Range combine(Range that) {
Range p = new Range();
p.first = first == null ? that.first : first;
p.last = Stream
.of(that.last, that.first, this.last, this.first)
.filter(Objects::nonNull).findFirst().orElse(null);
p.left = this;
p.right = that;
return p;
}
String pad(String s, int left, int len) {
if (len == s.length())
return s;
char[] result = new char[len];
Arrays.fill(result, ' ');
s.getChars(0, s.length(), result, left);
return new String(result);
}
public List<String> finish() {
String cur = toString();
if (left == null) {
return Collections.singletonList(cur);
}
List<String> l = left.finish();
List<String> r = right.finish();
int len1 = l.get(0).length();
int len2 = r.get(0).length();
int totalLen = len1 + len2 + 1;
int leftAdd = 0;
if (cur.length() < totalLen) {
cur = pad(cur, (totalLen - cur.length()) / 2, totalLen);
} else {
leftAdd = (cur.length() - totalLen) / 2;
totalLen = cur.length();
}
List<String> result = new ArrayList<>();
result.add(cur);
char[] dashes = new char[totalLen];
Arrays.fill(dashes, ' ');
Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1
+ leftAdd, '_');
int mid = totalLen / 2;
dashes[mid] = '/';
dashes[mid + 1] = '\\';
result.add(new String(dashes));
Arrays.fill(dashes, ' ');
dashes[len1 / 2 + leftAdd] = '|';
dashes[len1 + len2 / 2 + 1 + leftAdd] = '|';
result.add(new String(dashes));
int maxSize = Math.max(l.size(), r.size());
for (int i = 0; i < maxSize; i++) {
String lstr = l.size() > i ? l.get(i) : String.format("%"
+ len1 + "s", "");
String rstr = r.size() > i ? r.get(i) : String.format("%"
+ len2 + "s", "");
result.add(pad(lstr + " " + rstr, leftAdd, totalLen));
}
return result;
}
public String toString() {
if (first == null)
return "(empty)";
else if (last == null)
return "[" + first + "]";
return "[" + first + ".." + last + "]";
}
}
return Collector.of(Range::new, Range::accept, Range::combine,
Range::finish);
}
Here are some interesting results obtained with this collector on a 4-core machine (results will differ on machines with a different number of availableProcessors()).
Splitting of simple range:
IntStream.range(0, 100)
.boxed().parallel().collect(parallelVisualize())
.forEach(System.out::println);
An even split into 16 tasks:
[0..99]
___________________________________/\________________________________
| |
[0..49] [50..99]
_________________/\______________ _________________/\________________
| | | |
[0..24] [25..49] [50..74] [75..99]
________/\_____ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99]
___/\_ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]
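The count of 16 leaves is consistent with a simple model: keep halving while a chunk is larger than size / (4 × parallelism). That threshold is an assumption about OpenJDK internals rather than documented behavior, and the class `SplitModel` below is purely illustrative. It also assumes the default common-pool parallelism of availableProcessors() - 1, which is 3 on a 4-core machine.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical model of the splitting heuristic; not JDK code and not part of the answer.
class SplitModel {
    // Number of leaves when a perfectly sized source is halved until a chunk
    // is no larger than size / (4 * parallelism). ASSUMPTION: internal heuristic.
    static int leaves(long size, int parallelism) {
        long threshold = Math.max(1, size / (4L * parallelism));
        return count(size, threshold);
    }

    private static int count(long size, long threshold) {
        if (size <= threshold || size < 2) return 1; // small enough: process as one leaf
        long left = size / 2;                        // prefix gets the smaller half
        return count(left, threshold) + count(size - left, threshold);
    }

    public static void main(String[] args) {
        int p = Math.max(1, ForkJoinPool.getCommonPoolParallelism());
        System.out.println(leaves(100, p) + " leaves with parallelism " + p);
    }
}
```

With these assumptions, `leaves(100, 3)` evaluates to 16, matching the bottom row of the diagram.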
Splitting a concatenation of two streams:
IntStream
.concat(IntStream.range(0, 10), IntStream.range(10, 100))
.boxed().parallel().collect(parallelVisualize())
.forEach(System.out::println);
As you can see, the first split un-concatenates the streams:
[0..99]
_______________________________________________________________________/\_____
| |
[0..9] [10..99]
__/\__ ___________________________________/\__________________________________
| | | |
[0..4] [5..9] [10..54] [55..99]
_________________/\________________ _________________/\________________
| | | |
[10..31] [32..54] [55..76] [77..99]
________/\_______ ________/\_______ ________/\_______ ________/\_______
| | | | | | | |
[10..20] [21..31] [32..42] [43..54] [55..65] [66..76] [77..87] [88..99]
___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___ ___/\___
| | | | | | | | | | | | | | | |
[10..14] [15..20] [21..25] [26..31] [32..36] [37..42] [43..48] [49..54] [55..59] [60..65] [66..70] [71..76] [77..81] [82..87] [88..93] [94..99]
Splitting a concatenation of two streams where an intermediate operation (boxed()) was performed before the concatenation:
Stream.concat(IntStream.range(0, 50).boxed().parallel(), IntStream.range(50, 100).boxed())
.collect(parallelVisualize())
.forEach(System.out::println);
If one of the input streams was not turned into parallel mode before the concatenation, it refuses to split at all:
[0..99]
___/\_________________________________
| |
[0..49] [50..99]
_________________/\______________
| |
[0..24] [25..49]
________/\_____ ________/\_______
| | | |
[0..11] [12..24] [25..36] [37..49]
___/\_ ___/\___ ___/\___ ___/\___
| | | | | | | |
[0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49]
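The refusal can be checked in isolation by asking the spliterator of a sequential stream with an intermediate operation to split. This is a small sketch of observed implementation behavior, not something the specification guarantees; the class `ConcatSplitCheck` is invented for the example.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the answer's code.
class ConcatSplitCheck {
    // Spliterator of a *sequential* stream with an intermediate op (boxed()).
    static boolean sequentialBoxedSplits() {
        Spliterator<Integer> s = IntStream.range(50, 100).boxed().spliterator();
        return s.trySplit() != null;
    }

    // The same pipeline, but switched to parallel mode before asking for the spliterator.
    static boolean parallelBoxedSplits() {
        Spliterator<Integer> s = IntStream.range(50, 100).boxed().parallel().spliterator();
        return s.trySplit() != null;
    }

    // Stream.concat is documented to be parallel if either input is parallel.
    static boolean concatIsParallel() {
        return Stream.concat(IntStream.range(0, 50).boxed().parallel(),
                             IntStream.range(50, 100).boxed()).isParallel();
    }

    public static void main(String[] args) {
        System.out.println("sequential boxed splits: " + sequentialBoxedSplits());
        System.out.println("parallel boxed splits:   " + parallelBoxedSplits());
        System.out.println("concat is parallel:      " + concatIsParallel());
    }
}
```

So the concatenated stream as a whole runs in parallel, but the half that was left sequential hands out a spliterator that does not split, which matches the unsplit [50..99] node above.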
Splitting with flat-mapping:
Stream.of(0, 50)
.flatMap(start -> IntStream.range(start, start+50).boxed().parallel())
.parallel().collect(parallelVisualize())
.forEach(System.out::println);
Flat-map never parallelizes inside nested streams:
[0..99]
____/\__
| |
[0..49] [50..99]
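One way to double-check this is to record which threads touch the elements of each nested stream. The sketch below is an illustration with made-up names (`FlatMapThreads`, `threadsPerInnerStream`); it relies on the observed behavior that each nested stream is consumed sequentially by the thread handling its outer element.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the answer's code.
class FlatMapThreads {
    // Maps the start value of each nested stream to the names of the threads
    // that processed its elements.
    static Map<Integer, Set<String>> threadsPerInnerStream() {
        Map<Integer, Set<String>> threads = new ConcurrentHashMap<>();
        Stream.of(0, 50).parallel()
              .flatMap(start -> IntStream.range(start, start + 50).boxed().parallel()
                  .peek(i -> threads
                      .computeIfAbsent(start, k -> ConcurrentHashMap.newKeySet())
                      .add(Thread.currentThread().getName())))
              .forEach(i -> {});
        return threads;
    }

    public static void main(String[] args) {
        threadsPerInnerStream().forEach((start, names) ->
            System.out.println("inner stream starting at " + start + ": " + names));
    }
}
```

Even though the nested streams call .parallel(), each of them should show exactly one thread name.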
Stream from unknown-sized iterator of 7000 elements (see this answer for context):
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
IntStream.range(0, 7000).iterator(),
Spliterator.ORDERED), true)
.collect(parallelVisualize()).forEach(System.out::println);
The splitting is really bad; everybody waits for the biggest part, [3072..6143]:
[0..6999]
_______________________/\___
| |
[0..1023] [1024..6999]
________________/\____
| |
[1024..3071] [3072..6999]
_________/\_____
| |
[3072..6143] [6144..6999]
___/\____
| |
[6144..6999] (empty)
Iterator source with known size:
StreamSupport
.stream(Spliterators.spliterator(IntStream.range(0, 7000)
.iterator(), 7000, Spliterator.ORDERED), true)
.collect(parallelVisualize()).forEach(System.out::println);
Supplying the size makes things much better, unlocking further splitting:
[0..6999]
______________________________________________________________________________________________/\________
| |
[0..1023] [1024..6999]
_____/\__ ____________________________________________________________________/\________________________
| | | |
[0..511] [512..1023] [1024..3071] [3072..6999]
____________/\___________ ________________/\__________________________________________________
| | | |
[1024..2047] [2048..3071] [3072..6143] [6144..6999]
_____/\_____ _____/\_____ _________________________/\________________________ ___/\___________
| | | | | | | |
[1024..1535] [1536..2047] [2048..2559] [2560..3071] [3072..4607] [4608..6143] [6144..6999] (empty)
____________/\___________ ____________/\___________ _____/\_____
| | | | | |
[3072..3839] [3840..4607] [4608..5375] [5376..6143] [6144..6571] [6572..6999]
_____/\_____ _____/\_____ _____/\_____ _____/\_____
| | | | | | | |
[3072..3455] [3456..3839] [3840..4223] [4224..4607] [4608..4991] [4992..5375] [5376..5759] [5760..6143]
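The difference between the two iterator-based sources shows up in what the remaining spliterator reports after its first split. The following sketch compares the two; `KnownSizeSplit` and its method are hypothetical names for this illustration.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the answer's code.
class KnownSizeSplit {
    // Estimated size of the remainder after one trySplit() on an iterator-backed spliterator.
    static long remainderAfterFirstSplit(boolean sizeKnown) {
        Iterator<Integer> it = IntStream.range(0, 7000).iterator();
        Spliterator<Integer> s = sizeKnown
                ? Spliterators.spliterator(it, 7000, Spliterator.ORDERED)
                : Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        s.trySplit(); // split off the first batch and discard it
        return s.estimateSize();
    }

    public static void main(String[] args) {
        System.out.println("unknown size: " + remainderAfterFirstSplit(false));
        System.out.println("known size:   " + remainderAfterFirstSplit(true));
    }
}
```

With an unknown size the estimate stays at Long.MAX_VALUE, so the framework has no basis for judging whether a chunk is still worth splitting; with a known size the estimate shrinks with every batch.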
Further improvements to such a collector are possible: generating a graphical image (such as SVG), tracking the threads where each node is processed, displaying the number of elements per group, and so on. Use it if you like.