I am new to Spark & scala. I have a requirement to process number of json files say from s3 location. These data is basically batch data which would be kept for reproccessing sometime later. Now my spark job should process these files in such a way that it should pick 5 raw json records and should send a message to Kafka topic. The reason of picking only 5 records is kafka topic is processing both real time and batch data simultaneously on the same topic. so the batch processing should not delay the real time processing.
I need to process each JSON file sequentially: pick 5 records, post a message to Kafka, then pick the next 5 records, and so on.
I have written a piece of code that reads the JSON files and posts messages to the Kafka topic:
val jsonRDD = sc.textFile(s3Location)
var count = 0
val buf = new StringBuilder
jsonRDD.collect().foreach { line =>
  count += 1
  buf ++= line
  buf += '\n' // keep the raw records separated
  if (count == 5) { // `count > 5` would flush after 6 records, not 5
    println(s"Printing 5 jsons $buf")
    SendMessagetoKakfaTopic(buf.toString) // pseudo code for sending the message to the Kafka topic
    count = 0
    buf.setLength(0) // clear the buffer only after the message has been sent
    Thread.sleep(10000) // pause so the batch data does not delay the real-time data
  }
}
// a StringBuilder is never null, so check for leftover content instead
if (buf.nonEmpty) {
  println(s"Printing remaining jsons $buf")
  SendMessagetoKakfaTopic(buf.toString)
}
I believe there is a more efficient way of processing the JSONs in Spark, since collect() pulls the entire dataset onto the driver. I should also be looking at other parameters such as memory and resources, because the data might grow to hundreds of gigabytes.
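For example, one direction I was considering is to avoid collect() altogether and send from the executors using foreachPartition, grouping each partition's iterator into batches of 5. This is only a rough sketch: the broker address, topic name, and producer settings are placeholders, and I am not sure whether partition-level parallelism would break my requirement of processing the file sequentially.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val jsonRDD = sc.textFile(s3Location)

jsonRDD.foreachPartition { partition =>
  // one producer per partition, created on the executor rather than the driver
  val props = new Properties()
  props.put("bootstrap.servers", "broker:9092") // placeholder broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  // grouped(5) turns the partition's iterator into batches of at most 5 records
  partition.grouped(5).foreach { batch =>
    producer.send(new ProducerRecord[String, String]("my-topic", batch.mkString("\n"))) // placeholder topic
    Thread.sleep(10000) // throttle so the batch data does not starve the real-time traffic
  }
  producer.close()
}

Would something along these lines scale better, or is there a more idiomatic approach?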