Spark Streaming on Kafka print different cases for

Posted 2019-06-07 07:15


My scenario is as follows: 10,000 servers are sending DF size data, so 10,000 inputs arrive every 5 seconds.

If the DF size used by any server is more than 70%, print "Increase ROM size by 20%". If the DF size used by any server is less than 30%, print "Decrease ROM size by 25%".

The code below takes messages from Kafka, keeps those containing "%", and converts them to upper case. It is only a reference for my Kafka details.

Can anyone please help me with this scenario?

package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
        val batchIntervalSeconds = 2

        val ssc = new StreamingContext(conf, Seconds(10))

        // Receiver-based stream: ZooKeeper quorum, consumer group, topic -> number of receiver threads
        val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

        // Keep only the messages whose value contains "%"
        val filteredStream: DStream[(String, String)] = kafkaStream.filter(record =>
                record._2.contains("%")) // TODO : pattern matching here
        val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase())

        // At least one output operation must be registered before the context starts
        outputDStream.print()

        ssc.start()
        ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
    }
}

Please help me with code that satisfies this scenario.

Sample input

Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda1      132239776 6210884 119311504   5% /
tmpfs            4021876       0   4021876   0% /dev/shm

Sample output:

if Use% > 70 for any case, message: Increase ROM size by 20%
if Use% < 30 for any case, message: Decrease ROM size by 25%

I also have to write the result to Elasticsearch, and that gives an error:

package rnd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._

object WordFind {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val sc = new SparkContext(conf)
    val checkpointDir = "/usr/local/kafka/kafka_2.11-"
    val batchIntervalSeconds = 2
    // Only one StreamingContext is created, on top of the existing SparkContext
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem"))  // eliminate header
      .map(_._2.split("\\s+"))  // split with space
    val outputDStream: DStream[String] = filteredStream.map {
      row =>
        val useIdx = row.length - 2
        // if Use%>70 for any case> Message: Increase ROM size by 20%
        // if Use%<30% for any case> Message: Decrease ROM size by 25%
        val usePercent = row(useIdx).replace("%", "").toInt
        usePercent match {
          case x if x > 70 => "Increase ROM size by 20%"
          case x if x < 30 => "Decrease ROM size by 25%"
          case _ => "Undefined"
        }
    }

    // This is the call reported in the compile error below
    outputDStream.saveToEs("kafkamessage_v1/kwc")

    // This starts the streaming context in the background.
    ssc.start()
    // Wait for 5 times the batchIntervalSeconds before returning.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

Error:

Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String]
outputDStream.saveToEs("kafkamessage_v1/kwc")


I have made a few assumptions to get the required output.

1.) Headers may appear in between the data rows, hence a filter is used to remove the header.

Filesystem 1K-blocks Used Available Use% Mounted on

2.) Since the Filesystem column may contain a space inside the string, I have extracted Use% as the second element from the end of the row. (If this doesn't work, please try a regex with capture groups to achieve the same.)

3.) The case for a use percentage between 30 and 70 is not defined, hence the output message contains "Undefined" for such cases.
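
The regex alternative mentioned in assumption 2 could look roughly like the sketch below. It is only an illustration in plain Scala, with no Spark involved: `DfRegex`, `UsePattern` and `extractUsePercent` are hypothetical names, and the pattern assumes that the mount point is a single token without spaces and that the wanted percentage is the last "NN%" token before it.

```scala
object DfRegex {
  // A number followed by '%', then exactly one more token (assumed to be
  // the mount point) before the end of the line.
  private val UsePattern = """(\d+)%\s+\S+\s*$""".r

  /** Returns the use percentage, or None for header or malformed lines. */
  def extractUsePercent(line: String): Option[Int] =
    UsePattern.findFirstMatchIn(line).map(_.group(1).toInt)
}
```

Because the header line has no digits in front of its "%" sign, it yields None, so the separate header filter would not be strictly needed with this approach.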

A sample input and output (using Array[String]):

 scala> val input =
           |       """|Filesystem    512-blocks      Used Available Capacity iused      ifree %iused  Mounted on
           |          |/dev/disk1     234618880 154868528  79238352    67% 1784543 4293182736    0%   /
           |          |devfs                364       364         0   100%     630          0  100%   /dev
           |          |map -hosts             0         0         0   100%       0          0  100%   /net
           |          |map auto_home          0         0         0   100%       0          0  100%   /home""".stripMargin

 scala> val inputStr: Array[Array[String]] = input.split("\n").filter(!_.contains("Filesystem")).map(_.split("\\s+"))

 scala> val outputMessage = inputStr.map {
      |       row =>
      |         // Assuming the position is always second from last
      |         val elementPosition = row.length - 2 
      |         // if Use%>70 for any case> Message: Increase ROM size by 20%
      |         // if Use%<30% for any case> Message: Decrease ROM size by 25%
      |         val usePercent = row(elementPosition).replace("%", "").toInt
      |         usePercent match {
      |           case x if x > 70 => (usePercent, "Increase ROM size by 20%")
      |           case x if x < 30 => (usePercent, "Decrease ROM size by 25%")
      |           case _ => (usePercent, "Undefined")
      |         }
      |     }

 scala> outputMessage.foreach(println)
 (0,Decrease ROM size by 25%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)

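Note that in this sample the second element from the end is the %iused column rather than Capacity, which is why the first row prints 0. With the input from the question, the same position holds Use%.

This code works for an Array[String]; please test it for ReceiverInputDStream[(String, String)]. The code should be similar to: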

 val filteredStream: DStream[Array[String]] = kafkaStream
       .filter(!_._2.contains("Filesystem"))  // eliminate header
       .map(_._2.split("\\s+"))  // split with space
 val outputDStream: DStream[String] = filteredStream.map {
       row =>
         val useIdx = row.length - 2
         // if Use%>70 for any case> Message: Increase ROM size by 20%
         // if Use%<30% for any case> Message: Decrease ROM size by 25%
         val usePercent = row(useIdx).replace("%", "").toInt
         usePercent match {
           case x if x > 70 => "Increase ROM size by 20%"
           case x if x < 30 => "Decrease ROM size by 25%"
           case _ => "Undefined"
         }
 }
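
One thing to watch in a streaming job: the `.toInt` call throws on any row whose second-from-last field is not a number, and an empty message makes the index negative. A defensive variant of the same logic, as a sketch only, is shown below. `DfAdvice` and `adviceFor` are hypothetical names, and dropping the 30 to 70 range instead of emitting "Undefined" is a choice made for this sketch, not part of the requirement.

```scala
import scala.util.Try

object DfAdvice {
  /** Maps one df output line to a message. Returns None for headers,
    * rows that are too short or non-numeric, and the undefined 30-70 range. */
  def adviceFor(line: String): Option[String] = {
    val row = line.trim.split("\\s+")
    if (line.contains("Filesystem") || row.length < 2) None
    else
      // Try turns a NumberFormatException into None instead of failing the batch
      Try(row(row.length - 2).replace("%", "").toInt).toOption.flatMap {
        case p if p > 70 => Some("Increase ROM size by 20%")
        case p if p < 30 => Some("Decrease ROM size by 25%")
        case _           => None
      }
  }
}
```

Since the function is plain Scala, it can be tested without a cluster. On the stream it should be usable as something like kafkaStream.flatMap(record => DfAdvice.adviceFor(record._2)), though that wiring is untested here.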

Hope this helps.