How can I use and return Source queue to caller without materializing it?

Posted 2019-05-03 02:29

Question:

I'm trying to use the new Akka Streams and wonder how I can use a Source queue and return it to the caller without materializing it in my own code.

Imagine we have a library that makes a number of async calls and returns the results via a Source. The function looks like this:

def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {

  val source = Source.queue[String](100, backpressure)

  source.mapMaterializedValue { case queue =>

    val url = s"http://.....&term=$text"
    httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
      v.idlist.foreach { id =>
        queue.offer(id)
      }

      queue.complete()
    }
  }

  source
}

and the caller might use it like this:

// There is implicit ActorMaterializer somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)

When I run this, the code within mapMaterializedValue is never executed.

I can't understand why I don't have access to the SourceQueue instance, given that it should be up to the caller to decide how to materialize the source.

How should I implement this?

Answer 1:

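In your code example you're returning source instead of the return value of source.mapMaterializedValue. The method call doesn't mutate the Source object; it returns a new Source, so the function you passed in is discarded along with that new Source and never runs.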
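
A minimal sketch of that fix, assuming Akka 2.5.x (where ActorMaterializer and Source.queue with an OverflowStrategy are available). The fetchIds helper is a hypothetical stand-in for the HTTP call in the question, and the materialized value is changed to NotUsed on the assumption that the caller does not need the queue itself:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object QueueSourceSketch {

  // Hypothetical stand-in for httpclient.get(...) plus JSON decoding.
  def fetchIds(text: String)(implicit ec: ExecutionContext): Future[List[String]] =
    Future.successful(List("id-1", "id-2", "id-3"))

  def findArticlesByTitle(text: String)(implicit ec: ExecutionContext): Source[String, NotUsed] =
    // Return the Source produced by mapMaterializedValue, not the original one.
    Source
      .queue[String](100, OverflowStrategy.backpressure)
      .mapMaterializedValue { queue =>
        // Runs once per materialization, i.e. when the caller runs the stream.
        fetchIds(text).onComplete {
          case Success(ids) =>
            // Assumes fewer ids than the buffer size (100); for larger result
            // sets, wait for each offer's Future before offering the next.
            ids.foreach(queue.offer)
            queue.complete()
          case Failure(ex) =>
            queue.fail(ex)
        }
        NotUsed
      }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // The caller decides how to materialize; prepending reverses the order.
    val results = findArticlesByTitle("akka")
      .runFold(List.empty[String])((acc, article) => article :: acc)

    println(Await.result(results, 5.seconds))
    system.terminate()
  }
}
```

Because the function passed to mapMaterializedValue is invoked on every materialization, each run of the returned Source gets its own queue and triggers its own fetch.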