Handle Akka stream's first element specially

Posted 2019-04-06 21:47

Question:

Is there an idiomatic way to handle the first element of an Akka Streams Source specially? What I have now is:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

Thanks

Answer 1:

While I would generally go with Ramon's answer, you could also use prefixAndTail, with a prefix of 1, together with flatMapConcat to achieve something similar:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Akka 2.6+: the implicit ActorSystem also provides the Materializer
implicit val system: ActorSystem = ActorSystem()

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5

system.terminate()

This works not only for the first item but for the first N items, with the proviso that those N items are collected into a strict (in-memory) collection.
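For example, here is a sketch of the first-N variant with N = 2. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the Materializer); the names `n`, `special`, `normal` and `firstN` are illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.6+, where the implicit ActorSystem also supplies the Materializer
implicit val system: ActorSystem = ActorSystem("prefix-demo")

val n = 2 // number of leading elements to treat specially (illustrative choice)
val special = Flow[Int].map(i => s"Special: $i")
val normal  = Flow[Int].map(i => s"Normal: $i")

val firstN = Source(1 to 5).prefixAndTail(n).flatMapConcat { case (prefix, tail) =>
  // `prefix` holds up to n elements, collected strictly; `tail` streams the rest
  Source(prefix).via(special).concat(tail.via(normal))
}

val result: Seq[String] = Await.result(firstN.runWith(Sink.seq), 10.seconds)
system.terminate()
```

If upstream completes before N elements arrive, `prefix` simply holds fewer elements and `tail` is empty, so short streams need no extra handling.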



Answer 2:

Using zipWith

You could zip the original Source with a Source of Booleans that emits true for the first element and false for every element after it. The zipped Source can then be processed, with each element paired with its flag.

First we'll need a Source that emits the Booleans:

//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] = 
  (Iterator single true) ++ (Iterator continually false)

// fromIterator takes a factory, so each materialization gets a fresh iterator
def firstTrueSource : Source[Boolean, _] = 
  Source.fromIterator(() => firstTrueIterator())

We can then define a function that handles the two different cases:

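// Placeholder types: substitute your own element and result types
type Data = Int
type OutputData = String

def processData(data : Data, firstRun : Boolean) : OutputData = 
  if(firstRun) ???  // handle the first element specially
  else ???          // handle every other element normally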

This function can then be used in a zipWith of your original Source:

val originalSource : Source[Data,_] = ???    

val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)
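Putting the pieces together, here is a runnable sketch of the zipWith approach. It assumes Akka 2.6 or later and picks `Int` in and `String` out as hypothetical concrete types; because `Source.fromIterator` is given a factory, each run starts again from `true`:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.6+, where the implicit ActorSystem also supplies the Materializer
implicit val system: ActorSystem = ActorSystem("zip-demo")

// true for the first element, false forever after
def firstTrueIterator(): Iterator[Boolean] =
  Iterator.single(true) ++ Iterator.continually(false)

val firstTrueSource: Source[Boolean, NotUsed] =
  Source.fromIterator(() => firstTrueIterator())

// Hypothetical handler: Int in, String out
def processData(data: Int, firstRun: Boolean): String =
  if (firstRun) s"First: $data" else s"Rest: $data"

val contingentSource: Source[String, NotUsed] =
  Source(List(10, 20, 30)).zipWith(firstTrueSource)(processData)

val result: Seq[String] = Await.result(contingentSource.runWith(Sink.seq), 10.seconds)
system.terminate()
```

Since zipWith completes as soon as either input completes, the infinite Boolean source does not keep the stream running once the data ends.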

Using Stateful Flow

You could create a Flow that contains state similar to the example in the question but with a more functional approach:

def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
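  // Shared by every call of the returned function, and hence by every
  // materialization of a Flow built from it
  var firstRun = true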
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall :  (Data) => OutputData, 
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = 
  Flow[Data] map firstRunner(firstCall, otherCalls)

This Flow can then be applied to your original Source:

def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???

val firstSource : Source[OutputData, _] = 
  originalSource via firstRunFlow(firstElementFunc, remainingElsFunc)
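One caveat with this approach: the `firstRun` flag lives in the closure returned by `firstRunner`, so everything that uses that one function value shares it. In Akka terms, a Flow built once by `firstRunFlow` and materialized twice would treat only the very first element of the first run specially (and concurrent runs would race on the var). The effect can be seen without Akka in this plain-Scala sketch, which uses hypothetical `Int`/`String` types and a hypothetical `handler`:

```scala
// Placeholder types (hypothetical choices)
type Data = Int
type OutputData = String

def firstRunner(firstCall: Data => OutputData,
                otherCalls: Data => OutputData): Data => OutputData = {
  var firstRun = true // captured once, shared by all uses of the returned function
  (data: Data) =>
    if (firstRun) { firstRun = false; firstCall(data) }
    else otherCalls(data)
}

// One function value used for two separate "runs"
val handler = firstRunner(d => s"First: $d", d => s"Rest: $d")
val run1 = List(1, 2, 3).map(handler)
val run2 = List(4, 5).map(handler) // the flag was already consumed by run1
```

Calling `firstRunFlow` anew for every run avoids the sharing; statefulMapConcat instead creates its state once per materialization.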

"Idiomatic Way"

Answering your question directly means deciding what the "idiomatic way" is. I answer that part last because it is the least verifiable by the compiler and therefore closest to opinion; I don't claim to be an authority on idiomatic code.

My personal experience with akka-streams has been that it helps to switch perspective and imagine an actual stream of Data elements (I picture a train of boxcars). Do I need to split it into several fixed-size trains? Should only certain boxcars get through? Can I run a second train alongside it, made of Boolean cars that signal which car is at the front? Because I think of streams this way, I prefer the zipWith method: my first instinct is always to connect existing stream components together.

I also find it best to embed as little code as possible in an Akka Streams component. firstTrueIterator and processData have no dependency on Akka at all, while the firstTrueSource and contingentSource definitions contain virtually no logic. This lets you test the logic without a clunky ActorSystem, and reuse the core functions in Futures or Actors.
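As a sketch of that kind of testing, the Akka-free pieces can be exercised with ordinary Scala collections. The concrete types (`Int` in, `String` out) and the `processData` body are hypothetical:

```scala
// true, false, false, ... (no Akka involved)
def firstTrueIterator(): Iterator[Boolean] =
  Iterator.single(true) ++ Iterator.continually(false)

// Hypothetical handler with concrete types
def processData(data: Int, firstRun: Boolean): String =
  if (firstRun) s"First: $data" else s"Rest: $data"

// Check the flag sequence by taking a finite prefix of the infinite iterator
val flags: List[Boolean] = firstTrueIterator().take(3).toList

// Pair data with flags as zipWith would; Iterator.zip stops at the shorter side
val outputs: List[String] =
  List(7, 8, 9).iterator
    .zip(firstTrueIterator())
    .map { case (d, first) => processData(d, first) }
    .toList
```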



Answer 3:

While I prefer the approach with zip, one can also use statefulMapConcat:

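source
  .statefulMapConcat { () =>
    // a fresh flag is created for each materialization of the stream
    var firstRun = true
    elem => {
      if (firstRun) {
        firstRun = false
        List(elem) // first: handle specially, emitting zero or more results
      } else {
        List(elem) // not first: handle normally
      }
    }
  }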
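A runnable sketch (assuming Akka 2.6 or later; the `First:`/`Rest:` labels are illustrative) shows a property of statefulMapConcat worth knowing: the outer `() =>` function is invoked once per materialization, so running the same Source twice gives each run its own flag:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes Akka 2.6+, where the implicit ActorSystem also supplies the Materializer
implicit val system: ActorSystem = ActorSystem("stateful-demo")

val labelled = Source(List(1, 2, 3)).statefulMapConcat { () =>
  var firstRun = true // created anew for each materialization
  elem =>
    if (firstRun) { firstRun = false; List(s"First: $elem") }
    else List(s"Rest: $elem")
}

// Materialize the same blueprint twice; each run gets its own flag
val run1: Seq[String] = Await.result(labelled.runWith(Sink.seq), 10.seconds)
val run2: Seq[String] = Await.result(labelled.runWith(Sink.seq), 10.seconds)
system.terminate()
```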


Answer 4:

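You can use prepend to emit the elements of another source before those of the original. Prepend a single-element source: once it is drained, the rest of the original source continues. Note that this inserts a new first element rather than treating the existing first element specially.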

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html

Source(List(1, 2, 3))
  .prepend(Source.single(0))
  .runWith(Sink.foreach(println))

// 0
// 1
// 2
// 3