How can I check whether a stream instance has been consumed, meaning that a terminal operation has already been called on it, so that any further call to a terminal operation may fail with IllegalStateException: stream has already been operated upon or closed?
Ideally I want a method that does not consume the stream if it has not yet been consumed, and that returns a boolean (false if the stream has been consumed) without my having to catch an IllegalStateException from a stream method, because using exceptions for control flow is expensive and error-prone, in particular with standard exceptions.
I am looking for something similar to hasNext() in Iterator, in that it returns a boolean instead of throwing (though without the contract tied to next()).
Example:
public void consume(java.util.function.Consumer<Stream<?>> consumer, Stream<?> stream) {
    consumer.accept(stream);
    // defensive programming, check state
    if (...) {
        throw new IllegalStateException("consumer must call terminal operation on stream");
    }
}
The goal is to fail early if client code calls this method without consuming the stream.
It seems there is no method to do that, and I'd have to add a try-catch block calling any terminal operation like iterator(), catch the exception and throw a new one.
An acceptable answer can also be "No solution exists" with a good justification of why the specification could not add such a method (if a good justification exists). It seems that the JDK streams usually have this snippet at the start of their terminal methods:
// in AbstractPipeline.java
if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);
So for those streams, an implementation of such a method would not seem that difficult.
Taking into consideration that spliterator (for example) is a terminal operation, you can simply create a method like:
private static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {
    Spliterator<T> spliterator;
    try {
        spliterator = stream.spliterator();
    } catch (IllegalStateException ise) {
        return Optional.empty();
    }
    return Optional.of(StreamSupport.stream(
        () -> spliterator,
        spliterator.characteristics(),
        stream.isParallel()));
}
I don't know of a better way to do it... And usage would be:
Stream<Integer> ints = Stream.of(1, 2, 3, 4)
    .filter(x -> x < 3);
YourClass.isConsumed(ints)
    .ifPresent(x -> x.forEachOrdered(System.out::println));
Since I don't think there is a practical reason to return an already consumed Stream, I am returning Optional.empty() instead.
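To make the behavior of this probe concrete, here is a minimal sketch that exercises it on three streams. The class name ConsumedProbeDemo and the three probe... helper methods are made up for illustration, and the sketch uses the StreamSupport.stream(Spliterator, boolean) overload rather than the Supplier-based one above, purely to keep it short. It also shows something worth knowing: a stream that only had an intermediate operation chained onto it (no terminal operation) is reported as consumed as well, because the JDK flag covers "linked or consumed".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ConsumedProbeDemo {

    // Same idea as isConsumed above: probe with spliterator() and re-wrap the result.
    static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {
        Spliterator<T> spliterator;
        try {
            spliterator = stream.spliterator();
        } catch (IllegalStateException ise) {
            return Optional.empty();
        }
        return Optional.of(StreamSupport.stream(spliterator, stream.isParallel()));
    }

    // Fresh stream: the probe hands back a usable replacement stream.
    static List<Integer> probeFresh() {
        Stream<Integer> fresh = Stream.of(1, 2, 3, 4).filter(x -> x < 3);
        return isConsumed(fresh)
            .map(s -> s.collect(Collectors.toList()))
            .orElse(null);
    }

    // Stream that already ran a terminal operation: the probe returns empty.
    static boolean probeAfterTerminalOp() {
        Stream<Integer> used = Stream.of(1, 2, 3);
        used.forEach(x -> { });
        return isConsumed(used).isPresent();
    }

    // Stream that was only linked to an intermediate operation: also empty.
    static boolean probeAfterIntermediateOnly() {
        Stream<Integer> linked = Stream.of(1, 2, 3);
        linked.map(x -> x + 1); // result discarded, no terminal operation runs
        return isConsumed(linked).isPresent();
    }

    public static void main(String[] args) {
        System.out.println(probeFresh());                 // [1, 2]
        System.out.println(probeAfterTerminalOp());       // false
        System.out.println(probeAfterIntermediateOnly()); // false
    }
}
```

Note that the caller has to continue with the returned stream, since the probe itself consumes the original one. Close handlers registered on the original via onClose() are not carried over to the re-wrapped stream.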
One solution could be to add an intermediate operation (e.g. filter()) to the stream before passing it to the consumer. That operation does nothing except record that it was called (e.g. with an AtomicBoolean):
public <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
    AtomicBoolean consumed = new AtomicBoolean(false);
    consumer.accept(stream.filter(i -> {
        consumed.set(true);
        return true;
    }));
    if (!consumed.get()) {
        throw new IllegalStateException("consumer must call terminal operation on stream");
    }
}
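Side note: do not use peek() for this. Its Javadoc warns that the action is not invoked for elements whose production the implementation can optimize away, for example with short-circuiting terminal operations (like findFirst() or findAny()).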
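As a quick check of how the filter flag behaves, the following sketch isolates the flag logic in a hypothetical helper named predicateRan (not part of the answer above) and runs it against three cases. One limitation follows directly from the technique: the predicate only runs when at least one element reaches the filter, so an empty stream leaves the flag unset even though a terminal operation was called.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

class FilterFlagDemo {

    // Returns true if the filter predicate ran at least once
    // while the consumer was handling the stream.
    static <T> boolean predicateRan(Consumer<Stream<T>> consumer, Stream<T> stream) {
        AtomicBoolean consumed = new AtomicBoolean(false);
        consumer.accept(stream.filter(i -> {
            consumed.set(true);
            return true;
        }));
        return consumed.get();
    }

    public static void main(String[] args) {
        // Short-circuiting terminal operation: the first element still passes the filter.
        System.out.println(FilterFlagDemo.<Integer>predicateRan(
            s -> s.findAny(), Stream.of(1, 2, 3)));            // true

        // Consumer that never calls a terminal operation: the flag stays unset.
        System.out.println(FilterFlagDemo.<Integer>predicateRan(
            s -> { }, Stream.of(1, 2, 3)));                    // false

        // Empty stream: a terminal operation runs, but the predicate never does.
        System.out.println(FilterFlagDemo.<Integer>predicateRan(
            s -> s.forEach(x -> { }), Stream.<Integer>empty())); // false
    }
}
```

If empty streams can occur in your code, the check in consume() would throw for them even when the consumer behaved correctly, so that case needs separate handling.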
Here is a standalone compilable solution that uses a delegating custom Spliterator<T> implementation plus an AtomicBoolean to accomplish what you seek without losing thread-safety or affecting the parallelism of a Stream<T>.
The main entry is the Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) function; you can do whatever you want in the callback function. I first tinkered with a delegating Stream<T> implementation, but it's just too big an interface to delegate without any issues (see my code comment, even Spliterator<T> has its caveats when delegating):
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StackOverflowQuestion56927548Scratch {

    private static class TrackingSpliterator<T> implements Spliterator<T> {
        private final AtomicBoolean tracker;
        private final Spliterator<T> delegate;
        private final Runnable callback;

        public TrackingSpliterator(Stream<T> forStream, Runnable callback) {
            this(new AtomicBoolean(true), forStream.spliterator(), callback);
        }

        private TrackingSpliterator(
                AtomicBoolean tracker,
                Spliterator<T> delegate,
                Runnable callback
        ) {
            this.tracker = tracker;
            this.delegate = delegate;
            this.callback = callback;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean advanced = delegate.tryAdvance(action);
            if (tracker.compareAndSet(true, false)) {
                callback.run();
            }
            return advanced;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = this.delegate.trySplit();
            //may return null according to JavaDoc
            if (split == null) {
                return null;
            }
            return new TrackingSpliterator<>(tracker, split, callback);
        }

        @Override
        public long estimateSize() {
            return delegate.estimateSize();
        }

        @Override
        public int characteristics() {
            return delegate.characteristics();
        }
    }

    public static <T> Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) {
        return StreamSupport.stream(
            new TrackingSpliterator<>(input, () -> callback.accept(input)),
            input.isParallel()
        );
    }

    public static void main(String[] args) {
        //some big stream to show it works correctly when parallelized
        Stream<Integer> stream = IntStream.range(0, 100000000)
            .mapToObj(Integer::valueOf)
            .parallel();
        Stream<Integer> trackedStream = track(stream, s -> System.out.println("consume"));
        //dummy consume
        System.out.println(trackedStream.anyMatch(i -> i.equals(-1)));
    }
}
Just use the stream returned by the track function, maybe adapt the callback parameter's type (you probably don't need to pass the stream), and you are good to go.
Please note that this implementation only tracks when the stream is actually traversed. Calling .count() on a Stream that was produced by e.g. IntStream.range(0,1000) (without any filter steps etc.) will not traverse the stream but return the underlying known length of the stream via Spliterator<T>.estimateSize()!
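The count() caveat can be observed with a small experiment. The sketch below is a simplified, sequential-only stand-in for the tracking spliterator above (the names CountCaveatDemo, flagOnTraversal and traversedBy are invented for this illustration), and it sets a flag on the first tryAdvance call. Whether count() traverses depends on the Java version: as far as I know the shortcut for sized sources exists since Java 9, so the second line of output is deliberately left without an expected value.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CountCaveatDemo {

    // Wraps the input so that the flag is set on the first tryAdvance call.
    static <T> Stream<T> flagOnTraversal(Stream<T> input, AtomicBoolean traversed) {
        Spliterator<T> delegate = input.spliterator();
        Spliterator<T> wrapper = new Spliterator<T>() {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                traversed.set(true);
                return delegate.tryAdvance(action);
            }

            @Override
            public Spliterator<T> trySplit() {
                return null; // no splitting in this sequential sketch
            }

            @Override
            public long estimateSize() {
                return delegate.estimateSize();
            }

            @Override
            public int characteristics() {
                return delegate.characteristics();
            }
        };
        return StreamSupport.stream(wrapper, false);
    }

    // Runs the given terminal operation on a sized stream and reports
    // whether any element was actually pulled through the wrapper.
    static boolean traversedBy(Consumer<Stream<Integer>> terminalOp) {
        AtomicBoolean traversed = new AtomicBoolean(false);
        terminalOp.accept(flagOnTraversal(IntStream.range(0, 1000).boxed(), traversed));
        return traversed.get();
    }

    public static void main(String[] args) {
        System.out.println("forEach traversed: " + traversedBy(s -> s.forEach(x -> { })));
        // Depends on the Java version; see the note above.
        System.out.println("count traversed:   " + traversedBy(s -> s.count()));
    }
}
```

If the callback must fire for count() as well, one option is to not report SIZED from characteristics(), at the cost of losing the size-based optimizations for the wrapped stream.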