How to log flow rate in Akka Stream?

Posted 2019-04-02 02:18

Question:

I have an Akka Streams application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, like 'received 3 messages in the last 5 seconds'. I tried:

someOtherFlow
  .groupedWithin(Integer.MAX_VALUE, 5 seconds)
  .runForeach(seq => 
    log.debug(s"received ${seq.length} messages in the last 5 seconds")
  )

but it only emits output when there are messages; it produces no empty list when there are 0 messages. I want the 0s as well. Is this possible?

Answer 1:

You could try something like

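  // Needs: akka.NotUsed, akka.stream.scaladsl.Source, scala.concurrent.duration._
  src
    .conflateWithSeed(_ => 1) { case (acc, _) => acc + 1 } // count elements while downstream is not pulling
    .zip(Source.tick(5.seconds, 5.seconds, NotUsed))       // release one count per tick
    .map(_._1)                                             // keep the count, drop the tick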

which should batch your elements until the tick releases them. This is inspired by an example in the docs.
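One caveat with `zip`: it emits only when both inputs have an element, so a window in which no message arrived produces no output rather than a 0. A minimal sketch of one possible alternative follows, assuming Akka Streams is on the classpath. It merges the ticks into the stream and counts between them. The names `WindowCount`, `Event`, `Element`, `Tick`, `step` and `countPerWindow` are hypothetical and exist only for this illustration.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

object WindowCount {
  // Hypothetical marker type: either a real stream element or a timer tick.
  sealed trait Event
  case object Element extends Event
  case object Tick extends Event

  // Pure counting step: returns the new running count and the values to emit.
  def step(count: Int, event: Event): (Int, List[Int]) = event match {
    case Element => (count + 1, Nil)  // count the element, emit nothing yet
    case Tick    => (0, List(count))  // emit the window total (possibly 0) and reset
  }

  def countPerWindow[T](window: FiniteDuration): Flow[T, Int, NotUsed] =
    Flow[T]
      .map(_ => Element: Event)
      // eagerComplete = true: complete when the upstream completes,
      // because the tick source itself never completes.
      .merge(Source.tick(window, window, Tick: Event), eagerComplete = true)
      .statefulMapConcat[Int] { () =>
        var count = 0 // running count for the current window
        event => {
          val (next, out) = step(count, event)
          count = next
          out
        }
      }
}
```

Used as `src.via(WindowCount.countPerWindow(5.seconds))`, this should emit one count per window, including 0 for quiet windows. Because of `eagerComplete = true`, the count of the final, partial window is dropped when the upstream completes.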

On a different note, if you need this for monitoring, you could use a third-party tool such as Kamon.



Answer 2:

Extending Stefano's answer a little I created the following flows:

def flowRate[T](metric: T => Int = (_: T) => 1, outputDelay: FiniteDuration = 1.second): Flow[T, Double, NotUsed] =
  Flow[T]
    .conflateWithSeed(metric(_)) { case (acc, x) => acc + metric(x) } // sum the metric between ticks
    .zip(Source.tick(outputDelay, outputDelay, NotUsed))              // emit once per outputDelay
    .map(_._1.toDouble / outputDelay.toUnit(SECONDS))                 // convert the total to a per-second rate

def printFlowRate[T](name: String, metric: T => Int = (_: T) => 1,
                     outputDelay: FiniteDuration = 1.second): Flow[T, T, NotUsed] =
  Flow[T]
    .alsoTo(flowRate[T](metric, outputDelay)
              .to(Sink.foreach(r => log.info(s"Rate($name): $r"))))

The first converts the flow into a rate per second. You can supply a metric that assigns a value to each element passing through; for example, to measure the rate of characters in a flow of strings, pass _.length. The second parameter is the delay between flow rate reports (it defaults to one second).
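To make the arithmetic concrete, here is a small sketch that reproduces the rate calculation on a plain collection, without Akka. The helper `ratePerSecond` and the sample batch are hypothetical and only mirror the sum-then-divide steps of `flowRate`.

```scala
import scala.concurrent.duration._

object RateArithmetic {
  // Hypothetical helper: sum the metric over one reporting window,
  // then divide by the window length in seconds.
  def ratePerSecond[T](batch: Seq[T], metric: T => Int, outputDelay: FiniteDuration): Double =
    batch.map(metric).sum.toDouble / outputDelay.toUnit(SECONDS)

  // Three strings seen within one 10-second window: 4 + 6 + 4 = 14 characters.
  val batch: Seq[String] = Seq("akka", "stream", "flow")

  val charRate: Double = ratePerSecond[String](batch, _.length, 10.seconds) // 14 characters / 10 s
  val msgRate: Double  = ratePerSecond[String](batch, _ => 1, 10.seconds)   // 3 messages / 10 s
}
```

With the default metric every element counts as 1, so the result is messages per second; with `_.length` it is characters per second.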

The second flow can be used inline to print the flow rate for debugging without modifying the values passing through the stream, e.g.

stringFlow
  .via(printFlowRate[String]("Char rate", _.length, 10.seconds))
  .map(_.toLowerCase) // still a string
  ...

which will log, every 10 seconds, the average rate (per second) of characters.



Answer 3:

A sample of Akka Streams logging:

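  import akka.{Done, NotUsed}
  import akka.actor.ActorSystem
  import akka.event.{Logging, LoggingAdapter}
  import akka.stream.{ActorMaterializer, Attributes}
  import akka.stream.scaladsl.{Sink, Source}
  import scala.concurrent.{ExecutionContextExecutor, Future}
  import scala.concurrent.duration._
  import scala.util.{Failure, Random, Success}

  implicit val system: ActorSystem = ActorSystem("StreamLoggingActorSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  def randomInt = Random.nextInt()

  // Endless source of random integers
  val source = Source.repeat(NotUsed).map(_ => randomInt)

  // Group elements into 5-second windows and log the size of each window
  val logger = source
    .groupedWithin(Integer.MAX_VALUE, 5.seconds)
    .log("in the last 5 seconds number of messages received : ", _.size)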
    .withAttributes(
      Attributes.logLevels(
        onElement = Logging.WarningLevel,
        onFinish = Logging.InfoLevel,
        onFailure = Logging.DebugLevel
      )
    )

  val sink = Sink.ignore

  val result: Future[Done] = logger.runWith(sink)

  result.onComplete{
    case Success(_) =>
      println("end of stream")
    case Failure(_) =>
      println("stream ended with failure")
  }

The source code is here.