Is there an idiomatic way to handle the first element of an Akka Streams Source
in a special way? What I have now is:
var firstHandled = false
source.map { elem =>
  if (!firstHandled) {
    // handle specially
    firstHandled = true
  } else {
    // handle normally
  }
}
Thanks
While I would generally go with Ramon's answer, you could also use prefixAndTail, with a prefix of 1, together with flatMapConcat to achieve something similar:
val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest: $i")
val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}
Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest: 2
// Rest: 3
// Rest: 4
// Rest: 5
This of course works not just for the first item, but for the first N items, with the proviso that those items will be taken up as a strict collection.
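To illustrate the "first N items" case, here is a minimal sketch with a prefix of 2. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream; the object name PrefixOfTwo and its run method are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration.
object PrefixOfTwo {
  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("prefix-demo")
    try {
      val labelled = Source(List(1, 2, 3, 4, 5))
        .prefixAndTail(2) // the first two elements arrive as a strict Seq
        .flatMapConcat { case (head, tail) =>
          // Handle the strict prefix one way, the remaining stream another way.
          Source(head).map(i => s"First: $i").concat(tail.map(i => s"Rest: $i"))
        }
      Await.result(labelled.runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }
}
```

Calling PrefixOfTwo.run() should give the first two elements the "First" label and the remaining three the "Rest" label.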
Using zipWith
You could zip the original Source with a Source of Booleans that only returns true the first time. This zipped Source can then be processed.
First we'll need a Source that emits the Booleans:
//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] =
  (Iterator single true) ++ (Iterator continually false)

def firstTrueSource : Source[Boolean, _] =
  Source fromIterator firstTrueIterator
We can then define a function that handles the two different cases:
type Data = ???
type OutputData = ???
def processData(data : Data, firstRun : Boolean) : OutputData =
  if(firstRun) { ... }
  else { ... }
This function can then be used in a zipWith of your original Source:
val originalSource : Source[Data,_] = ???
val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)
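For a version that can be run end to end, here is a sketch with concrete types substituted for Data and OutputData (Int and String, chosen only for illustration). It assumes Akka 2.6 or later, and the object name ZipWithFirstFlag is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object; Int and String stand in for Data and OutputData.
object ZipWithFirstFlag {
  // true, false, false, false, ...
  def firstTrueIterator(): Iterator[Boolean] =
    Iterator.single(true) ++ Iterator.continually(false)

  def processData(data: Int, firstRun: Boolean): String =
    if (firstRun) s"First: $data" else s"Rest: $data"

  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("zip-demo")
    try {
      val flagged = Source(List(1, 2, 3))
        // zipWith completes when the shorter side completes, so the
        // infinite flag source does not keep the stream alive.
        .zipWith(Source.fromIterator(() => firstTrueIterator()))(processData)
      Await.result(flagged.runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }
}
```

Because Source.fromIterator takes a factory function, each materialization gets a fresh iterator, so the same zipped Source can be run more than once.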
Using Stateful Flow
You could create a Flow that contains state similar to the example in the question but with a more functional approach:
def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
  var firstRun = true
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall : (Data) => OutputData,
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] =
  Flow[Data] map firstRunner(firstCall, otherCalls)
This Flow can then be applied to your original Source:
def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???
val firstSource : Source[OutputData, _] =
  originalSource via firstRunFlow(firstElementFunc, remainingElsFunc)
"Idiomatic Way"
Answering your question directly requires dictating the "idiomatic way". I answer that part last because it is the least verifiable by the compiler and is therefore closer to opinion. I would never claim to be a valid classifier of idiomatic code.
My personal experience with akka-streams has been that it is best to switch my perspective to imagining an actual stream (I think of a train with boxcars) of Data elements. Do I need to break it up into multiple fixed size trains? Do only certain boxcars make it through? Can I attach another train side-by-side that contains Boolean cars which can signal the front? I would prefer the zipWith method because of how I think of streams (as trains). My initial approach is always to use other stream parts connected together.
Also, I find it best to embed as little code in an akka Stream component as possible. firstTrueIterator and processData have no dependency on akka at all. Conversely, the firstTrueSource and contingentSource definitions have virtually no logic. This allows you to test the logic independently of a clunky ActorSystem, and the guts can be reused in Futures or Actors.
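As a rough illustration of that point, the logic can be exercised with plain Scala collections and no ActorSystem. In this sketch Int and String stand in for Data and OutputData, and the object name PureLogicCheck is made up for the example.

```scala
// Hypothetical wrapper object; nothing here depends on Akka.
object PureLogicCheck {
  // true, false, false, false, ...
  def firstTrueIterator(): Iterator[Boolean] =
    Iterator.single(true) ++ Iterator.continually(false)

  // Illustrative stand-in for processData with concrete types.
  def processData(data: Int, firstRun: Boolean): String =
    if (firstRun) s"First: $data" else s"Rest: $data"

  // An ordinary iterator plays the role of the stream.
  def run(): List[String] =
    List(1, 2, 3).iterator
      .zip(firstTrueIterator())
      .map { case (data, firstRun) => processData(data, firstRun) }
      .toList
}
```

A unit test can then assert on PureLogicCheck.run() directly, without starting any actors.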
While I prefer the approach with zip, one can also use statefulMapConcat:
source
  .statefulMapConcat { () =>
    // must be a var, since it is reassigned below
    var firstRun = true
    elem => {
      if (firstRun) {
        firstRun = false
        //first
        List(elem)
      } else {
        //not first
        List(elem)
      }
    }
  }
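A runnable sketch of the statefulMapConcat approach, assuming Akka 2.6 or later, might look like the following. The object name StatefulFirst and the string labels are hypothetical. The outer function is a factory that takes no arguments, and the inner function has to return an immutable collection of output elements for each input.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration.
object StatefulFirst {
  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("stateful-demo")
    try {
      val labelled = Source(List(1, 2, 3)).statefulMapConcat { () =>
        // The factory is invoked per materialization, so this flag is not
        // shared between separate runs of the stream.
        var firstRun = true
        (elem: Int) => {
          if (firstRun) {
            firstRun = false
            List(s"First: $elem")
          } else {
            List(s"Rest: $elem")
          }
        }
      }
      Await.result(labelled.runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }
}
```

Compared with a var captured outside the stream, keeping the flag inside the factory means running the same blueprint twice still treats the first element of each run specially.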
You can use prepend to put another source in front of a source or flow. Just prepend a single-element source; once it is drained, the elements of the original source follow.
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html
Source(List(1, 2, 3))
  .prepend(Source.single(0))
  .runWith(Sink.foreach(println))
0
1
2
3