Is there a FIFO stream in Scala?

Posted 2019-02-06 04:47

Question:

I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of

  • immutable.Stream (a stream that can be finite and memoizes the elements that have already been read)
  • mutable.Queue (which allows elements to be appended to the FIFO)

The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.

Actually, I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is IMO quite a classic one.

My questions:

  • 1) Did I overlook something? Is there already a class providing this functionality?

  • 2) OK, if it's not included in the collection library then it might be just a trivial combination of existing collection classes. However, I tried to find this trivial code, but my implementation still looks quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?

    import java.io.Closeable
    import scala.collection.mutable.Queue

    class FifoStream[T] extends Closeable {

      val queue = new Queue[Option[T]]

      /** Memoizing stream view of the queue; a None marks the end of the stream. */
      lazy val stream: Stream[T] = nextStreamElem

      private def nextStreamElem: Stream[T] = next() match {
        case Some(elem) => Stream.cons(elem, nextStreamElem)
        case None       => Stream.empty
      }

      /** Returns the next element in the queue (may wait for it to be inserted). */
      private def next(): Option[T] = queue.synchronized {
        while (queue.isEmpty) queue.wait()  // `while` guards against spurious wakeups
        queue.dequeue()
      }

      /** Adds new elements to this stream. */
      def enqueue(elems: T*): Unit = queue.synchronized {
        queue.enqueue(elems.map(Some(_)): _*)
        queue.notifyAll()
      }

      /** Closes this stream. */
      def close(): Unit = queue.synchronized {
        queue.enqueue(None)
        queue.notifyAll()
      }
    }
    

Paradigmatic's solution (slightly modified)

Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns a memoizing immutable stream (allowing for repeatable reads), which fits my needs. Just for completeness, here is the code:

import scala.collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  // Memoizing stream over the queue; a None in the queue marks the end.
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue.take() match {
    case Some(a) => Stream.cons(a, queue2stream)
    case None    => Stream.empty
  }
  def close() = queue.add(None)
  def enqueue( as: A* ) = queue.addAll(as.map(Some(_)))
}
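
For illustration, here is a small usage sketch (not part of the original question) with one hypothetical producer thread; toStream blocks until elements arrive and memoizes them, so it can be traversed repeatedly:

val fifo = new FIFOStream[Int]()

// Hypothetical producer: pushes a few elements, then closes the stream.
new Thread(new Runnable {
  def run() {
    fifo.enqueue(1, 2, 3)
    fifo.close()
  }
}).start()

// Consumer: blocks until elements are available or the stream is closed.
println(fifo.toStream.toList)  // List(1, 2, 3)
println(fifo.toStream.toList)  // List(1, 2, 3) again, thanks to memoization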

Answer 1:

In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In your case, every time you iterate over the stream you modify the queue (so it's not pure). This can create a lot of misunderstandings, because iterating over the same stream twice will produce two different results.

That being said, you should use Java's BlockingQueues rather than rolling your own implementation. They are considered well implemented in terms of safety and performance. Here is the cleanest code I can think of (using your approach):

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue.take() match {
    case Some(a) => Stream.cons(a, toStream)
    case None    => Stream.empty
  }
  def close() = queue.add(None)
  def enqueue( as: A* ) = queue.addAll(as.map(Some(_)))
}

object FIFOStream {
  // Factory: hides the choice of the underlying blocking queue.
  def apply[A]() = new FIFOStream[A]( new LinkedBlockingQueue[Option[A]]() )
}
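
To make the caveat above concrete, here is a small sketch (my own, not from the answer): because toStream is a def rather than a memoized value, every call eagerly takes an element from the queue, so two seemingly identical calls observe different data:

val fifo = FIFOStream[Int]()
fifo.enqueue(1, 2, 3)
fifo.close()

val first  = fifo.toStream  // eagerly takes 1 from the queue
val second = fifo.toStream  // eagerly takes 2 from the queue
println(first.head)         // 1
println(second.head)        // 2 -- the "same" stream yields different results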


Answer 2:

I'm assuming you're looking for something like java.util.concurrent.BlockingQueue?

Akka has a BoundedBlockingQueue implementation of this interface. There are, of course, also the implementations available in java.util.concurrent.

You might also consider using Akka's actors for whatever it is you are doing. With actors you are notified of (pushed) new events or messages instead of pulling them.
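
As a rough illustration of the push-based approach (my own sketch, assuming Akka 2.x classic actors on the classpath; the Consumer actor and its message protocol are made up for this example):

import akka.actor.{Actor, ActorSystem, Props}

// A consumer that has elements pushed to it instead of pulling from a queue.
class Consumer extends Actor {
  def receive = {
    case n: Int  => println(s"got $n")
    case "close" => context.stop(self)  // shut the system down separately when done
  }
}

object PushExample extends App {
  val system   = ActorSystem("fifo-example")
  val consumer = system.actorOf(Props[Consumer], "consumer")

  // The producer just fires messages; no blocking reads are involved.
  consumer ! 1
  consumer ! 2
  consumer ! "close"
}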



Answer 3:

1) It seems you're looking for a dataflow stream as seen in languages like Oz, which supports the producer-consumer pattern. Such a collection is not available in the collections API, but you could always create one yourself.

2) The dataflow stream relies on the concept of single-assignment variables (they don't have to be initialized at the point of declaration, and reading them before initialization blocks the reader):

// Pseudocode: `x` is a single-assignment (dataflow) variable,
// which Scala does not support directly.
val x: Int
startThread {
  println(x)
}
println("The other thread waits for x to be assigned")
x = 1

It would be straightforward to implement such a stream if single-assignment (or dataflow) variables were supported in the language (see the link). Since they are not part of Scala, you have to use the wait/notify pattern with synchronized blocks, just as you did.
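
For illustration, here is a minimal sketch (mine, not from the answer) of a single-assignment variable built on exactly that wait/notify pattern:

// A minimal single-assignment ("dataflow") variable: reading blocks until
// the value has been assigned; assigning twice is an error.
class DataflowVar[A] {
  private var value: Option[A] = None

  /** Blocks until the variable has been assigned, then returns its value. */
  def get: A = synchronized {
    while (value.isEmpty) wait()  // guard against spurious wakeups
    value.get
  }

  /** Assigns the value exactly once and wakes up all blocked readers. */
  def set(a: A): Unit = synchronized {
    require(value.isEmpty, "already assigned")
    value = Some(a)
    notifyAll()
  }
}

With such a variable, the pseudocode above would become: a reader thread calls x.get and blocks until another thread calls x.set(1).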

Concurrent queues from Java can be used to achieve that as well, as the other answers suggested.