How to check if a Java 8 Stream is empty?

Posted 2019-01-17 14:00

Question:

How can I check if a Stream is empty and throw an exception if it is, as a non-terminal operation?

Basically, I'm looking for something equivalent to the code below, but without materializing the stream in-between. In particular, the check should not occur before the stream is actually consumed by a terminal operation.

public Stream<Thing> getFilteredThings() {
    Stream<Thing> stream = getThings().stream()
                .filter(Thing::isFoo)
                .filter(Thing::isBar);
    return nonEmptyStream(stream, () -> {
        throw new RuntimeException("No foo bar things available");
    });
}

private static <T> Stream<T> nonEmptyStream(Stream<T> stream, Supplier<T> defaultValue) {
    List<T> list = stream.collect(Collectors.toList());
    if (list.isEmpty()) list.add(defaultValue.get());
    return list.stream();
}

Answer 1:

If you can live with limited parallel capabilities, the following solution will work:

private static <T> Stream<T> nonEmptyStream(
    Stream<T> stream, Supplier<RuntimeException> e) {

    Spliterator<T> it=stream.spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        boolean seen;
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean r=it.tryAdvance(action);
            if(!seen && !r) throw e.get();
            seen=true;
            return r;
        }
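        public Spliterator<T> trySplit() { return null; }
        public long estimateSize() { return it.estimateSize(); }
        public int characteristics() { return it.characteristics(); }
        // Needed when the source reports SORTED; the default implementation throws.
        public java.util.Comparator<? super T> getComparator() { return it.getComparator(); }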
    }, false);
}

Here is some example code using it:

List<String> l=Arrays.asList("hello", "world");
nonEmptyStream(l.stream(), ()->new RuntimeException("No strings available"))
  .forEach(System.out::println);
nonEmptyStream(l.stream().filter(s->s.startsWith("x")),
               ()->new RuntimeException("No strings available"))
  .forEach(System.out::println);

The problem with (efficient) parallel execution is that supporting splitting of the Spliterator requires a thread-safe way to notice whether any of the fragments has seen a value. The last fragment to execute tryAdvance would then have to realize that it is the last one (and that it also couldn’t advance) in order to throw the appropriate exception. So I didn’t add support for splitting here.
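
To see that the check really is deferred until a terminal operation runs, here is a minimal, self-contained sketch. It repeats nonEmptyStream (including a getComparator() override for SORTED sources) and adds a helper, whereDoesItThrow(), that reports where the exception surfaces. The helper and the class name LazyNonEmptyDemo are hypothetical names used only for illustration.

```java
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyNonEmptyDemo {

    // Same approach as nonEmptyStream above, repeated so the example compiles on its own.
    static <T> Stream<T> nonEmptyStream(Stream<T> stream, Supplier<RuntimeException> e) {
        Spliterator<T> it = stream.spliterator();
        return StreamSupport.stream(new Spliterator<T>() {
            boolean seen;
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                boolean r = it.tryAdvance(action);
                if (!seen && !r) throw e.get();   // first advance failed: stream was empty
                seen = true;
                return r;
            }
            @Override public Spliterator<T> trySplit() { return null; }
            @Override public long estimateSize() { return it.estimateSize(); }
            @Override public int characteristics() { return it.characteristics(); }
            @Override public Comparator<? super T> getComparator() { return it.getComparator(); }
        }, false);
    }

    // Hypothetical helper: reports at which point the emptiness exception surfaces.
    static String whereDoesItThrow() {
        Stream<String> wrapped;
        try {
            wrapped = nonEmptyStream(
                    Stream.of("hello", "world").filter(s -> s.startsWith("x")),
                    () -> new RuntimeException("No strings available"));
        } catch (RuntimeException ex) {
            return "at wrap time";
        }
        try {
            wrapped.forEach(s -> { });   // terminal operation triggers the first tryAdvance
        } catch (RuntimeException ex) {
            return "at terminal operation";
        }
        return "never";
    }

    public static void main(String[] args) {
        System.out.println(whereDoesItThrow());
    }
}
```

Wrapping the stream only obtains its spliterator, which does not pull any elements, so nothing is evaluated until the returned stream is consumed.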



Answer 2:

The other answers and comments are correct in that to examine the contents of a stream, one must add a terminal operation, thereby "consuming" the stream. However, one can do this and turn the result back into a stream, without buffering up the entire contents of the stream. Here are a couple of examples:

static <T> Stream<T> throwIfEmpty(Stream<T> stream) {
    Iterator<T> iterator = stream.iterator();
    if (iterator.hasNext()) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
    } else {
        throw new NoSuchElementException("empty stream");
    }
}

static <T> Stream<T> defaultIfEmpty(Stream<T> stream, Supplier<T> supplier) {
    Iterator<T> iterator = stream.iterator();
    if (iterator.hasNext()) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
    } else {
        return Stream.of(supplier.get());
    }
}

Basically, this turns the stream into an Iterator in order to call hasNext() on it and, if that returns true, turns the Iterator back into a Stream. This is inefficient in that all subsequent operations on the stream will go through the Iterator's hasNext() and next() methods, which also implies that the stream is effectively processed sequentially (even if it's later turned parallel). However, this does allow you to test the stream without buffering up all of its elements.
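
As a rough illustration of the "no buffering" point, the sketch below counts how many source elements the emptiness check pulls from an infinite stream. It repeats throwIfEmpty so that it compiles on its own. The counting helper elementsPulledByCheck() and the class name IteratorProbeDemo are hypothetical, added only for this example. Note that with this approach the check runs when throwIfEmpty is called, not when the returned stream is later consumed.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorProbeDemo {

    // Same approach as throwIfEmpty above, repeated so the example compiles on its own.
    static <T> Stream<T> throwIfEmpty(Stream<T> stream) {
        Iterator<T> iterator = stream.iterator();
        if (!iterator.hasNext()) {
            throw new NoSuchElementException("empty stream");
        }
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
    }

    // Hypothetical helper: counts the source elements pulled by the emptiness check alone.
    static int elementsPulledByCheck() {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> source = Stream.iterate(1, i -> i + 1)   // infinite source
                .peek(i -> pulled.incrementAndGet());
        throwIfEmpty(source);   // the returned stream is deliberately never consumed
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("pulled by check: " + elementsPulledByCheck());

        // The element consumed by hasNext() is not lost; it is still delivered downstream.
        List<Integer> firstThree = throwIfEmpty(Stream.of(1, 2, 3, 4))
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(firstThree);

        try {
            throwIfEmpty(Stream.of(1, 2, 3).filter(i -> i > 10));
        } catch (NoSuchElementException ex) {
            System.out.println("empty: " + ex.getMessage());
        }
    }
}
```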

There is probably a way to do this using a Spliterator instead of an Iterator. This potentially allows the returned stream to have the same characteristics as the input stream, including running in parallel.



Answer 3:

You must perform a terminal operation on the Stream in order for any of the filters to be applied. Therefore you can't know if it will be empty until you consume it.

The best you can do is terminate the Stream with a findAny() terminal operation, which stops as soon as it finds any element; if there are none, it has to iterate over the entire input list to find that out.

This would only help you if the input list has many elements, and one of the first few passes the filters, since only a small subset of the list would have to be consumed before you know the Stream is not empty.

Of course you'll still have to create a new Stream in order to produce the output list.
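
A minimal sketch of this findAny() approach, assuming the source is a List that can be streamed more than once. The method filteredOrThrow, its prefix parameter, and the class name FindAnyCheckDemo are hypothetical names chosen for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FindAnyCheckDemo {

    // Hypothetical helper: probes with findAny(), then rebuilds the pipeline from the source list.
    static List<String> filteredOrThrow(List<String> input, String prefix) {
        boolean anyMatch = input.stream()
                .filter(s -> s.startsWith(prefix))
                .findAny()          // short-circuits on the first matching element
                .isPresent();
        if (!anyMatch) {
            throw new RuntimeException("No strings available");
        }
        // The probing stream has been consumed, so a second stream does the real work.
        return input.stream()
                .filter(s -> s.startsWith(prefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "help");
        System.out.println(filteredOrThrow(words, "he"));
        try {
            filteredOrThrow(words, "x");
        } catch (RuntimeException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

The trade-off is that the filter runs twice on the elements up to the first match, and the source has to be re-streamable.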



Answer 4:

Following Stuart's idea, this could be done with a Spliterator like this:

static <T> Stream<T> defaultIfEmpty(Stream<T> stream, Stream<T> defaultStream) {
    final Spliterator<T> spliterator = stream.spliterator();
    final AtomicReference<T> reference = new AtomicReference<>();
    if (spliterator.tryAdvance(reference::set)) {
        return Stream.concat(Stream.of(reference.get()), StreamSupport.stream(spliterator, stream.isParallel()));
    } else {
        return defaultStream;
    }
}

I think this works with parallel Streams, as the stream.spliterator() operation will terminate the stream and then rebuild it as required.

In my use case I needed a default Stream rather than a default value. That's quite easy to change if this is not what you need.
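
For completeness, a small usage sketch of defaultIfEmpty covering both branches. It repeats the method so that it compiles on its own, with one small variation: isParallel() is read before spliterator() is called, which is a stylistic choice here rather than a required fix. The class name DefaultIfEmptyDemo is hypothetical.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class DefaultIfEmptyDemo {

    // Same approach as defaultIfEmpty above; the parallel flag is captured up front.
    static <T> Stream<T> defaultIfEmpty(Stream<T> stream, Stream<T> defaultStream) {
        boolean parallel = stream.isParallel();
        Spliterator<T> spliterator = stream.spliterator();
        AtomicReference<T> first = new AtomicReference<>();
        if (spliterator.tryAdvance(first::set)) {
            // Re-attach the element that was pulled off, followed by the rest of the source.
            return Stream.concat(Stream.of(first.get()),
                                 StreamSupport.stream(spliterator, parallel));
        }
        return defaultStream;
    }

    public static void main(String[] args) {
        List<String> nonEmpty = defaultIfEmpty(Stream.of("a", "b", "c"), Stream.of("x"))
                .collect(Collectors.toList());
        System.out.println(nonEmpty);

        List<String> fallback = defaultIfEmpty(Stream.of("a").filter(String::isEmpty), Stream.of("x"))
                .collect(Collectors.toList());
        System.out.println(fallback);
    }
}
```

As with the Iterator-based version, the first element is pulled when defaultIfEmpty is called, not when the returned stream is consumed.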