Modify via to accept method returning a Future

2019-08-27 03:15发布

问题:

I am calling a method using via like below:

myRawStr(id)
.take(1)
.via(myMethod("someString", someSource)
.zip(Source.fromIterator(() => Iterator.from(1)))
.collect {
...
}

myMethod returns type Flow[ByteString, MyValidated[MyClass], NotUsed] but now it will be returning Future[Flow[ByteString, MyValidated[MyClass], NotUsed]] (Note: Future)

but doing this gives me compilation error on via. The error states:

[error]  found   : [as, mat, ec]scala.concurrent.Future[akka.stream.scaladsl.Flow[akka.util.ByteString,MyValidated[MyClass],akka.NotUsed]]
[error]     (which expands to)  [as, mat, ec]scala.concurrent.Future[akka.stream.scaladsl.Flow[akka.util.ByteString,scala.util.Either[List[ValidationError],MyClass],akka.NotUsed]]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[akka.util.ByteString,?],?]
[error]       .via(myMethod("someString", someSource))
[error]

How can I modify this to accept a Future OR based on this question return a non future by adding another step in the flow?

回答1:

via accepts a Flow as argument. You either have to change MyMethod to return a Flow instead of a Future. Or instead of using via you can use mapAsyc that maps your flow using a method that returns a Future (MyMethod):

https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/mapAsync.html