Check if a Java stream has been consumed

Published 2020-08-22 07:08

Question:

How can I check whether a stream instance has been consumed (meaning a terminal operation has been called on it, so that any further call to a terminal operation may fail with IllegalStateException: stream has already been operated upon or closed)?

Ideally I want a method that does not consume the stream if it has not yet been consumed, and that returns false if the stream has been consumed, without my having to catch an IllegalStateException from a stream method (using exceptions for control flow is expensive and error-prone, particularly with standard exceptions).

Something similar to hasNext() in Iterator with respect to its exception-throwing and boolean-return behavior (though without the contract with next()).

Example:

public void consume(java.util.function.Consumer<Stream<?>> consumer, Stream<?> stream) {
   consumer.accept(stream);
   // defensive programming, check state
   if (...) {
       throw new IllegalStateException("consumer must call terminal operation on stream");
   }
}

The goal is to fail early if client code calls this method without consuming the stream.

It seems there is no method to do that and I'd have to add a try-catch block calling any terminal operation like iterator(), catch an exception and throw a new one.
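For reference, the try-catch fallback described above could look roughly like the following. This is only a sketch: the names ConsumeCheck and wasConsumed are made up for illustration, and the probe relies on iterator() throwing IllegalStateException on a stream that has already been used.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

class ConsumeCheck {

    // Hypothetical helper: true if the stream had already been used.
    // When it returns false, the probe itself has used up the stream.
    static boolean wasConsumed(Stream<?> stream) {
        try {
            stream.iterator(); // terminal operation; throws on a used stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    static <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
        consumer.accept(stream);
        // defensive programming, check state
        if (!wasConsumed(stream)) {
            throw new IllegalStateException("consumer must call terminal operation on stream");
        }
    }

    public static void main(String[] args) {
        // This consumer calls a terminal operation, so the check passes.
        consume(s -> s.forEach(x -> { }), Stream.of(1, 2, 3));

        // This consumer ignores the stream, so the check fails early.
        try {
            consume(s -> { }, Stream.of(1, 2, 3));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The drawback is exactly the one raised in the question: the check works by catching a standard exception, and a stream that passes the probe as "not consumed" can no longer be used afterwards.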

An acceptable answer can also be "No solution exists", with a good justification of why the specification could not add such a method (if a good justification exists). It seems that the JDK streams usually have this snippet at the start of their terminal methods:

// in AbstractPipeline.java
if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);

So for those streams, an implementation of such a method would not seem that difficult.
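As a small illustration of what that flag guards against, the sketch below (class and method names are hypothetical) shows the standard JDK streams rejecting a second use. Note that the exception is raised not only after a terminal operation but also after an intermediate operation has been attached, so "linked" and "consumed" cannot be told apart this way.

```java
import java.util.stream.Stream;

class LinkedOrConsumedDemo {

    // Hypothetical helper: true if a terminal operation on the stream is rejected.
    static boolean rejects(Stream<?> stream) {
        try {
            stream.count();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Stream<Integer> consumed = Stream.of(1, 2, 3);
        consumed.forEach(x -> { });   // terminal operation: elements are traversed
        System.out.println("after forEach: " + rejects(consumed));

        Stream<Integer> linked = Stream.of(1, 2, 3);
        linked.map(x -> x + 1);       // intermediate operation only: nothing is traversed
        System.out.println("after map: " + rejects(linked));
    }
}
```

Both streams are rejected, even though the second one never had a terminal operation run on it. A boolean accessor built on this flag alone would therefore answer "has this stream object been used" rather than "have the elements been consumed".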

Answer 1:

Taking into consideration that spliterator (for example) is a terminal operation, you can simply create a method like:

private static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {

    Spliterator<T> spliterator;
    try {
        spliterator = stream.spliterator();
    } catch (IllegalStateException ise) {
        return Optional.empty();
    }

    return Optional.of(StreamSupport.stream(
        () -> spliterator,
        spliterator.characteristics(),
        stream.isParallel()));
}

I don't know of a better way to do it... And usage would be:

Stream<Integer> ints = Stream.of(1, 2, 3, 4)
                                 .filter(x -> x < 3);

YourClass.isConsumed(ints)
         .ifPresent(x -> x.forEachOrdered(System.out::println));

Since I don't think there is a practical reason to return an already consumed Stream, I am returning Optional.empty() instead.



Answer 2:

One solution could be to add an intermediate operation (e.g. filter()) to the stream before passing it to the consumer. In that operation you do nothing but record that the operation was called (e.g. with an AtomicBoolean):

public <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
    AtomicBoolean consumed = new AtomicBoolean(false);
    consumer.accept(stream.filter(i -> {
        consumed.set(true);
        return true;
    }));
    if (!consumed.get()) {
        throw new IllegalStateException("consumer must call terminal operation on stream");
    }
}

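Side note: Do not use peek() for this. Its action is not invoked for elements that the stream implementation can optimize away, for example elements skipped by short-circuiting terminal operations (like findAny()), or all elements when count() can compute the size directly from the source (Java 9 and later).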
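To see when the flag-setting predicate from the code above actually runs, here is a small experiment. It is a sketch only; FilterFlagDemo and flagSet are hypothetical names, and the terminal operations were picked arbitrarily.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

class FilterFlagDemo {

    // Hypothetical helper: hands a flag-setting stream to the given consumer
    // and reports whether the filter predicate was invoked at least once.
    static <T> boolean flagSet(Stream<T> source, Consumer<Stream<T>> terminal) {
        AtomicBoolean seen = new AtomicBoolean(false);
        terminal.accept(source.filter(x -> {
            seen.set(true);
            return true;
        }));
        return seen.get();
    }

    public static void main(String[] args) {
        // Short-circuiting operation: the predicate still runs for the first element.
        System.out.println(flagSet(Stream.of(1, 2, 3), s -> s.findAny()));

        // filter() makes the size unknown, so count() has to traverse.
        System.out.println(flagSet(Stream.of(1, 2, 3), s -> s.count()));

        // Empty stream: a terminal operation runs, but there is nothing to filter.
        System.out.println(flagSet(Stream.<Integer>empty(), s -> s.forEach(x -> { })));
    }
}
```

The last case is worth keeping in mind: on an empty stream the predicate is never invoked, so the flag stays false and consume() would report a missing terminal operation even though one was called.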



Answer 3:

Here is a standalone compilable solution that uses a delegating custom Spliterator<T> implementation + an AtomicBoolean to accomplish what you seek without losing thread-safety or affecting the parallelism of a Stream<T>.

The main entry is the Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) function - you can do whatever you want in the callback function. I first tinkered with a delegating Stream<T> implementation but it's just too big an interface to delegate without any issues (see my code comment, even Spliterator<T> has its caveats when delegating):

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StackOverflowQuestion56927548Scratch {

    private static class TrackingSpliterator<T> implements Spliterator<T> {
        private final AtomicBoolean tracker;
        private final Spliterator<T> delegate;
        private final Runnable callback;

        public TrackingSpliterator(Stream<T> forStream, Runnable callback) {
            this(new AtomicBoolean(true), forStream.spliterator(), callback);
        }

        private TrackingSpliterator(
                AtomicBoolean tracker,
                Spliterator<T> delegate,
                Runnable callback
        ) {
            this.tracker = tracker;
            this.delegate = delegate;
            this.callback = callback;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean advanced = delegate.tryAdvance(action);
            if(tracker.compareAndSet(true, false)) {
                callback.run();
            }
            return advanced;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = this.delegate.trySplit();
            //may return null according to JavaDoc
            if(split == null) {
                return null;
            }
            return new TrackingSpliterator<>(tracker, split, callback);
        }

        @Override
        public long estimateSize() {
            return delegate.estimateSize();
        }

        @Override
        public int characteristics() {
            return delegate.characteristics();
        }
    }

    public static <T> Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) {
        return StreamSupport.stream(
                new TrackingSpliterator<>(input, () -> callback.accept(input)),
                input.isParallel()
        );
    }

    public static void main(String[] args) {
        //some big stream to show it works correctly when parallelized
        Stream<Integer> stream = IntStream.range(0, 100000000)
                .mapToObj(Integer::valueOf)
                .parallel();
        Stream<Integer> trackedStream = track(stream, s -> System.out.println("consume"));

        //dummy consume
        System.out.println(trackedStream.anyMatch(i -> i.equals(-1)));
    }
}

Just return the stream produced by the track function, maybe adapt the callback parameter's type (you probably don't need to pass the stream), and you are good to go.

Please note that this implementation only tracks when the stream is actually traversed. Calling .count() on a Stream that was produced by e.g. IntStream.range(0, 1000) (without any filter steps etc.) may not traverse the stream at all: on Java 9 and later it can return the known length of the underlying source via Spliterator<T>.estimateSize()!
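The count() caveat can be observed with a reduced version of the idea. The sketch below is not the TrackingSpliterator from above but a minimal stand-in (hypothetical names, no splitting support) that only sets a flag when tryAdvance is called. What it prints depends on the Java version, since the size shortcut in count() is an implementation choice.

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CountShortcutDemo {

    // Hypothetical helper: wraps the input so that the flag is set on the first advance.
    static <T> Stream<T> flagOnAdvance(Stream<T> input, AtomicBoolean advanced) {
        Spliterator<T> delegate = input.spliterator();
        Spliterator<T> tracking = new Spliterator<T>() {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                advanced.set(true);
                return delegate.tryAdvance(action);
            }

            @Override
            public Spliterator<T> trySplit() {
                return null; // sequential only in this sketch
            }

            @Override
            public long estimateSize() {
                return delegate.estimateSize();
            }

            @Override
            public int characteristics() {
                return delegate.characteristics();
            }

            @Override
            public Comparator<? super T> getComparator() {
                return delegate.getComparator(); // needed if the delegate reports SORTED
            }
        };
        return StreamSupport.stream(tracking, false);
    }

    public static void main(String[] args) {
        AtomicBoolean advanced = new AtomicBoolean(false);
        long n = flagOnAdvance(IntStream.range(0, 1000).boxed(), advanced).count();
        // On runtimes that compute the count from the known size, "advanced" stays false.
        System.out.println(n + " elements counted, spliterator advanced: " + advanced.get());
    }
}
```

If the consumer's terminal operation may be count(), a tracker based on traversal alone can therefore miss it. Inserting an operation that makes the size unknown, such as the filter() from Answer 2, is one way to force a traversal.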