-->

using akka streams to go over mongo collection

2019-08-17 08:37发布

问题:

I have a collection of people in mongo, and I want to go over each person in the collection as a stream, and for each person call a method that is performing api call, changing the model, and inserting to a new collection in mongo.

It looks like this:

  def processPeople()(implicit m: Materializer): Future[Unit] = {

    val peopleSource: Source[Person, Future[State]] = collection.find(json()).cursor[Person]().documentSource()

    peopleSource.runWith(Sink.seq[Person]).map(people => {
      people.foreach(person => {
        changeModelAndInsertToNewCollection(person)
      }) 
    })
  }

but this is not working...the part of changing the model seems like is working, but the insert to mongo is not working.

It looks like also the method is not starting right away, there some processing going behind before for a min before it starts....do you see the issue?

回答1:

Solution 1 :

def changeModelAndInsertToNewCollection(person:Person) : Future[Boolean] ={
//Todo : call mongo api to update the person
???
}

def processPeople()(implicit m: Materializer): Future[Done] = {
val numberOfConcurrentUpdate = 10

val peopleSource: Source[Person, Future[State]] =
  collection
    .find(json())
    .cursor[Person]()
    .documentSource()

peopleSource
  .mapAsync(numberOfConcurrentUpdate)(changeModelAndInsertToNewCollection)
  withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.ignore)}

Solution 2 : using Alpakka as akka stream connector for mongo

val source: Source[Document, NotUsed] =
MongoSource(collection.find(json()).cursor[Person]().documentSource())

source.runWith(MongoSink.updateOne(2, collection))