Loop in flow makes stream never end

Posted 2019-09-09 13:34

Question:

I have a properly working graph whose flow contains a loop. Items pass through as expected and everything works. Unfortunately, with a bounded source the graph never completes. How can I fix it?

Here is the schema of my flow.

Here is a simplified version of the flow with the same topology.

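// Assumed setup, not part of the original post: the imports and implicits the snippet needs.
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Merge, MergePreferred, Partition, Sink, Source}
import scala.concurrent.Future

// On Akka 2.6+ the materializer is derived from the implicit system;
// on Akka 2.5 an implicit ActorMaterializer() is also required.
implicit val system: ActorSystem = ActorSystem("loop-demo")
import system.dispatcher // ExecutionContext for the Future callback at the end

val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>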
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))

  val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0 ))
  val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1 ))

  mergeEntrance.out ~> part1.in
  part1.out(0) ~> mergePreExit.in(0)
  part2.in <~ Flow[Int].collect{
    case v if v == 1 =>
      -1
  } <~ part1.out(1)

  mergeEntrance.preferred <~ part2.out(0)
  part2.out(1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})

val completionFut = Source(List(0, 1, 2, 3))
  .buffer(4, OverflowStrategy.fail)
  .via(badFlow)
  .mapAsync(1) {
    case v => Future.successful(v * 10)
  }
  .toMat(Sink.foreach(v => println(s">>> $v")))(Keep.right).run()

for (result <- completionFut)
  println("Stream is over!")