Spark to process an RDD chunk by chunk from JSON files

Posted 2019-09-14 19:32

Question:

I am new to Spark and Scala. I have a requirement to process a number of JSON files from an S3 location. This is basically batch data that is kept for reprocessing at some later time. My Spark job should process these files in such a way that it picks 5 raw JSON records at a time and sends them as a message to a Kafka topic. The reason for picking only 5 records is that the Kafka topic handles both real-time and batch data simultaneously on the same topic, so the batch processing should not delay the real-time processing.

I need to process the whole JSON file sequentially, so I would pick only 5 records at a time, post a message to Kafka, then pick the next 5 records, and so on...

I have written a piece of code which reads the JSON files and posts the records to a Kafka topic.

    val jsonRDD = sc.textFile(s3Location)

    var count = 0
    val buf = new StringBuilder

    jsonRDD.collect().foreach { line =>
      count += 1
      buf ++= line
      if (count == 5) {
        println(s"Printing 5 jsons $buf")
        SendMessagetoKakfaTopic(buf) // pseudo code for sending the message to the Kafka topic
        count = 0
        buf.setLength(0) // clear the buffer only after the message has been sent
        Thread.sleep(10000)
      }
    }
    if (buf.nonEmpty) {
      println(s"Printing remaining jsons $buf")
      SendMessagetoKakfaTopic(buf)
    }

I believe there is a more efficient way of processing JSONs in Spark.

I should also be looking at other parameters such as memory and resources, since the data might grow beyond hundreds of gigabytes.

Answer 1:

That looks like a case for Spark Streaming or (recommended) Spark Structured Streaming.

In either case you monitor a directory and process new files every batch interval (configurable).
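
For example, a Structured Streaming job could monitor the S3 prefix with a streaming file source and throttle how many files are picked up per micro-batch. This is only a minimal sketch; the schema, path, and option values are assumptions you would adjust to your data:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructType}

    val spark = SparkSession.builder().appName("BatchJsonReprocessing").getOrCreate()

    // Streaming file sources need the schema up front; this one is hypothetical
    val schema = new StructType()
      .add("id", StringType)
      .add("payload", StringType)

    // Monitor the (hypothetical) S3 prefix; each trigger processes at most one new file
    val jsonStream = spark.readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .json("s3a://my-bucket/batch-json/")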


You could handle it using SparkContext.textFile (with wildcards) or SparkContext.wholeTextFiles. In either case, you'll eventually end up with an RDD[String] representing the lines of your JSON files (one JSON record per line).
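
As a rough illustration (the S3 paths below are hypothetical):

    // One JSON record per line, across all matching files: RDD[String]
    val jsonLines = sc.textFile("s3a://my-bucket/batch-json/*.json")

    // Or one whole file per element: RDD[(path, content)], keeping just the contents
    val jsonDocs = sc.wholeTextFiles("s3a://my-bucket/batch-json/").values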

If your requirement is to process the files sequentially, 5-line chunk by 5-line chunk, you could make the transformation pipeline slightly more efficient by using RDD.toLocalIterator:

toLocalIterator: Iterator[T]

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.

See RDD API.

With that Iterator of JSON lines, you'd take non-overlapping groups of 5 elements (grouped(5), or equivalently sliding(5, 5)).

That would give you a fairly efficient pipeline.
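
A sketch of that pipeline, with a hypothetical sendMessageToKafkaTopic standing in for your producer code:

    // Hypothetical helper; plug in your existing Kafka producer logic here
    def sendMessageToKafkaTopic(chunk: Seq[String]): Unit = ???

    jsonRDD
      .toLocalIterator // streams one partition at a time to the driver
      .grouped(5)      // non-overlapping chunks of 5 JSON lines
      .foreach { chunk =>
        sendMessageToKafkaTopic(chunk)
        Thread.sleep(10000) // optional throttle so batch traffic does not starve real-time traffic
      }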


I once again strongly recommend reading up on Structured Streaming in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) (it focuses on reading, but writing is also supported).
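
On the writing side, a streaming Dataset can be sent to Kafka by serializing each row into a value column and using the kafka sink. Reusing the hypothetical jsonStream from the earlier sketch (the brokers, topic, and checkpoint path are placeholders):

    import org.apache.spark.sql.functions.{col, struct, to_json}

    val query = jsonStream
      .select(to_json(struct(col("*"))).alias("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("topic", "my-topic")
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/")
      .start()

    query.awaitTermination()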