问题
嗨,我有在那里我会返回一个平行的无限流功能(是的,它是在这种情况下要快得多)生成的结果。 所以很明显(或没有)我用
Stream<Something> stream = Stream.generate(this::myGenerator).parallel()
它的工作原理,但是......它没有当我要限制结果(一切都很好,当数据流是连续的)。 我的意思是,它创造的结果,当我提出类似
stream.peek(System.out::println).limit(2).collect(Collectors.toList())
但即使在peek
输出产生超过10个元素, collect
仍不finallized(产生慢所以那些10可采取甚至一分钟)...,这是简单的例子。 事实上,限制这些结果是未来由于主要期望是让只比最近的结果更好,直到用户终止该进程(另一种情况是先返回什么我可以在抛出异常,如果不出意外将帮助[ findFirst
没”吨,即使当我不得不在控制台上多个元件和进行约30秒没有更多结果])。
所以,问题是...
如何与复制? 我的想法是使用也RxJava,还有另外一个问题 - 如何与工具(或其他),以达到同样的效果。
代码示例
public Stream<Solution> generateSolutions() {
final Solution initialSolution = initialSolutionMaker.findSolution();
return Stream.concat(
Stream.of(initialSolution),
Stream.generate(continuousSolutionMaker::findSolution)
).parallel();
}
new Solver(instance).generateSolutions()
.map(Solution::getPurpose)
.peek(System.out::println)
.limit(5).collect(Collectors.toList());
实施findSolution
并不重要。 它就像增加了解决方案,回购(单,Sych发表等),但仅此而已了一些副作用。
正如解释的已链接的答案 ,关键点到一个有效的并行数据流是使用流源已经具有固有大小,而不是使用一个未施胶或甚至无限流并应用limit
就可以了。 注入的大小不会与所有当前实现工作,同时确保已知大小不迷路要容易得多。 即使确切大小不能被保留,施加时像filter
,其尺寸仍然将进行作为估计值的大小。
因此,而不是
Stream.generate(this::myGenerator).parallel()
.peek(System.out::println)
.limit(2)
.collect(Collectors.toList())
只是使用
IntStream.range(0, /* limit */ 2).unordered().parallel()
.mapToObj(unused -> this.myGenerator())
.peek(System.out::println)
.collect(Collectors.toList())
或者,更接近你的示例代码
public Stream<Solution> generateSolutions(int limit) {
final Solution initialSolution = initialSolutionMaker.findSolution();
return Stream.concat(
Stream.of(initialSolution),
IntStream.range(1, limit).unordered().parallel()
.mapToObj(unused -> continuousSolutionMaker.findSolution())
);
}
new Solver(instance).generateSolutions(5)
.map(Solution::getPurpose)
.peek(System.out::println)
.collect(Collectors.toList());
不幸的是,这是预期的行为。 我记得我看到在这个问题上至少有两个题目,这里是其中的一个 。
这个想法是, Stream.generate
创建一个unordered infinite stream
和limit
将不引入SIZED
标志。 正因为如此,当你产生一个parallel
于该流的执行,各个任务必须同步它们的执行,看看他们是否已经达到极限; 通过时间同步化作业时有可能是已经处理的多个元件。 例如这样的:
Stream.iterate(0, x -> x + 1)
.peek(System.out::println)
.parallel()
.limit(2)
.collect(Collectors.toList());
和这个 :
IntStream.of(1, 2, 3, 4)
.peek(System.out::println)
.parallel()
.limit(2)
.boxed()
.collect(Collectors.toList());
总是会生成在两个元件List
( Collectors.toList
)和将总是输出两个元件还(经由peek
)。
在另一方面这样的:
Stream<Integer> stream = Stream.generate(new Random()::nextInt).parallel();
List<Integer> list = stream
.peek(x -> {
System.out.println("Before " + x);
})
.map(x -> {
System.out.println("Mapping x " + x);
return x;
})
.peek(x -> {
System.out.println("After " + x);
})
.limit(2)
.collect(Collectors.toList());
将产生的两个要素List
,但也可以处理更多的以后将被丢弃limit
。 这是你实际上是在你的例子看到。
要去的唯一理智的方式(只要我可以告诉)将创建一个自定义Spliterator。 我没有写很多人,但这里是我的尝试:
static class LimitingSpliterator<T> implements Spliterator<T> {
private int limit;
private final Supplier<T> generator;
private LimitingSpliterator(Supplier<T> generator, int limit) {
Preconditions.checkArgument(limit > 0);
this.limit = limit;
this.generator = Objects.requireNonNull(generator);
}
@Override
public boolean tryAdvance(Consumer<? super T> consumer) {
if (limit == 0) {
return false;
}
T nextElement = generator.get();
--limit;
consumer.accept(nextElement);
return true;
}
@Override
public LimitingSpliterator<T> trySplit() {
if (limit <= 1) {
return null;
}
int half = limit >> 1;
limit = limit - half;
return new LimitingSpliterator<>(generator, half);
}
@Override
public long estimateSize() {
return limit >> 1;
}
@Override
public int characteristics() {
return SIZED;
}
}
和用法是:
StreamSupport.stream(new LimitingSpliterator<>(new Random()::nextInt, 7), true)
.peek(System.out::println)
.collect(Collectors.toList());