I'm using Play Framework with Scala, Akka and ReactiveMongo. I want to use a MongoDB collection as a circular queue. Several actors can insert documents into it; one actor retrieves these documents as soon as they're available (a sort of publish-subscribe system). I'm using a capped collection and a tailable cursor. Every time I retrieve some documents I have to run the EmptyCapped command to flush the capped collection (it's not possible to remove individual elements from it); otherwise I always retrieve the same document. Is there an alternative solution? For example, is there a way to advance the cursor without removing elements? Or is it better not to use a capped collection in my case?
object MexDB {
def db: reactivemongo.api.DB = ReactiveMongoPlugin.db
val size: Int = 10000
// creating capped collection
val collection: JSONCollection = {
val c = db.collection[JSONCollection]("messages")
val isCapped = c.convertToCapped(size, None)
Await.ready(isCapped, Duration.Inf)
c
}
def insert(mex: Mex) = {
val inserted = collection.insert(mex)
inserted onComplete {
case Failure(e) =>
Logger.info("Error while inserting task: " + e.getMessage())
throw e
case Success(i) =>
Logger.info("Successfully inserted task")
}
}
def find(): Enumerator[Mex] = {
val cursor: Cursor[Mex] = collection
.find(Json.obj())
.options(QueryOpts().tailable.awaitData)
.cursor[Mex]
// meaning of maxDocs ???
val maxDocs = 1
cursor.enumerate(maxDocs)
}
def removeAll() = {
db.command(new EmptyCapped("messages"))
}
}
/*** part of receiver actor code ***/
// inside preStart
val it = Iteratee.fold[Mex, List[Mex]](Nil) {
(partialList, mex) => partialList ::: List(mex)
}
// Inside "receive" method
case Data =>
val e: Enumerator[Mex] = MexDB.find()
val future = e.run(it)
future onComplete {
case Success(list) =>
list foreach { mex =>
Logger.info("Mex: " + mex.id)
}
MexDB.removeAll()
self ! Data
case Failure(e) => Logger.info("Error: "+ e.getMessage())
}
Your tailable cursor is closed after each found doc because maxDocs = 1. To keep it open indefinitely you should omit this limit. With awaitData, .onComplete will only be called if you explicitly shut down ReactiveMongo. You need to use some streaming function from the cursor, such as .enumerate, and process each new step/result. See https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/
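As a rough sketch of that last point (not taken from the linked demo): assuming find() is changed to call cursor.enumerate() without a maxDocs argument, the receiver could handle each document as it arrives instead of folding everything into a list and waiting for completion. TailConsumer and consume are hypothetical names introduced here for illustration; Enumerator, Iteratee.foreach and |>>> come from Play's iteratee library.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of ReactiveMongo or Play.
object TailConsumer {
  // Passes every element of `source` to `handle` as soon as it is produced.
  // The returned Future completes only when the enumerator ends, so with a
  // tailable awaitData cursor it is expected to stay pending while the
  // cursor remains open.
  def consume[A](source: Enumerator[A])(handle: A => Unit)(
      implicit ec: ExecutionContext): Future[Unit] =
    source |>>> Iteratee.foreach[A](handle)
}
```

In the receiver actor this might be started once from preStart, for example TailConsumer.consume(MexDB.find())(mex => self ! mex), with a case mex: Mex branch in receive doing the logging. Under that assumption there would be no need to call removeAll() or to re-send Data after each batch, since the cursor keeps advancing past documents it has already returned.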