Persisting Spark Streaming output


Question:

I'm collecting data from a messaging app; I'm currently using Flume, which sends approximately 50 million records per day.

I want to use Kafka, consume from Kafka with Spark Streaming, persist the data to Hadoop, and query it with Impala.

I'm having issues with each approach I've tried.

Approach 1 - Save the RDD as Parquet and point an external Hive Parquet table at the Parquet directory

// scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD from the rdd, applying the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - Register it as a Spark SQL table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - Query sparktable to produce another SchemaRDD, 'finalParquet',
    //     with just the data needed, and persist it as Parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)
})

The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a one-minute batch size. The reason it outputs so many files is that the computation is distributed, as explained in another post - how to make saveAsTextFile NOT split output into multiple file? The proposed solutions don't seem optimal to me; for example, as one user states, having a single output file is only a good idea if you have very little data.
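(To illustrate the partition-to-file relationship described above, here is a minimal sketch, assuming Spark 1.4+ where DataFrame.coalesce is available; the target of 4 partitions is just an illustrative value, not a recommendation.)

// each partition of the DataFrame becomes one output file, so reducing
// the partition count before saving reduces the number of files written;
// the value 4 is an arbitrary example, tune it to the batch volume
val compacted = finalParquet.coalesce(4)
compacted.saveAsParquetFile(dir)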

Approach 2 - Use HiveContext and insert the RDD data directly into a Hive table

# python
from pyspark import StorageLevel
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))

kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)

def sendRecord(rdd):
    sql = "INSERT INTO TABLE table SELECT * FROM beacon_sparktable"

    # 1 - Apply the schema to the RDD, creating a DataFrame 'beaconDF'
    beaconDF = sqlContext.jsonRDD(rdd, schema)

    # 2 - Register the DataFrame as a Spark SQL table
    beaconDF.registerTempTable("beacon_sparktable")

    # 3 - Insert into Hive directly from a query on the Spark SQL table
    sqlContext.sql(sql)

lines.foreachRDD(sendRecord)

This works fine; it inserts directly into a Parquet table, but there are scheduling delays for the batches because the processing time exceeds the batch interval. The consumer can't keep up with what is being produced, and the batches to process begin to queue up.

It seems that writing to Hive is slow. I've tried adjusting the batch interval size and running more consumer instances.

In summary

What is the best way to persist big data from Spark Streaming, given the issues with multiple files and the potential latency of writing to Hive? What are other people doing?

A similar question has been asked here, but it concerns an issue with directories as opposed to too many files: How to make Spark Streaming write its output so that Impala can read it?

Many thanks for any help.

Answer 1:

In solution #2, the number of files created can be controlled via the number of partitions of each RDD.

See this example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// create a Hive table (or assume it already exists)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create an RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_: _*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

Now, I guess you can play with the frequency at which you pull data from Kafka, and with the number of partitions of each RDD (by default, the number of partitions of your Kafka topic, which you can possibly reduce by repartitioning).
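As a rough sketch of that idea applied to a streaming job (assuming Spark 1.4+/1.5 as above, and a schema and sqlContext like the ones in the question; the target of 2 partitions is an arbitrary example):

// for each micro-batch: reduce the partition count before writing so
// only a few Parquet files are produced per batch (2 is arbitrary)
lines.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd, schema)
  df.coalesce(2).write.mode("append").saveAsTable("test")
}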

I'm using Spark 1.5 from CDH 5.5.1, and I get the same result using either df.write.mode("append").saveAsTable("test") or your SQL string.
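For reference, the SQL-string variant would look roughly like this (a sketch only; the temporary table name 'tmp_test' is just an illustration):

// register the DataFrame as a temp table, then insert from it into the
// Hive table, as in approach 2 of the question
df.registerTempTable("tmp_test")
sqlContext.sql("INSERT INTO TABLE test SELECT * FROM tmp_test")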