Java的8个流工具,用于输入数据(Java 8 Stream utilities for inpu

2019-09-29 06:20发布

想象一下,有某种或者通过回调或输入数据的InputStream ,你需要不断地转换成一个Java 8 Stream 。 我们不知道传入数据流停止时,但我们知道,它可以停止。

到目前为止,我已经看到了解决这个问题得到的两种方式,我很感兴趣,就如何实现这一目标的最佳实践。 主要是因为我这一定是某个人已经面对过。 必须有这样做比下面的想法更简单的方法。

1)最简单的方法是把源作为一个Supplier ,只需使用Stream.generate提供数据:

Stream.generate(() -> blockCallToGetData());

但是,这有一个缺点,该流永远不会结束。 因此,只要输入源停止发送,流只是不断调用方法。 除非我们抛出一个运行时异常自然,但是这会变得非常恶劣快。

2)第二个想法是使用一个Iterator (转换为Spliterator ),其中next方法块,直到我们找到的下一个元素。 作为粗例如:

class BlockingIterator implements Iterator<Data> {

  @Override void boolean hasNext() {
    return true;
  }

  @Override Data next() {
    return blockCallToGetData();
  }

}

这样做的好处是,我可以通过返回停止流falsehasNext方法。 然而,在我们不控制输入数据(如回调)的速度的情况下,我们就需要不断的迭代器准备元素的缓冲区。 该缓冲区可以种植无限大,有人呼吁之前next的迭代器。

所以,我的问题就去。 什么是服务阻断输入流的最佳做法是什么?

Answer 1:

这个问题包含了一个可疑的假设: 在职阻断输入到一个流一个很好的做法。 流不是一个反应框架; 同时可以楔入成是一个用大铁撬,这些问题可能会蹦出其他地方的结果。 (EG的考虑这些使用情况,得出的结论是,我们提供过的东西,在一个问题,而不是在两个半工作做了完整的工作做得更好。)

如果你需要一个反应框架,最好的做法是使用一个。 RxJava是巨大的。



Answer 2:

在简单的反应的 ,我们通过使用(简单反应的)解决了这个问题异步队列 (异步包装过JDK队列数据结构),该JDK流可从读出。 如果队列被关闭,流会自动断开。

快速生产者/消费者慢问题可以通过队列来解决。 如果(简单反应)异步队列由有界阻塞队列的支持下,一旦队列已满它会自动减速(块)的任何制造螺纹。

与此相反,LazyFutureStream流实现使用非阻塞队列内部,甚至将尝试从队列数据的消费本身转动,向制作人,如果没有本数据(并且因此其可作为完全的非操作阻断流)

示例使用PushableStreamBuilder :

 PushableLazyFutureStream<Integer> pushable = new PushableStreamBuilder()
            .withBackPressureAfter(100)
            .withBackPressureOn(true)
            .pushableLazyFutureStream();

    // pushable.getInput().fromStream(input); would also be acceptable to add input data
    pushable.getInput().add(100); 
    pushable.getInput().close();

    List list = pushable.getStream().collect(Collectors.toList());

     //list is [100]


文章来源: Java 8 Stream utilities for input data