How to use ReactiveMongo + JSON aggregation framework

Posted 2019-02-28 15:50

Question:

I need to use MongoDB's aggregation framework through ReactiveMongo. The only example I could find uses BSON. I would like to use JSON instead, so I changed the code to:

def populatedStates(col: JSONCollection) = {
  import col.BatchCommands.AggregationFramework.{AggregationResult, Group, Match, SumField}
  val res: Future[AggregationResult] = col.aggregate(
    Group(JsString("$state"))("totalPop" -> SumField("population")),
    List(Match(JSONDocument("totalPop" -> JSONDocument("$gte" -> 10000000L)))))
  res.map(_.firstBatch)
}

But there's no "JSONDocument" type.

What would be the correct way to finish this approach?

Answer 1:

When you use JSONSerializationPack, JSONDocument is the same as JsObject, so you can build the documents with Json.obj. A JsObject is converted to a JSONDocument by an implicit writer.

import play.api.libs.json._
import reactivemongo.play.json._
import reactivemongo.play.json.collection.JSONCollection

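import scala.concurrent.{ExecutionContext, Future}

// res.map needs an ExecutionContext, so take one implicitly
def populatedStates(col: JSONCollection)(implicit ec: ExecutionContext) = {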
  import col.BatchCommands.AggregationFramework.{AggregationResult, Group, Match, SumField}
  val res: Future[AggregationResult] = col.aggregate(
    Group(JsString("$state"))("totalPop" -> SumField("population")),
    List(Match(Json.obj("totalPop" -> Json.obj("$gte" -> 10000000L)))))
  res.map(_.firstBatch)
}
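
To check what the Match stage actually receives, you can build the filter on its own and print it. This is only a minimal sketch, assuming play-json is on the classpath; the object name MatchStageSketch is made up for illustration and is not part of ReactiveMongo.

```scala
import play.api.libs.json.{JsObject, Json}

// Hypothetical helper object, for illustration only
object MatchStageSketch {
  // The filter handed to Match(...): keep groups whose totalPop is >= 10 million
  val matchDoc: JsObject =
    Json.obj("totalPop" -> Json.obj("$gte" -> 10000000L))

  def main(args: Array[String]): Unit =
    // Print the document so it can be compared with the mongo shell syntax
    println(Json.prettyPrint(matchDoc))
}
```

The printed document should have the same shape as the filter you would write in the mongo shell, which makes it easy to spot a misplaced operator before running the pipeline.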


Answer 2:

For example, suppose you want to run the example from https://docs.mongodb.com/v3.4/reference/operator/aggregation/subtract/#subtract-numbers, sorted by 'total'. Using Play ReactiveMongo 0.12.0, it should look like this:

// don't forget to import this !!
import reactivemongo.play.json.commands.JSONAggregationFramework._

def salesDb = reactiveMongoApi.database.map(_.collection[JSONCollection]("sales"))

val result = salesDb.flatMap { collection =>
  val projectOperator = Project(
    Json.obj(
        "item" -> 1,
        "total" -> Json.obj("$subtract" -> JsArray(Seq(Json.obj("$add" -> Seq("$price", "$fee")), JsString("$discount"))))
    )
  )
  val sortOperator = Sort(Ascending("total"))
  collection.aggregate(firstOperator = projectOperator, otherOperators = List(sortOperator)).map { aggregationResult =>
    aggregationResult.firstBatch
  }
}
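
The nested $subtract / $add document is the part that is easiest to get wrong, so it can help to build it separately and compare it with the shell version in the MongoDB docs. A minimal sketch, assuming only play-json is available; ProjectStageSketch is a made-up name, and Json.arr is used here as a shorter alternative to JsArray(Seq(...)).

```scala
import play.api.libs.json.{JsObject, Json}

// Hypothetical helper object, for illustration only
object ProjectStageSketch {
  // Same document as the one passed to Project(...) above
  val projectDoc: JsObject = Json.obj(
    "item" -> 1,
    "total" -> Json.obj(
      "$subtract" -> Json.arr(
        Json.obj("$add" -> Json.arr("$price", "$fee")), // price + fee
        "$discount"                                      // minus discount
      )
    )
  )

  def main(args: Array[String]): Unit =
    // Print the stage so it can be compared with the docs' shell syntax
    println(Json.prettyPrint(projectDoc))
}
```

If the printed JSON matches the $project stage in the MongoDB docs, the same JsObject can be passed to Project(...) unchanged.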