I have a working graph that contains a flow with a loop. Items pass through as expected and everything works. Unfortunately, with a bounded source the graph never completes, even after all elements have been emitted. How can I fix it?
Here is the schema of my flow.
Here is a simplified version of the flow with the same topology.
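import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future

// Assumes an implicit ActorSystem (or Materializer) and ExecutionContext are in scope.
val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))
  // part1 sends 1 to out(1) and everything else to out(0)
  val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0))
  // part2 sends 1 to out(0) (the feedback edge) and everything else to out(1)
  val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1))

  mergeEntrance.out ~> part1.in
  part1.out(0) ~> mergePreExit.in(0)
  part2.in <~ Flow[Int].collect {
    case v if v == 1 => -1
  } <~ part1.out(1)
  // Feedback edge: closes the loop back to the entrance merge
  mergeEntrance.preferred <~ part2.out(0)
  part2.out(1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})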
val completionFut = Source(List(0, 1, 2, 3))
  .buffer(4, OverflowStrategy.fail)
  .via(badFlow)
  .mapAsync(1) {
    case v => Future.successful(v * 10)
  }
  .toMat(Sink.foreach(v => println(s">>> $v")))(Keep.right).run()

for (result <- completionFut)
  println("Stream is over!")