spark streaming update_state_by_keys for arrays aggregation

Posted 2020-04-10 01:59

Question:

I have input lines like below

t1, file1, 1, 1, 1

t1, file1, 1, 2, 3

t1, file2, 2, 2, 2, 2

t2, file1, 5, 5, 5

t2, file2, 1, 1, 2, 2

and the output like below rows which is a vertical addition of the corresponding numbers.

file1 : [ 1+1+5, 1+2+5, 1+3+5 ]

file2 : [ 2+1, 2+1, 2+2, 2+2 ]
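
To illustrate, the column-wise sum for file1 can be sanity-checked like this (just an illustration of the expected result, not part of the job):

    // vertical (column-wise) addition of the three file1 rows
    val rows = List(Array(1, 1, 1), Array(1, 2, 3), Array(5, 5, 5))
    rows.transpose.map(_.sum)  // List(7, 8, 9), i.e. [1+1+5, 1+2+5, 1+3+5]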

Currently the data aggregation logic works within each batch interval, but it doesn't maintain state across batches, so I am adding updateStateByKey and passing it the function below. Is this the right way to do it?

My current program:

    def updateValues(newValues: Seq[Array[Int]], currentValue: Option[Array[Int]]): Option[Array[Int]] = {
        // start from a zero array as wide as the incoming rows when there is no prior state
        val previousCount = currentValue.getOrElse(
          Array.fill[Int](newValues.headOption.map(_.length).getOrElse(0))(0))
        // prepend the accumulated state to this batch's values and sum column-wise
        val allValues = previousCount +: newValues
        Some(allValues.toList.transpose.map(_.sum).toArray)
    }

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all streaming functionality
    val ssc = new StreamingContext(sc, Seconds(2))
    // parse the lines of data into coverage objects
    val inputStream = ssc.socketTextStream("<hostname>", 9999)
    ssc.checkpoint("hdfs://<hostname>:8020/user/spark/checkpoints_dir")
    inputStream.print(10)
    val parsedDstream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
      })
    val aggregated_file_counts = parsedDstream.updateStateByKey(updateValues)
    // an output operation is needed so the stateful stream is actually computed
    aggregated_file_counts.print()

        // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }
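
As a quick sanity check of updateValues on the sample data above (a hypothetical REPL session, not part of the streaming job):

    // first batch for file1: rows (1,1,1) and (1,2,3), no prior state
    val afterBatch1 = updateValues(Seq(Array(1, 1, 1), Array(1, 2, 3)), None)
    // afterBatch1 == Some(Array(2, 3, 4))

    // second batch for file1: row (5,5,5), with the previous state carried over
    val afterBatch2 = updateValues(Seq(Array(5, 5, 5)), afterBatch1)
    // afterBatch2 == Some(Array(7, 8, 9)), i.e. [1+1+5, 1+2+5, 1+3+5]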

For reference, my previous program (without the stateful transformation; reduceByKey only combines values within a single batch interval, so the totals reset every batch):

def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("HBaseStream")
        val sc = new SparkContext(conf)
        // create a StreamingContext, the main entry point for all streaming functionality
        val ssc = new StreamingContext(sc, Seconds(2))
        val inputStream = ssc.socketTextStream("hostname", 9999)
        inputStream
          .map(line => {
            val splitLines = line.split(",")
            (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
          })
          .reduceByKey((first, second) => {
            // element-wise sum of the two count arrays
            List(first, second).transpose.map(_.sum).toArray
          })
          .foreachRDD(rdd => rdd.foreach(Blaher.blah))
        // Start the computation and wait for it to terminate
        ssc.start()
        ssc.awaitTermination()
    }

Thanks in advance.

Answer 1:

What you're looking for is updateStateByKey. For a DStream[(T, U)] it should take a function with two arguments:

  • Seq[U] - the new values for a key in the current batch
  • Option[U] - the state accumulated for that key so far

and return Option[U], the updated state.

Given your code it could be implemented, for example, like this:

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.streaming.dstream.DStream
import scala.util.Try

val state: DStream[(String, Array[Int])] = parsedDstream.updateStateByKey(
  (current: Seq[Array[Int]], prev: Option[Array[Int]]) => {
    // prepend the previous totals (if any), then sum the arrays element-wise
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
  })
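
To actually see the running totals you would also need an output operation on state, e.g. (a minimal sketch):

    // print a sample of the accumulated per-file totals each batch
    state.print()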

To be able to use it you'll have to configure checkpointing.
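
For example (a sketch; the directory below is a placeholder HDFS path, and any fault-tolerant storage location works):

    // must be set before the streaming computation starts
    ssc.checkpoint("hdfs://<hostname>:8020/user/spark/checkpoints_dir")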