产生无限的并行流(Generate infinite parallel stream)

2019-09-27 13:22发布

问题

嗨,我有在那里我会返回一个平行的无限流功能(是的,它是在这种情况下要快得多)生成的结果。 所以很明显(或没有)我用

Stream<Something> stream = Stream.generate(this::myGenerator).parallel()

它的工作原理,但是......它没有当我要限制结果(一切都很好,当数据流是连续的)。 我的意思是,它创造的结果,当我提出类似

stream.peek(System.out::println).limit(2).collect(Collectors.toList())

但即使在peek输出产生超过10个元素, collect仍不finallized(产生慢所以那些10可采取甚至一分钟)...,这是简单的例子。 事实上,限制这些结果是未来由于主要期望是让只比最近的结果更好,直到用户终止该进程(另一种情况是先返回什么我可以在抛出异常,如果不出意外将帮助[ findFirst没”吨,即使当我不得不在控制台上多个元件和进行约30秒没有更多结果])。

所以,问题是...

如何与复制? 我的想法是使用也RxJava,还有另外一个问题 - 如何与工具(或其他),以达到同样的效果。

代码示例

public Stream<Solution> generateSolutions() {
     final Solution initialSolution = initialSolutionMaker.findSolution();
     return Stream.concat(
          Stream.of(initialSolution),
          Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());

实施findSolution并不重要。 它就像增加了解决方案,回购(单,Sych发表等),但仅此而已了一些副作用。

Answer 1:

正如解释的已链接的答案 ,关键点到一个有效的并行数据流是使用流源已经具有固有大小,而不是使用一个未施胶或甚至无限流并应用limit就可以了。 注入的大小不会与所有当前实现工作,同时确保已知大小不迷路要容易得多。 即使确切大小不能被保留,施加时像filter ,其尺寸仍然将进行作为估计值的大小。

因此,而不是

Stream.generate(this::myGenerator).parallel()
      .peek(System.out::println)
      .limit(2)
      .collect(Collectors.toList())

只是使用

IntStream.range(0, /* limit */ 2).unordered().parallel()
         .mapToObj(unused -> this.myGenerator())
         .peek(System.out::println)
         .collect(Collectors.toList())

或者,更接近你的示例代码

public Stream<Solution> generateSolutions(int limit) {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
         Stream.of(initialSolution),
         IntStream.range(1, limit).unordered().parallel()
               .mapToObj(unused -> continuousSolutionMaker.findSolution())
   );
}

new Solver(instance).generateSolutions(5)
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .collect(Collectors.toList());


Answer 2:

不幸的是,这是预期的行为。 我记得我看到在这个问题上至少有两个题目,这里是其中的一个 。

这个想法是, Stream.generate创建一个unordered infinite streamlimit将不引入SIZED标志。 正因为如此,当你产生一个parallel于该流的执行,各个任务必须同步它们的执行,看看他们是否已经达到极限; 通过时间同步化作业时有可能是已经处理的多个元件。 例如这样的:

 Stream.iterate(0, x -> x + 1)
            .peek(System.out::println)
            .parallel()
            .limit(2)
            .collect(Collectors.toList());

和这个 :

IntStream.of(1, 2, 3, 4)
            .peek(System.out::println)
            .parallel()
            .limit(2)
            .boxed()
            .collect(Collectors.toList());

总是会生成在两个元件ListCollectors.toList )和将总是输出两个元件还(经由peek )。

在另一方面这样的:

Stream<Integer> stream = Stream.generate(new Random()::nextInt).parallel();

List<Integer> list = stream
            .peek(x -> {
                System.out.println("Before " + x);
            })
            .map(x -> {
                System.out.println("Mapping x " + x);
                return x;
            })
            .peek(x -> {
                System.out.println("After " + x);
            })
            .limit(2)
            .collect(Collectors.toList());

将产生的两个要素List ,但也可以处理更多的以后将被丢弃limit 。 这是你实际上是在你的例子看到。

要去的唯一理智的方式(只要我可以告诉)将创建一个自定义Spliterator。 我没有写很多人,但这里是我的尝试:

 static class LimitingSpliterator<T> implements Spliterator<T> {

    private int limit;

    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        Preconditions.checkArgument(limit > 0);
        this.limit = limit;
        this.generator = Objects.requireNonNull(generator);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> consumer) {
        if (limit == 0) {
            return false;
        }
        T nextElement = generator.get();
        --limit;
        consumer.accept(nextElement);
        return true;
    }

    @Override
    public LimitingSpliterator<T> trySplit() {

        if (limit <= 1) {
            return null;
        }

        int half = limit >> 1;
        limit = limit - half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        return limit >> 1;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}

和用法是:

 StreamSupport.stream(new LimitingSpliterator<>(new Random()::nextInt, 7), true)
            .peek(System.out::println)
            .collect(Collectors.toList());


文章来源: Generate infinite parallel stream