I am streaming data from Kafka with Spark Streaming in micro-batches (spark.streaming.kafka.maxRatePerPartition set to 10,000, with a 1-second batch interval), so each batch processes up to 10,000 Kafka messages.
Within each batch I create a DataFrame from the RDD and, after processing, write the records out using dataFrame.write.mode(SaveMode.Append), so all messages of that batch are appended to the same file.
This works as long as everything happens within one batch. But as soon as the next batch runs (the next 10,000 messages are processed), a new file is created for those 10,000 messages.
The problem is that each file (block) reserves 50 MB on the file system but only contains around 1 MB (10,000 messages). Instead of creating a new file for every batch, I would prefer everything to be appended to one file as long as it does not exceed 50 MB.
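The desired behaviour can be sketched in plain Scala as follows. This is only an illustration of the size-based rollover rule, not Spark code: the names RolloverSketch and assignFiles are hypothetical, and the sizes are the rough figures mentioned above (about 1 MB per batch, 50 MB per file).

```scala
// Hypothetical model of the desired behaviour (not Spark code):
// keep appending batches to the current file and only start a new
// file when adding the next batch would exceed the size limit.
object RolloverSketch {
  val MaxFileBytes: Long = 50L * 1024 * 1024 // target file size, 50 MB
  val BatchBytes: Long   = 1L * 1024 * 1024  // assumed size of one batch, ~1 MB

  /** Returns, for every batch, the index of the file it would be written to. */
  def assignFiles(batchSizes: Seq[Long], maxBytes: Long): Seq[Int] =
    batchSizes
      // State: (index of the current file, bytes already written to it)
      .scanLeft((0, 0L)) { case ((fileIdx, used), size) =>
        if (used > 0 && used + size > maxBytes) (fileIdx + 1, size) // roll over to a new file
        else (fileIdx, used + size)                                 // append to the current file
      }
      .tail        // drop the initial state
      .map(_._1)   // keep only the file index

  def main(args: Array[String]): Unit = {
    val files = assignFiles(Seq.fill(120)(BatchBytes), MaxFileBytes)
    println(s"Batches: ${files.size}, files used: ${files.distinct.size}")
  }
}
```

With these assumed sizes, 120 batches would end up in 3 files (50 + 50 + 20 batches) instead of 120 separate files.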
Do you know how to do this, or why it is not working in my example? Here is my code:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.Set
object SparkStreaming extends Constants {

  def main(args: Array[String]): Unit = {
    // Create a new Spark configuration...
    val conf = new SparkConf()
      .setMaster("local[2]") // ...using 2 cores
      .setAppName("Streaming")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000") // ...reading max. 10000 messages per second per partition

    // Create a streaming context for micro-batches
    val ssc = new StreamingContext(conf, Seconds(1)) // Note: processing max. 1*10000 messages (see config above)

    // Set up the Kafka DStream
    val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
      "auto.offset.reset" -> "smallest") // Start from the beginning
    val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)
    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
      kafkaParams, kafkaTopics)

    val records = directKafkaStream.map(Source => StreamingFunctions.transformAvroSource(Source))

    records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Singleton SQLContext, reused across batches
      import sqlContext.implicits._
      val dataFrame = rdd.toDF()
      // Append this batch to the partitioned Parquet output
      dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS: _*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
      println(s"Written entries: ${dataFrame.count()}")
    })

    // Start streaming until the process is killed
    ssc.start()
    ssc.awaitTermination()
  }

  /** Case class for converting RDD to DataFrame */
  case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)

  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {
    @transient private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }
}
I would be happy to get your thoughts on that. Thanks, Alex