Getting the next item from a Java 8 stream

2019-01-24 11:25发布

问题:

I'd like to retrieve and remove the next item from a Java 8 Stream, without this Stream getting closed.

Stream<Integer> integerStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Integer zero = integerStream.getNext(); // 0
Integer one  = integerStream.getNext(); // 1
...

Is this possible?

回答1:

Yes, there is a way to do this, but with some limitations.

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Iterator<Integer> iter = infiniteStream.iterator();
Integer zero = iter.next();
Integer one  = iter.next();

Alternatively,

Stream<Integer> infiniteStream = Stream.iterate( 0, x -> new Integer(x + 1) );
Spliterator<Integer> spliterator = infiniteStream.spliterator();
spliterator.tryAdvance(i -> System.out.println(i)); // zero
spliterator.tryAdvance(i -> System.out.println(i)); // one

Given a Stream, it's possible to get an Iterator or Spliterator from it, or to query whether it's a parallel stream, etc. These are defined on the BaseStream interface, a superinterface of Stream, which makes them a bit easy to miss.

In this case we know the stream is infinite, so there is no need to call the Iterator's hasNext() method or to check the return value of the Spliterator's tryAdvance()

The limitation is that both the iterator() and spliterator() methods of Stream are terminal operations which means that after they're called, the returned Iterator or Spliterator has exclusive access to the values represented by the Stream. Further operations on the stream (such as filter or map and so forth) are not permitted and will be met with IllegalStateException.

If you wanted to peel off the first couple elements and then resume stream processing, you could turn a spliterator back into a stream like so:

Stream<Integer> stream2 = StreamSupport.stream(spliterator, false);

This will probably work fine for some things, but I'm not sure I'd recommend this technique in general. I think it adds a few extra objects and thus extra method calls in the path of producing the next element.

Editorial comments (not related to your question):

  • Don't use new Integer(val). Instead use Integer.valueOf(val) which will reuse the boxed integer if it's available, which is generally true for values in the range -128 to 127.
  • You can use IntStream instead of Stream<Integer> which avoids boxing overhead entirely. It doesn't have the full complement of stream operations, but it does have iterate() which takes a function that operates on primitive int values.


回答2:

Based on Stuart's answer and with an Iterator-to-Stream conversion, I came up with the following quick-and-dirty wrapper class. It's not tested, and it's not thread-safe, but it provides me with what I currently need — removing and using single items while keeping this stream "open".

PeelingStream<T> provides a method T getNext() that shields away someWrappedStream.iterator()'s terminal stream operation semantics:

public class PeelingStream<T> implements Stream<T> {

    private Stream<T> wrapped;

    public PeelingStream(Stream<T> toBeWrapped) {
        this.wrapped = toBeWrapped;
    }

    public T getNext() {
        Iterator<T> iterator = wrapped.iterator();
        T next = iterator.next();
        Iterable<T> remainingIterable = () -> iterator;
        wrapped = StreamSupport.stream(remainingIterable.spliterator(),
                false);

        return next;
    }

    ///////////////////// from here, only plain delegate methods

    public Iterator<T> iterator() {
        return wrapped.iterator();
    }

    public Spliterator<T> spliterator() {
        return wrapped.spliterator();
    }

    public boolean isParallel() {
        return wrapped.isParallel();
    }

    public Stream<T> sequential() {
        return wrapped.sequential();
    }

    public Stream<T> parallel() {
        return wrapped.parallel();
    }

    public Stream<T> unordered() {
        return wrapped.unordered();
    }

    public Stream<T> onClose(Runnable closeHandler) {
        return wrapped.onClose(closeHandler);

    }

    public void close() {
        wrapped.close();
    }

    public Stream<T> filter(Predicate<? super T> predicate) {
        return wrapped.filter(predicate);
    }

    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
        return wrapped.map(mapper);
    }

    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
        return wrapped.mapToInt(mapper);
    }

    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
        return wrapped.mapToLong(mapper);
    }

    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
        return wrapped.mapToDouble(mapper);
    }

    public <R> Stream<R> flatMap(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        return wrapped.flatMap(mapper);
    }

    public IntStream flatMapToInt(
            Function<? super T, ? extends IntStream> mapper) {
        return wrapped.flatMapToInt(mapper);
    }

    public LongStream flatMapToLong(
            Function<? super T, ? extends LongStream> mapper) {
        return wrapped.flatMapToLong(mapper);
    }

    public DoubleStream flatMapToDouble(
            Function<? super T, ? extends DoubleStream> mapper) {
        return wrapped.flatMapToDouble(mapper);
    }

    public Stream<T> distinct() {
        return wrapped.distinct();
    }

    public Stream<T> sorted() {
        return wrapped.sorted();
    }

    public Stream<T> sorted(Comparator<? super T> comparator) {
        return wrapped.sorted(comparator);
    }

    public Stream<T> peek(Consumer<? super T> action) {
        return wrapped.peek(action);
    }

    public Stream<T> limit(long maxSize) {
        return wrapped.limit(maxSize);
    }

    public Stream<T> skip(long n) {
        return wrapped.skip(n);
    }

    public void forEach(Consumer<? super T> action) {
        wrapped.forEach(action);
    }

    public void forEachOrdered(Consumer<? super T> action) {
        wrapped.forEachOrdered(action);
    }

    public Object[] toArray() {
        return wrapped.toArray();
    }

    public <A> A[] toArray(IntFunction<A[]> generator) {
        return wrapped.toArray(generator);
    }

    public T reduce(T identity, BinaryOperator<T> accumulator) {
        return wrapped.reduce(identity, accumulator);
    }

    public Optional<T> reduce(BinaryOperator<T> accumulator) {
        return wrapped.reduce(accumulator);
    }

    public <U> U reduce(U identity,
            BiFunction<U, ? super T, U> accumulator,
            BinaryOperator<U> combiner) {
        return wrapped.reduce(identity, accumulator, combiner);
    }

    public <R> R collect(Supplier<R> supplier,
            BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return wrapped.collect(supplier, accumulator, combiner);
    }

    public <R, A> R collect(Collector<? super T, A, R> collector) {
        return wrapped.collect(collector);
    }

    public Optional<T> min(Comparator<? super T> comparator) {
        return wrapped.min(comparator);
    }

    public Optional<T> max(Comparator<? super T> comparator) {
        return wrapped.max(comparator);
    }

    public long count() {
        return wrapped.count();
    }

    public boolean anyMatch(Predicate<? super T> predicate) {
        return wrapped.anyMatch(predicate);
    }

    public boolean allMatch(Predicate<? super T> predicate) {
        return wrapped.allMatch(predicate);
    }

    public boolean noneMatch(Predicate<? super T> predicate) {
        return wrapped.noneMatch(predicate);
    }

    public Optional<T> findFirst() {
        return wrapped.findFirst();
    }

    public Optional<T> findAny() {
        return wrapped.findAny();
    }

}

A small test:

@Test
public void testPeelingOffItemsFromStream() {

    Stream<Integer> infiniteStream = Stream.iterate(0, x -> x + 1);

    PeelingStream<Integer> peelingInfiniteStream = new PeelingStream<>(infiniteStream);

    Integer one = peelingInfiniteStream.getNext();
    assertThat(one, equalTo(0));

    Integer two = peelingInfiniteStream.getNext();
    assertThat(two, equalTo(1));

    Stream<Integer> limitedStream = peelingInfiniteStream.limit(3); // 2 3 4
    int sumOf234 = limitedStream.mapToInt(x -> x.intValue()).sum();
    assertThat(sumOf234, equalTo(2 + 3 + 4));

}