I'm collecting data from a messaging app; I'm currently using Flume, which sends approx 50 million records per day.
I want to use Kafka, consume from Kafka using Spark Streaming, persist the data to Hadoop and query it with Impala.
I'm having issues with each approach I've tried...
Approach 1 - Save the RDD as Parquet and point an external Hive Parquet table at the Parquet directory
// scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.foreachRDD(rdd => {
  // 1 - Create a SchemaRDD from the rdd, applying the schema
  val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
  // 2 - Register it as a Spark SQL table
  SchemaRDD1.registerTempTable("sparktable")
  // 3 - Query sparktable to produce another SchemaRDD, 'finalParquet', of just the data needed, and persist it as Parquet files
  val finalParquet = sqlContext.sql(sql)
  finalParquet.saveAsParquetFile(dir)
})
The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a 1-minute batch size. The reason it outputs so many files is that the computation is distributed, as explained in another post: how to make saveAsTextFile NOT split output into multiple file? The proposed solutions there don't seem optimal to me; for example, as one user states, "Having a single output file is only a good idea if you have very little data."
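The obvious workaround from those answers is to cut the number of partitions just before writing, something like the sketch below (replacing the last two lines of the foreachRDD above; the count of 4 is just a placeholder, and I'm assuming coalesce keeps the schema in this Spark version), but that funnels each batch's write through only a few tasks:

// scala - sketch: shrink to a handful of partitions before writing, so each batch
// produces a few Parquet files instead of 200+; '4' is an arbitrary placeholder
val finalParquet = sqlContext.sql(sql)
finalParquet.coalesce(4).saveAsParquetFile(dir)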
Approach 2 - Use HiveContext and insert the RDD data directly into a Hive table
# python
from pyspark import StorageLevel
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))

def sendRecord(rdd):
    sql = "INSERT INTO TABLE table SELECT * FROM beacon_sparktable"
    # 1 - Apply the schema to the RDD, creating a DataFrame 'beaconDF'
    beaconDF = sqlContext.jsonRDD(rdd, schema)
    # 2 - Register the DataFrame as a Spark SQL table
    beaconDF.registerTempTable("beacon_sparktable")
    # 3 - Insert into Hive directly from a query on the Spark SQL table
    sqlContext.sql(sql)

kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)
This works fine: it inserts directly into a Parquet table, but there are scheduling delays for the batches because the processing time exceeds the batch interval. The consumer can't keep up with what's being produced, and the batches to process begin to queue up.
It seems that writing to Hive is slow. I've tried adjusting the batch interval size and running more consumer instances.
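One thing I've considered is capping the receiver's ingestion rate via SparkConf, roughly as sketched below (shown in Scala to match the first snippet, though the same config keys work from PySpark; the values and app name are placeholders, and the backpressure flag needs Spark 1.5+). That only stops the batches queueing by letting the consumer fall further behind Kafka, so it doesn't address the slow Hive writes themselves:

// scala - sketch: throttle the receiver so processing can keep up with ingestion;
// values and app name are placeholders, and this hides the latency rather than fixing it
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-to-hive")                          // placeholder app name
  .set("spark.streaming.receiver.maxRate", "10000")     // max records/sec per receiver
  .set("spark.streaming.backpressure.enabled", "true")  // Spark 1.5+ only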
In summary
What is the best way to persist big data from Spark Streaming, given the issues with too many files and the potential latency of writing to Hive? What are other people doing?
A similar question has been asked here, but it concerns an issue with directories as opposed to too many files: How to make Spark Streaming write its output so that Impala can read it?
Many thanks for any help.