异步可迭代通过远程数据(Asynchronous Iterable over remote data

2019-10-23 08:00发布

还有,我已经从远程API,为此我使用了一个面向未来的界面风格拉一些数据。 该数据结构为链表。 一个相关的示例性数据容器如下所示。

case class Data(information: Int) {
    def hasNext: Boolean = ??? // Implemented
    def next: Future[Data] = ??? // Implemented
}

现在我感兴趣的是增加了一些功能,数据类,如mapforeachreduce等要做到这一点我想实现某种形式的IterableLike使得它inherets这些方法。 下面给出的特征Data可以扩展,使得它得到这个属性。

trait AsyncIterable[+T]
    extends IterableLike[Future[T], AsyncIterable[T]]
{
    def hasNext : Boolean
    def next : Future[T]

    // How to implement?
    override def iterator: Iterator[Future[T]] = ???
    override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
    override def seq: TraversableOnce[Future[T]] = ???
}

这应该是一个非阻塞的实现,这时候采取行动,开始请求来自远程数据源的下一个数据。 然后可以做很酷的东西,如

case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))

另外,也可以为接口是不同的。 但结果应以某种方式代表了收集异步迭代。 优选地,在一个方式所熟悉的开发者,因为这将是一个(开源)库的一部分。

Answer 1:

在生产中我会使用下列操作之一:

  1. 阿卡流
  2. 无扩展

对于私人测试中,我将实现类似于下面的东西。 (说明如下)

我已经修改了一点点你的Data

abstract class AsyncIterator[T] extends Iterator[Future[T]] {
  def hasNext: Boolean
  def next(): Future[T]
}

对于它,我们可以实现此Iterable

class AsyncIterable[T](sourceIterator: AsyncIterator[T])
  extends IterableLike[Future[T], AsyncIterable[T]]
{
  private def stream(): Stream[Future[T]] =
    if(sourceIterator.hasNext) {sourceIterator.next #:: stream()} else {Stream.empty}
  val asStream = stream()

  override def iterator = asStream.iterator
  override def seq = asStream.seq
  override protected[this] def newBuilder = throw new UnsupportedOperationException()
}

如果使用下面的代码看到它在行动:

object Example extends App {
  val source = "Hello World!";

  val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
  new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
  pause(2000L)

  val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
  new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
    for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
  pause(2000L)

  def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}

class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
  private val dataIterator = data.iterator
  private var nextTime = System.currentTimeMillis() + delay
  override def hasNext = dataIterator.hasNext
  override def next = {
    val thisTime = math.max(System.currentTimeMillis(), nextTime)
    val thisValue = dataIterator.next()
    nextTime = thisTime + delay
    Future {
      val now = System.currentTimeMillis()
      if(thisTime > now) Thread.sleep(thisTime - now) //Your implementation will be better
      thisValue
    }
  }
}

说明

AsyncIterable使用流,因为它懒洋洋地计算,这很简单。

优点:

  • 简单
  • 多次调用iteratorseq方法返回相同的可迭代的所有项目。

缺点:

  • 因为流保持所有prevously获得的值可能会导致内存溢出。
  • 第一值创建的过程中得到了热切AsyncIterable

DelayedIterator是非常简单的实现AsyncIterator的,不要怪我了快速和肮脏的代码在这里。

它仍然是陌生的,我看到同步hasNext和异步next()



Answer 2:

使用Twitter后台,我实现工作的例子。 为了实现spool我修改了在本例中的文档 。

import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}

import scala.concurrent.{ExecutionContext, Future}

trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
    def hasNext : Boolean
    def next : Future[T]

    def spool(implicit ec: ExecutionContext) : Spool[T] = {
        def fill(currentPage: Future[T], rest: Promise[Spool[T]]) {
            currentPage foreach { cPage =>
                if(hasNext) {
                    val nextSpool = new Promise[Spool[T]]
                    rest() = Return(cPage *:: nextSpool)
                    fill(next, nextSpool)
                } else {
                    val emptySpool = new Promise[Spool[T]]
                    emptySpool() = Return(Spool.empty[T])
                    rest() = Return(cPage *:: emptySpool)
                }
            }
        }
        val rest = new Promise[Spool[T]]
        if(hasNext) {
            fill(next, rest)
        } else {
            rest() = Return(Spool.empty[T])
        }
        self *:: rest
    }
}

数据是和以前一样,现在我们可以使用它。

// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)

它将特罗在第二元件上的例外,因为执行next未提供。



文章来源: Asynchronous Iterable over remote data