I am looking for an operation on a Stream that lets me perform a non-terminal (and/or terminal) operation on every nth item. Although I use a stream of primes as an example, the stream could just as easily be web requests, user actions, or some other cold data or live feed.
From this:
Duration start = Duration.ofNanos(System.nanoTime());
IntStream.iterate(2, n -> n + 1)
.filter(Findprimes::isPrime)
.limit(1_000_1000 * 10)
.forEach(System.out::println);
System.out.println("Duration: " + Duration.ofNanos(System.nanoTime()).minus(start));
To a stream function like this:
IntStream.iterate(2, n -> n + 1)
.filter(Findprimes::isPrime)
.limit(1_000_1000 * 10)
.peekEvery(10, System.out::println)
.forEach( it -> {});
Create a helper method to wrap the peek() consumer:
public static IntConsumer every(int count, IntConsumer consumer) {
    if (count <= 0)
        throw new IllegalArgumentException("Count must be >= 1: Got " + count);
    return new IntConsumer() {
        // Number of values seen since the wrapped consumer last fired.
        private int i;
        @Override
        public void accept(int value) {
            if (++this.i == count) {
                consumer.accept(value);
                this.i = 0;
            }
        }
    };
}
You can now use it almost exactly like you wanted:
IntStream.rangeClosed(1, 20)
.peek(every(5, System.out::println))
.count();
Output
5
10
15
20
The helper method can be put in a utility class and statically imported, similar to how the Collectors class is nothing but static helper methods.
As noted by @user140547 in a comment, this code is not thread-safe, so it cannot be used with parallel streams. Besides, the output order would be messed up, so it doesn't really make sense to use it with parallel streams anyway.
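If the counter itself has to survive a parallel stream, one option is to keep it in an atomic. This is only a sketch, not part of the original answer: the class name EveryNthAtomic is made up for illustration, the wrapped consumer must be thread-safe on its own, and which elements get picked is still arbitrary in parallel. Only the number of invocations is predictable.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

// Hypothetical utility class; the name is an assumption for this sketch.
class EveryNthAtomic {
    public static IntConsumer every(int count, IntConsumer consumer) {
        if (count <= 0)
            throw new IllegalArgumentException("Count must be >= 1: Got " + count);
        // AtomicLong makes the increment safe when several threads call accept().
        AtomicLong seen = new AtomicLong();
        return value -> {
            if (seen.incrementAndGet() % count == 0) {
                consumer.accept(value);
            }
        };
    }

    public static void main(String[] args) {
        // Prints four values, but which four (and in what order) may differ per run.
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEach(every(5, System.out::println));
    }
}
```

With a sequential stream this behaves like the non-atomic helper, at the cost of an atomic increment per element.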
It is not a good idea to rely on peek() and count(), because the peek() action may not be invoked at all if count() can be computed without traversing the whole stream. Even if it works now, that does not mean it will keep working in the future. See the Javadoc of Stream.count() in Java 9.
It is better to use forEach().
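As a minimal sketch of that suggestion, the every-nth wrapper can be passed straight to forEach(), which always visits every element. The class name EveryNthForEach is an assumption for illustration, and the helper is a lambda-based restatement of the one from the earlier answer, so it is still not thread-safe.

```java
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

// Hypothetical holder class; the name is an assumption for this sketch.
class EveryNthForEach {
    // Fires the wrapped consumer on every count-th value. Sequential streams only.
    public static IntConsumer every(int count, IntConsumer consumer) {
        if (count <= 0)
            throw new IllegalArgumentException("Count must be >= 1: Got " + count);
        int[] seen = {0}; // mutable counter captured by the lambda
        return value -> {
            if (++seen[0] == count) {
                consumer.accept(value);
                seen[0] = 0;
            }
        };
    }

    public static void main(String[] args) {
        // forEach() is the terminal operation, so no peek()/count() pair is needed.
        IntStream.rangeClosed(1, 20)
                .forEach(every(5, System.out::println)); // prints 5, 10, 15, 20
    }
}
```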
For the problem itself: in special cases like a simple iteration, you could just filter your objects like this:
Stream.iterate(2, n->n+1)
.limit(20)
.filter(n->(n-2)%5==0 && n!=2)
.forEach(System.out::println);
This of course won't work for other cases, where you might use a stateful IntConsumer. If iterate() is used, it is probably not that useful to use parallel streams anyway.
If you want a generic solution, you could also try to use a "normal" Stream, which may not be as efficient as an IntStream, but should still suffice in many cases:
class Tuple{ // ctor, getter/setter omitted
int index;
int value;
}
Then you could do:
Stream.iterate( new Tuple(1,2),t-> new Tuple(t.index+1,t.value*2))
.limit(30)
.filter(t->t.index %5 == 0)
.forEach(System.out::println);
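For completeness, here is one way the omitted parts of Tuple might be filled in so the snippet runs as a whole. The constructor, toString(), the wrapper class TupleDemo and the method everyNthValue are all assumptions made for this sketch; the fields are made final here instead of adding getters and setters.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical wrapper class; names other than Tuple, index and value are assumptions.
class TupleDemo {
    static final class Tuple {
        final int index; // 1-based position in the stream
        final int value; // the payload carried alongside the position

        Tuple(int index, int value) {
            this.index = index;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Tuple(index=" + index + ", value=" + value + ")";
        }
    }

    // Collects the value of every n-th tuple among the first `limit` tuples.
    public static List<Integer> everyNthValue(int n, int limit) {
        return Stream.iterate(new Tuple(1, 2), t -> new Tuple(t.index + 1, t.value * 2))
                .limit(limit)
                .filter(t -> t.index % n == 0)
                .map(t -> t.value)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(everyNthValue(5, 20)); // [32, 1024, 32768, 1048576]
    }
}
```

Because the position travels with the element, the filter is stateless, unlike the counter-based consumer.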
If you have to use peek(), you can also do:
.peek(t->{if (t.index %5 == 0) System.out.println(t);})
Or if you add methods
static Tuple initialTuple(int value){
return new Tuple(1,value);
}
static UnaryOperator<Tuple> createNextTuple(IntUnaryOperator f){
return current -> new Tuple(current.index+1,f.applyAsInt(current.value));
}
static Consumer<Tuple> every(int n,IntConsumer consumer){
return tuple -> {if (tuple.index % n == 0) consumer.accept(tuple.value);};
}
you can also do (with static imports):
Stream.iterate( initialTuple(2), createNextTuple(x->x*2))
.limit(30)
.peek(every(5,System.out::println))
.forEach(System.out::println);
Try this.
int[] counter = {0};
long result = IntStream.iterate(2, n -> n + 1)
.filter(Findprimes::isPrime)
.limit(100)
.peek(x -> { if (counter[0]++ % 10 == 0) System.out.print(x + " ");} )
.count();
result:
2 31 73 127 179 233 283 353 419 467
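Note that counter[0]++ % 10 == 0 fires on the 1st, 11th, 21st, ... element, which is why the output starts with 2. If the 10th, 20th, ... element is wanted instead, a pre-increment does it. The sketch below is an assumption-laden variant: the class EveryTenthPrime and the method everyNthPrime are made up, isPrime is a simple trial-division stand-in for the question's Findprimes::isPrime, and forEach() replaces the peek()/count() pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical class; all names here are assumptions for this sketch.
class EveryTenthPrime {
    // Stand-in for Findprimes::isPrime from the question (plain trial division).
    public static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Returns every n-th prime among the first `limit` primes. Sequential only.
    public static List<Integer> everyNthPrime(int n, int limit) {
        int[] counter = {0};
        List<Integer> picked = new ArrayList<>();
        IntStream.iterate(2, x -> x + 1)
                .filter(EveryTenthPrime::isPrime)
                .limit(limit)
                // Pre-increment: the first hit is the n-th element, not the first.
                .forEach(x -> { if (++counter[0] % n == 0) picked.add(x); });
        return picked;
    }

    public static void main(String[] args) {
        // [29, 71, 113, 173, 229, 281, 349, 409, 463, 541]
        System.out.println(everyNthPrime(10, 100));
    }
}
```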