How to skip even lines of a Stream obtaine

Posted 2019-01-26 08:29

Question:

In this case only the odd lines contain meaningful data, and no character uniquely identifies those lines. My intention is to get something equivalent to the following example:

Stream<DomainObject> res = Files.lines(src)
     .filter(line -> isOddLine())
     .map(line -> toDomainObject(line));

Is there any “clean” way to do it, without sharing global state?

Answer 1:

A clean way is to go one level deeper and implement a Spliterator. At this level you control the iteration over the stream elements and can simply advance over two items whenever the downstream requests one:

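import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OddLines<T> extends Spliterators.AbstractSpliterator<T>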
    implements Consumer<T> {

    public static <T> Stream<T> oddLines(Stream<T> source) {
        return StreamSupport.stream(new OddLines<>(source.spliterator()), false);
    }
    private static long odd(long l) { return l==Long.MAX_VALUE? l: (l+1)/2; }
    Spliterator<T> originalLines;

    OddLines(Spliterator<T> source) {
        super(odd(source.estimateSize()), source.characteristics());
        originalLines=source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(originalLines==null || !originalLines.tryAdvance(action))
            return false;
        if(!originalLines.tryAdvance(this)) originalLines=null;
        return true;
    }

    @Override
    public void accept(T t) {}
}

Then you can use it like this:

Stream<DomainObject> res = OddLines.oddLines(Files.lines(src))
    .map(line -> toDomainObject(line));

This solution has no side effects and retains most advantages of the Stream API, such as lazy evaluation. However, it should be clear that it has no useful semantics for unordered stream processing (beware of subtle aspects like using forEachOrdered rather than forEach when performing a terminal action on all elements), and while it supports parallel processing in principle, it is unlikely to be very efficient.
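The pairwise advance and the lazy evaluation mentioned above can be checked in isolation on in-memory streams. The following is only a minimal sketch of the same idea, not Holger's class: the names OddElementsDemo and oddElements are hypothetical, and the anonymous spliterator always reports an unknown size.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OddElementsDemo {

    // Hypothetical helper: keeps the 1st, 3rd, 5th, ... element of any stream.
    public static <T> Stream<T> oddElements(Stream<T> source) {
        Spliterator<T> src = source.spliterator();
        Spliterator<T> odd = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            private boolean exhausted;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                // Pass one element downstream...
                if (exhausted || !src.tryAdvance(action)) {
                    return false;
                }
                // ...then consume and drop the following one, if there is one.
                if (!src.tryAdvance(ignored -> { })) {
                    exhausted = true;
                }
                return true;
            }
        };
        // Closing the returned stream also closes the source stream.
        return StreamSupport.stream(odd, false).onClose(source::close);
    }

    public static void main(String[] args) {
        // An odd number of elements: the last one is still delivered.
        List<String> kept = oddElements(Stream.of("a", "b", "c", "d", "e"))
                .collect(Collectors.toList());
        System.out.println(kept); // [a, c, e]

        // Laziness: the source is infinite, yet limit(3) terminates.
        List<Integer> firstThree = oddElements(Stream.iterate(1, i -> i + 1))
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(firstThree); // [1, 3, 5]
    }
}
```

The second call only terminates because elements are pulled on demand; an eager implementation would never return from an infinite source.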



Answer 2:

No, there's no way to do this conveniently with the API. (It is basically the same reason there is no easy way to get a zipWithIndex; see "Is there a concise way to iterate over a stream with indices in Java 8?")

You can still use Stream, but go for an iterator:

Iterator<String> iter = Files.lines(src).iterator();
while (iter.hasNext()) {
    iter.next();                      // discard
    if (iter.hasNext()) {             // guard against an odd number of lines
        toDomainObject(iter.next());  // use
    }
}

(You might want to use try-with-resources on that stream though.)
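A minimal sketch of the same loop wrapped in try-with-resources might look like the following. The names IteratorSkipDemo, everySecondLine and runDemo are hypothetical, and collecting the kept lines into a list stands in for the call to toDomainObject. As in the loop above, the first line of each pair is discarded and the second is kept.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorSkipDemo {

    // Hypothetical helper: collects the 2nd, 4th, 6th, ... line of a file.
    public static List<String> everySecondLine(Path src) throws IOException {
        List<String> kept = new ArrayList<>();
        // try-with-resources closes the underlying file even if iteration throws.
        try (Stream<String> lines = Files.lines(src)) {
            Iterator<String> iter = lines.iterator();
            while (iter.hasNext()) {
                iter.next();                // discard
                if (iter.hasNext()) {       // the file may have an odd number of lines
                    kept.add(iter.next());  // use
                }
            }
        }
        return kept;
    }

    // Writes a five-line temporary file, reads it back, and cleans up.
    public static List<String> runDemo() {
        try {
            Path tmp = Files.createTempFile("lines", ".txt");
            try {
                Files.write(tmp, Arrays.asList("one", "two", "three", "four", "five"));
                return everySecondLine(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [two, four]
    }
}
```

Swapping the discard and use steps would keep the 1st, 3rd, 5th, ... line instead.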



Answer 3:

As aioobe said, there isn't a convenient way to do this, but there are several inconvenient ways. :-)

Here's another spliterator-based approach. Unlike Holger's, which wraps another spliterator, this one does the I/O itself. This gives greater control over things like ordering, but it also means that it has to deal with IOException and close handling. I also threw in a LongPredicate parameter that lets you decide, by 1-based line number, which lines get passed through.

static class LineSpliterator extends Spliterators.AbstractSpliterator<String>
        implements AutoCloseable {
    final BufferedReader br;
    final LongPredicate pred;
    long count = 0L;

    public LineSpliterator(Path path, LongPredicate pred) throws IOException {
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        br = Files.newBufferedReader(path);
        this.pred = pred;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        try {
            String s;
            while ((s = br.readLine()) != null) {
                if (pred.test(++count)) {
                    action.accept(s);
                    return true;
                }
            }
            return false;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    @Override
    public void close() {
        try {
            br.close();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static Stream<String> lines(Path path, LongPredicate pred) throws IOException {
        LineSpliterator ls = new LineSpliterator(path, pred);
        return StreamSupport.stream(ls, false)
                            .onClose(() -> ls.close());
    }
}

You'd use it within a try-with-resources to ensure that the file is closed, even if an exception occurs:

static void printOddLines() throws IOException {
    try (Stream<String> lines = LineSpliterator.lines(PATH, x -> (x & 1L) == 1L)) {
        lines.forEach(System.out::println);
    }
}
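Because the predicate receives the 1-based line number, other selections need no change to the spliterator. The sketch below applies a few predicates to an in-memory list so that it runs without a file. The names LinePredicateDemo and select are hypothetical; select is only a stand-in for LineSpliterator.lines and uses the same ++count numbering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongPredicate;

public class LinePredicateDemo {

    // Hypothetical stand-in for LineSpliterator.lines: keeps the lines whose
    // 1-based line number satisfies the predicate.
    public static List<String> select(List<String> lines, LongPredicate pred) {
        List<String> out = new ArrayList<>();
        long count = 0L;
        for (String s : lines) {
            if (pred.test(++count)) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("l1", "l2", "l3", "l4", "l5", "l6");
        System.out.println(select(lines, x -> (x & 1L) == 1L)); // [l1, l3, l5]
        System.out.println(select(lines, x -> x % 3 == 0));     // [l3, l6]
        System.out.println(select(lines, x -> x > 1));          // [l2, l3, l4, l5, l6]
    }
}
```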


Answer 4:

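You can do this with a custom spliterator. The one below also implements trySplit, so it can be split for parallel processing when the underlying spliterator reports SUBSIZED: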

public class EvenOdd {
    public static final class EvenSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> underlying;
        boolean even;

        public EvenSpliterator(Spliterator<T> underlying, boolean even) {
            this.underlying = underlying;
            this.even = even;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (even) {
                even = false;

                return underlying.tryAdvance(action);
            }
            if (!underlying.tryAdvance(t -> {})) {
                return false;
            }
            return underlying.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            if (!hasCharacteristics(SUBSIZED)) {
                return null;
            }
            final Spliterator<T> newUnderlying = underlying.trySplit();
            if (newUnderlying == null) {
                return null;
            }
            final boolean oldEven = even;

            if ((newUnderlying.estimateSize() & 1) == 1) {
                even = !even;
            }

            return new EvenSpliterator<>(newUnderlying, oldEven);
        }

        @Override
        public long estimateSize() {
            return underlying.estimateSize()>>1;
        }

        @Override
        public int characteristics() {
            return underlying.characteristics();
        }
    }

    public static void main(String[] args) {

        // Source elements are 1..99999; starting with even = false keeps the 2nd, 4th, ... element.
        final EvenSpliterator<Integer> spliterator = new EvenSpliterator<>(
                IntStream.range(1, 100000).parallel().mapToObj(Integer::valueOf).spliterator(), false);
        final List<Integer> result = StreamSupport.stream(spliterator, true)
                .collect(Collectors.toList());
        final List<Integer> expected = IntStream.range(1, 100000 / 2)
                .mapToObj(i -> i * 2)
                .collect(Collectors.toList());
        if (result.equals(expected)) {
            System.out.println("Yay! Expected result.");
        }
    }
}


Answer 5:

Following @aioobe's algorithm, here's another spliterator-based approach, like the one proposed by @Holger but more concise, even if less efficient. Like @aioobe's loop, it discards the first element of each pair and keeps the second.

public static <T> Stream<T> filterOdd(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    Spliterator<T> res = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}

Then you can use it like this:

Stream<DomainObject> res = filterOdd(Files.lines(src))
    .map(line -> toDomainObject(line));
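Despite its name, filterOdd keeps the second element of each pair, which is easy to confirm on an in-memory stream. The sketch below copies the method into a hypothetical class FilterOddDemo purely so that it runs on its own.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FilterOddDemo {

    // Same logic as filterOdd above, repeated here to keep the example self-contained.
    public static <T> Stream<T> filterOdd(Stream<T> src) {
        Spliterator<T> iter = src.spliterator();
        Spliterator<T> res = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                iter.tryAdvance(item -> { });   // discard
                return iter.tryAdvance(action); // use
            }
        };
        return StreamSupport.stream(res, false);
    }

    public static void main(String[] args) {
        List<String> kept = filterOdd(Stream.of("a", "b", "c", "d", "e"))
                .collect(Collectors.toList());
        // "e" has no partner, so it is discarded and the stream ends.
        System.out.println(kept); // [b, d]
    }
}
```

If the first line of each pair is the meaningful one, swap the two tryAdvance calls and make sure a trailing unpaired element is still reported, as Holger's version does.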