Here is my scenario: 10,000 servers are sending df (disk usage) data, so roughly 10,000 messages arrive every 5 seconds.
If the Use% for any server is more than 70%, print "Increase the ROM size by 20%"; if the Use% for any server is less than 30%, print "Decrease the ROM size by 25%".
Below is code that reads messages from Kafka, filters on "%" and applies toUpperCase(). It is only a reference for my Kafka setup.
Can anyone please help me with this scenario?
package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[(String, String)] = kafkaStream.filter(record =>
      record._2.contains("%")) // TODO: pattern matching here
    val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase())
    outputDStream.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}
Please help me write code that satisfies this scenario.
Sample input:
Filesystem     1K-blocks    Used  Available Use% Mounted on
/dev/sda1      132239776 6210884  119311504   5% /
tmpfs            4021876       0    4021876   0% /dev/shm
Sample output: if Use% > 70 for any filesystem, print "Increase ROM size by 20%"; if Use% < 30, print "Decrease ROM size by 25%".
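For the decision itself, this is the kind of logic I have in mind (just a standalone sketch, assuming each Kafka message is one data line of df output and that Use% is the second-to-last column):
// Standalone sketch of the threshold logic for one df data line, e.g.
// "/dev/sda1 132239776 6210884 119311504 5% /"
def romMessage(dfLine: String): Option[String] = {
  val cols = dfLine.trim.split("\\s+")
  val usePercent = cols(cols.length - 2).stripSuffix("%").toInt // "5%" -> 5
  if (usePercent > 70) Some("Increase ROM size by 20%")
  else if (usePercent < 30) Some("Decrease ROM size by 25%")
  else None
}
// romMessage("/dev/sda1 132239776 6210884 119311504 5% /") == Some("Decrease ROM size by 25%")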
I also have to push the result to Elasticsearch, and that part is giving an error:
package rnd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val sc = new SparkContext(conf)
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem")) // eliminate the df header line
      .map(_._2.split("\\s+"))              // split on whitespace

    val outputDStream: DStream[String] = filteredStream.map { row =>
      // Use% is the second-to-last column of a df line
      val useIdx = row.length - 2
      // if Use% > 70: "Increase ROM size by 20%"
      // if Use% < 30: "Decrease ROM size by 25%"
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => "Increase ROM size by 20%"
        case x if x < 30 => "Decrease ROM size by 25%"
        case _           => "Undefined"
      }
    }

    outputDStream.print()

    import org.elasticsearch.spark.sql._
    outputDStream.saveToEs("dfvalueoperations_v1/kwc")

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)

    // This starts the streaming context in the background.
    ssc.start()
    // Wait for up to 5 batch intervals before timing out.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}
The error is:
Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String]
    outputDStream.saveToEs("kafkamessage_v1/kwc")
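One workaround I am considering (I am not sure whether this is the right way) is to save each micro-batch RDD instead of the DStream, using the RDD-level saveToEs from the elasticsearch-spark connector. This assumes the elasticsearch-hadoop / elasticsearch-spark jar is on the classpath and that es.nodes / es.port in the SparkConf point to a reachable Elasticsearch node (e.g. localhost:9200). I have also seen org.elasticsearch.spark.streaming._ mentioned for saving a DStream directly, but I am not sure which connector version supports it.
// Sketch only: write each micro-batch to Elasticsearch via the RDD implicit,
// instead of calling saveToEs on the DStream itself.
import org.elasticsearch.spark._ // adds saveToEs to RDDs

outputDStream.foreachRDD { rdd =>
  // wrap each message so Elasticsearch receives a document with a "message" field
  rdd.map(msg => Map("message" -> msg)).saveToEs("dfvalueoperations_v1/kwc")
}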