In this case only the odd lines carry meaningful data, and no character uniquely identifies those lines. My intention is to get something equivalent to the following example:
Stream<DomainObject> res = Files.lines(src)
    .filter(line -> isOddLine())
    .map(line -> toDomainObject(line));
Is there any “clean” way to do it, without sharing global state?
A clean way is to go one level deeper and implement a Spliterator. On this level you can control the iteration over the stream elements and simply iterate over two items whenever the downstream requests one item:
public class OddLines<T> extends Spliterators.AbstractSpliterator<T>
    implements Consumer<T> {

    public static <T> Stream<T> oddLines(Stream<T> source) {
        return StreamSupport.stream(new OddLines<>(source.spliterator()), false);
    }
    // Halve the size estimate, rounding up; Long.MAX_VALUE means "unknown".
    private static long odd(long l) { return l==Long.MAX_VALUE? l: (l+1)/2; }
    Spliterator<T> originalLines;
    OddLines(Spliterator<T> source) {
        super(odd(source.estimateSize()), source.characteristics());
        originalLines=source;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(originalLines==null || !originalLines.tryAdvance(action))
            return false;
        // Skip the following line by feeding it to this no-op consumer.
        if(!originalLines.tryAdvance(this)) originalLines=null;
        return true;
    }
    @Override
    public void accept(T t) {}
}
Then you can use it like
Stream<DomainObject> res = OddLines.oddLines(Files.lines(src))
    .map(line -> toDomainObject(line));
This solution has no side effects and retains most advantages of the Stream API, such as lazy evaluation. However, it should be clear that it has no useful semantics for unordered stream processing (beware of subtle aspects such as using forEachOrdered rather than forEach when performing a terminal action on all elements), and while it supports parallel processing in principle, it is unlikely to be very efficient.
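To illustrate the point about lazy evaluation, here is a minimal sketch. It uses a compact stand-in for the wrapper above rather than the OddLines class itself; the names LazyOddDemo, everyOther and firstThreeOdd are made up for this example. The source stream is infinite, so the pipeline can only finish because elements are pulled on demand.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyOddDemo {
    // Hypothetical stand-in for OddLines: passes one element on, then skips one.
    public static <T> Stream<T> everyOther(Stream<T> source) {
        Spliterator<T> src = source.spliterator();
        Spliterator<T> res = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!src.tryAdvance(action)) {
                    return false;          // source exhausted
                }
                src.tryAdvance(t -> { });  // skip the following element, if any
                return true;
            }
        };
        // Assumption: closing the result should also close the source stream.
        return StreamSupport.stream(res, false).onClose(source::close);
    }

    // Takes the 1st, 3rd and 5th element of an infinite stream 1, 2, 3, ...
    public static List<Integer> firstThreeOdd() {
        return everyOther(Stream.iterate(1, i -> i + 1))
                .limit(3)
                .collect(Collectors.toList());
    }
}
```

Because the stream is sequential and ordered, the encounter order is preserved here; the caveat about forEach versus forEachOrdered only matters once the stream is made parallel.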
No, there's no way to do this conveniently with the API. (Basically for the same reason that there is no easy way of having a zipWithIndex; see "Is there a concise way to iterate over a stream with indices in Java 8?".)
You can still use Stream, but go for an iterator:
Iterator<String> iter = Files.lines(src).iterator();
while (iter.hasNext()) {
    iter.next();                      // discard
    if (iter.hasNext()) {             // guard against an unpaired last line
        toDomainObject(iter.next());  // use
    }
}
(You might want to use try-with-resources on that stream though.)
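A sketch of the iterator loop wrapped in try-with-resources might look like the following. The class and method names (IteratorLines, everyOther, fromFile) are hypothetical, and the keepFirst flag is an assumption added here so that either line of each pair can be kept: pass true for the 1st, 3rd, 5th, ... line, or false to mirror the discard-then-use order of the loop above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorLines {
    // Keeps every other element, starting with the first if keepFirst is true.
    public static List<String> everyOther(Iterator<String> iter, boolean keepFirst) {
        List<String> kept = new ArrayList<>();
        boolean keep = keepFirst;
        while (iter.hasNext()) {
            String line = iter.next();
            if (keep) {
                kept.add(line);
            }
            keep = !keep;  // alternate between keeping and discarding
        }
        return kept;
    }

    // try-with-resources closes the file behind the stream, even on exceptions.
    public static List<String> fromFile(Path src, boolean keepFirst) throws IOException {
        try (Stream<String> lines = Files.lines(src)) {
            return everyOther(lines.iterator(), keepFirst);
        }
    }
}
```

Toggling a flag instead of calling next() twice per loop iteration also avoids a NoSuchElementException when the file has an odd number of lines.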
As aioobe said, there isn't a convenient way to do this, but there are several inconvenient ways. :-)
Here's another spliterator-based approach. Unlike Holger's, which wraps another spliterator, this one does the I/O itself. This gives greater control over things like ordering, but it also means that it has to deal with IOException and close handling. I also threw in a LongPredicate parameter, tested against the 1-based line number, that lets you decide which lines get passed through.
static class LineSpliterator extends Spliterators.AbstractSpliterator<String>
        implements AutoCloseable {
    final BufferedReader br;
    final LongPredicate pred;
    long count = 0L;

    public LineSpliterator(Path path, LongPredicate pred) throws IOException {
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        br = Files.newBufferedReader(path);
        this.pred = pred;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        try {
            String s;
            while ((s = br.readLine()) != null) {
                if (pred.test(++count)) {
                    action.accept(s);
                    return true;
                }
            }
            return false;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    @Override
    public void close() {
        try {
            br.close();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static Stream<String> lines(Path path, LongPredicate pred) throws IOException {
        LineSpliterator ls = new LineSpliterator(path, pred);
        return StreamSupport.stream(ls, false)
            .onClose(() -> ls.close());
    }
}
You'd use it within a try-with-resources to ensure that the file is closed, even if an exception occurs:
static void printOddLines() throws IOException {
    try (Stream<String> lines = LineSpliterator.lines(PATH, x -> (x & 1L) == 1L)) {
        lines.forEach(System.out::println);
    }
}
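Since the predicate is handed the 1-based line number, other selections are just different predicates. The following sketch shows a few; the names LinePredicates, ODD, EVEN, everyNth and selected are made up for illustration and are not part of the class above. The selected helper only lists which line numbers out of 1..total a predicate would let through, so the predicates can be checked without touching a file.

```java
import java.util.function.LongPredicate;
import java.util.stream.LongStream;

public class LinePredicates {
    // Lines 1, 3, 5, ... (same test as in printOddLines above).
    public static final LongPredicate ODD = n -> (n & 1L) == 1L;

    // Lines 2, 4, 6, ...
    public static final LongPredicate EVEN = ODD.negate();

    // Every n-th line: n, 2n, 3n, ... (assumes n > 0).
    public static LongPredicate everyNth(long n) {
        return k -> k % n == 0;
    }

    // Line numbers in 1..total that the predicate accepts.
    public static long[] selected(long total, LongPredicate pred) {
        return LongStream.rangeClosed(1, total).filter(pred).toArray();
    }
}
```

Any of these could be passed as the second argument of LineSpliterator.lines in place of the lambda used in printOddLines.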
You can do this with a custom spliterator:
public class EvenOdd {
    public static final class EvenSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> underlying;
        boolean even;

        public EvenSpliterator(Spliterator<T> underlying, boolean even) {
            this.underlying = underlying;
            this.even = even;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (even) {
                even = false;
                return underlying.tryAdvance(action);
            }
            if (!underlying.tryAdvance(t -> {})) {
                return false;
            }
            return underlying.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            if (!hasCharacteristics(SUBSIZED)) {
                return null;
            }
            final Spliterator<T> newUnderlying = underlying.trySplit();
            if (newUnderlying == null) {
                return null;
            }
            final boolean oldEven = even;
            if ((newUnderlying.estimateSize() & 1) == 1) {
                even = !even;
            }
            return new EvenSpliterator<>(newUnderlying, oldEven);
        }

        @Override
        public long estimateSize() {
            return underlying.estimateSize()>>1;
        }

        @Override
        public int characteristics() {
            return underlying.characteristics();
        }
    }

    public static void main(String[] args) {
        final EvenSpliterator<Integer> spliterator = new EvenSpliterator<>(
                IntStream.range(1, 100000).parallel().mapToObj(Integer::valueOf).spliterator(), false);
        final List<Integer> result = StreamSupport.stream(spliterator, true).parallel().collect(Collectors.toList());
        final List<Integer> expected = IntStream.range(1, 100000 / 2).mapToObj(i -> i * 2).collect(Collectors.toList());
        if (result.equals(expected)) {
            System.out.println("Yay! Expected result.");
        }
    }
}
Following @aioobe's algorithm, here's another spliterator-based approach, along the lines proposed by @Holger but more concise, even if less efficient.
public static <T> Stream<T> filterOdd(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            iter.tryAdvance(item -> {});    // discard
            return iter.tryAdvance(action); // use
        }
    };
    return StreamSupport.stream(res, false);
}
Then you can use it like
Stream<DomainObject> res = filterOdd(Files.lines(src))
    .map(line -> toDomainObject(line));