How are lazy streams implemented in Java 8?

2019-03-11 00:36发布

问题:

I am reading Java 8, specifically the "Streams API". I wanted to know how streams can be lazy?

I believe streams are just added as a library and there are no changes done to the language to support laziness. Also, I will be shocked if somebody tells me it's achieved through reflection.

回答1:

Why would you need reflection to get laziness? For example, consider this class:

class LazySeq<T> {

    private final List<T> list;
    private Predicate<? super T> predicate;

    public LazySeq(List<T> input) {
        this.list = new ArrayList<>(input);
    }

    //Here you just store the predicate, but you don't perform a filtering
    //You could also return a new LazySeq with a new state
    public LazySeq<T> filter(Predicate<? super T> predicate) {
        this.predicate = predicate;
        return this;
    }

    public void forEach(Consumer<? super T> consumer){
        if(predicate == null) {
            list.forEach(consumer);
        } else {
            for(T elem : list) {
                if(predicate.test(elem)) {
                    consumer.accept(elem);
                }
            }
        }
    }
}

When you call filter on the lazy seq, the filtering does not happen immediately so for example:

LazySeq<Integer> lazySeq = new LazySeq<>(Arrays.asList(1, 2, 3, 4));
lazySeq = lazySeq.filter(i -> i%2 == 0);

If you see the content of the sequence after calling filter, you'll see that it's always 1, 2, 3, 4. However when calling a terminal operation, such as forEach, then the filtering will be done before using the consumer. So for example:

lazySeq.filter(i -> i%2 == 0).forEach(System.out::println);

will print 2 and 4.

This is the same principle with Streams. From a source, you chain operations which have certains properties. These operations are either intermediate, which returns a lazy stream (such as filter or map), or terminal (such as forEach). Some of these terminal operations are short-circuiting (such as findFirst), so you might not traverse all the pipeline (you can think of an early return in a for loop that returns the index of a value in an array for example).

When calling a terminal operation, this chain of operations start to execute so that at the end you get the expected result.

Laziness can be achieved by storing a new state on the pipeline when an intermediate op is applied, and when you call a terminal op, you go by all the states one-by-one on the data.

The Stream API is not really implemented that way (it's a bit more complex) but really the principle is here.



回答2:

No reflection or proxies. Reflection and proxies come with a performance cost that should be avoided unless there isn't an alternative and performance is number one in Java.

What makes laziness possible is the functional style of doing things. Basically a stream starts with a source(ex: List), number of intermediate operations (ex: filters, map..) , and a terminal operation (ex: count, sum, etc..). Intermediate steps execute lazily because you pass functions (lambdas) that get chained in the pipeline to be executed at the terminal step.

ex: filter(Predicate<? super T>)

filter in this example expects a function that tells us whether an object in the stream meets some criteria or not.

A lot of features that come from Java 7 have been used to make this efficient. Ex: invoke dynamic for executing lambdas rather than proxies or anonymous inner classes and ForkJoin pools for parallel execution.

If you are interested in Java 8 internals,then you have to watch this talk given by the expert in the field Brian Goetz, it's on Youtube.



回答3:

streaming is not a container of data, but a container of logic. you just need to pass in an instance of interface to remember the logic.

consider following code:

class FilterIterable<T> implements Iterable<T>
{
    private Iterable<? extends T> iter;
    private Predicate<? super T> pred;

    public FilterIterable(Iterable<? extends T> iter, Predicate<? super T> pred) {
        this.iter = iter;
        this.pred = pred;
    }

    public Iterator<T> iterator() {
        return FilterIterator<T>();
    }

    class FilterIterator<T> implements Iterator<T>
    {
        private Iterator<? extends T> iterator = iter.iterator();
        private T next = null;

        FilterIterator() {
            getNext();
        }

        private void getNext() {
            next = null;
            while (iterator.hasNext()) {
                T temp = iterator.next();
                if (pred.test(temp)) {
                    next = temp;
                    break;
                }
            }
        }

        public boolean hasNext() { return next != null; }

        public T next() {
            T temp = next;
            getNext();
            return temp;
        }       
    }
}

the logic is wrapped inside of pred, but only invoked when the object gets iterated. and in fact, this class stores no data, it only keeps an iterable that may contain data, or even just another logic holder itself.

and elements are returned on demand too. this kind of paradigm makes so called stream api lazy.