I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of
- immutable.Stream (a stream that can be finite and memoizes the elements that have already been read)
- mutable.Queue (which allows adding elements to the FIFO)
The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.
Actually, I'm a bit surprised that the collection library does not (seem to) include such a data structure, since IMO it is quite a classical one.
My questions:
1) Did I overlook something? Is there already a class providing this functionality?
2) OK, if it's not included in the collection library, then it might be just a trivial combination of existing collection classes. However, I tried to find this trivial code, and my implementation still looks quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?
import java.io.Closeable
import scala.collection.mutable.Queue

class FifoStream[T] extends Closeable {
  val queue = new Queue[Option[T]]

  lazy val stream = nextStreamElem

  private def nextStreamElem: Stream[T] = next() match {
    case Some(elem) => Stream.cons(elem, nextStreamElem)
    case None => Stream.empty
  }

  /** Returns next element in the queue (may wait for it to be inserted). */
  private def next() = {
    queue.synchronized {
      // Loop instead of `if`: wait() may return spuriously while the queue is still empty.
      while (queue.isEmpty) queue.wait()
      queue.dequeue()
    }
  }

  /** Adds new elements to this stream. */
  def enqueue(elems: T*): Unit = {
    queue.synchronized {
      queue.enqueue(elems.map{Some(_)}: _*)
      queue.notify()
    }
  }

  /** Closes this stream. */
  def close(): Unit = {
    queue.synchronized {
      queue.enqueue(None)
      queue.notify()
    }
  }
}
Paradigmatic's solution (slightly modified)
Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable stream (allowing repeatable reads), which fits my needs. Just for completeness, here is the code:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
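For anyone who wants to try it, here is a minimal usage sketch, assuming a single consumer and one producer thread. To keep it self-contained it repeats the class in a compact form; as an assumption on my side, enqueue calls put() directly so the sketch does not rely on the implicit Java conversions. The names FIFOStreamDemo and demo are hypothetical and only serve the illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Compact copy of the class above (assumption: enqueue uses put() directly
// instead of addAll, so no implicit Java collection conversions are needed).
class FIFOStream[A] {
  private val queue = new LinkedBlockingQueue[Option[A]]()

  // Memoized: elements taken from the queue stay readable through this stream.
  lazy val toStream: Stream[A] = queue2stream

  private def queue2stream: Stream[A] = queue.take() match {
    case Some(a) => Stream.cons(a, queue2stream) // tail is evaluated lazily
    case None    => Stream.empty                 // None marks the end of the stream
  }

  def close(): Unit = queue.put(None)
  def enqueue(as: A*): Unit = as.foreach(a => queue.put(Some(a)))
}

// Hypothetical demo object, not part of the original solution.
object FIFOStreamDemo {
  def demo(): (List[Int], List[Int]) = {
    val fifo = new FIFOStream[Int]

    // Producer: adds elements with a small delay, then closes the stream.
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        fifo.enqueue(1, 2)
        Thread.sleep(50)
        fifo.enqueue(3)
        fifo.close()
      }
    })
    producer.start()

    val first  = fifo.toStream.toList // blocks until close() has been called
    val second = fifo.toStream.toList // second read is served from the memoized stream
    producer.join()
    (first, second)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Both reads should yield the elements 1, 2, 3 in insertion order: the first one blocks until the producer closes the stream, the second one returns immediately because the stream has already been evaluated.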