Split Akka Stream Source into two

Posted 2020-06-01 10:43

Question:

I have an Akka Streams Source which I want to split into two sources according to a predicate.

E.g. having a source (types are simplified intentionally):

val source: Source[Either[Throwable, String], NotUsed] = ???

And two methods:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

I would like to split the source according to the _.isRight predicate, passing the right part to the handleSuccess method and the left part to the handleFailure method.

I tried using a Broadcast splitter, but it requires Sinks at the end.

Answer 1:

Although you can choose which side of the Source to retrieve items from, it is not possible to create a single Source that yields two outputs, which seems to be what you ultimately want.

Given the GraphStage below which essentially splits the left and right values into two outputs...

/**
  * Fans out left and right values of an either
  * @tparam L left value type
  * @tparam R right value type
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.{Attributes, Outlet}
  import akka.stream.stage.GraphStageLogic

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    var out0demand = false
    var out1demand = false

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit = {

        if (out0demand && out1demand) {
          grab(shape.in) match {
            case Left(l) =>
              out0demand = false
              push(shape.out0, l)
            case Right(r) =>
              out1demand = false
              push(shape.out1, r)
          }
        }
      }
    })

    setHandler(shape.out0, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out0demand) {
          out0demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })

    setHandler(shape.out1, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out1demand) {
          out1demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })
  }
}

... you can route them to only receive one side:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

... or, probably more desirable, route them to two separate Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(Imports and initial setup)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)


Answer 2:

This is implemented in akka-stream-contrib as PartitionWith. Add this dependency to SBT to pull it into your project:

// latest version available on https://github.com/akka/akka-stream-contrib/releases
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"

PartitionWith is shaped like a Broadcast(2), but with potentially different types for each of the two outlets. You provide it with a function that maps each element to an Either, and the element is routed to the first outlet for a Left or to the second outlet for a Right. You can then attach a Sink or Flow to each of these outlets independently as appropriate. Building on cessationoftime's example, with the Broadcast replaced with a PartitionWith:

val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                  ((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val pw = b.add(
    PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
  )

  // Wire the shapes handed to the builder block (s, l, r), not the outer values
  s ~> pw.in
  pw.out0 ~> l.in
  pw.out1 ~> r.in

  ClosedShape
})

val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)


Answer 3:

For this you can use a broadcast, then filter and map the streams within the GraphDSL:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))


val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

       import GraphDSL.Implicits._

       val broadcast = b.add(Broadcast[Either[Throwable,String]](2))


       s ~> broadcast.in
       broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in
       broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in


       ClosedShape
  })


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

I expect you will be able to run the functions you want from within the map.
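The same filter-and-map idea can also be applied directly to the Source, which yields two Sources that fit the handleSuccess and handleFailure signatures from the question. The following is only a minimal sketch: CollectSketch, successes and failures are hypothetical names, and it assumes that running the upstream twice is acceptable, because each derived Source materializes the original source separately.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

// Hypothetical helpers, not part of Akka.
object CollectSketch {
  // Keeps only the Right values; Left values are dropped.
  def successes[L, R](source: Source[Either[L, R], NotUsed]): Source[R, NotUsed] =
    source.collect { case Right(r) => r }

  // Keeps only the Left values; Right values are dropped.
  def failures[L, R](source: Source[Either[L, R], NotUsed]): Source[L, NotUsed] =
    source.collect { case Left(l) => l }
}
```

With these, handleSuccess(CollectSketch.successes(source)) and handleFailure(CollectSketch.failures(source)) would each run their own copy of the stream, so this is probably unsuitable for sources with side effects or for sources that cannot be replayed.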



Answer 4:

Since then, this has been added to standard Akka Streams as Partition: https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html.

You can split the input stream with a predicate and then use collect on each output to get only the types you are interested in.
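As a rough sketch of the Partition-plus-collect approach, the graph below routes Left values to port 0 and Right values to port 1, then narrows each outlet with collect. PartitionSketch and splitEithers are hypothetical names, and Sink.seq is used only to make the result easy to inspect. The sketch assumes Akka 2.6, where an implicit ActorSystem supplies the materializer; on older versions an implicit ActorMaterializer would be needed, as in the setup from Answer 1.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionSketch {
  // Hypothetical helper: runs the graph and returns (failures, successes).
  def splitEithers(values: List[Either[Throwable, String]])(
      implicit system: ActorSystem): (Seq[Throwable], Seq[String]) = {

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Throwable], Sink.seq[String])((_, _)) { implicit b => (l, r) =>
        import GraphDSL.Implicits._

        // The partitioner returns the index of the outlet for each element:
        // 0 for Left values, 1 for Right values.
        val partition =
          b.add(Partition[Either[Throwable, String]](2, e => if (e.isLeft) 0 else 1))

        Source(values) ~> partition.in
        // collect narrows each outlet from Either to the wrapped type.
        partition.out(0).collect { case Left(t) => t } ~> l.in
        partition.out(1).collect { case Right(s) => s } ~> r.in

        ClosedShape
      })

    // Both sinks materialize a Future of the collected elements.
    val (lefts, rights) = graph.run()
    Await.result(lefts.zip(rights), 10.seconds)
  }
}
```

Unlike the Broadcast variant, each element is sent to exactly one outlet, so the collect calls only change the element type and do not discard anything.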