My goal is to extract data from Kafka with Spark Streaming, transform it, and store it in an S3 bucket as Parquet files in date-based folders (partitioned data for faster queries in Athena). My main problem is that the number of active batches increases during the run, and I want to have only one active batch at a time. I have a delay problem: I have tried different configurations and cluster sizes so that each batch is processed in less time than the batch interval. For example, with a 5-minute batch interval I want every batch to finish in under 5 minutes, and I want that behavior to hold for all batches over time. In other words, I want:
1) To process each batch in less than the batch interval (a small timing sketch for checking this is shown right after this list).
2) To keep the behavior of the previous point for every batch over time, without errors.
3) To use as few resources as possible, in order to spend less money.
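For point 1, this is a minimal sketch of how per-batch timing can be measured with a StreamingListener (the listener class name here is just illustrative; I normally read the same numbers off the Streaming tab of the Spark UI):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs, for every completed batch, how long it took to process and how long it waited in the queue.
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processingMs = info.processingDelay.getOrElse(-1L)   // time spent processing the batch
    val schedulingMs = info.schedulingDelay.getOrElse(-1L)   // time the batch waited before it could start
    println(s"batch=${info.batchTime} processingMs=$processingMs schedulingMs=$schedulingMs")
  }
}

// Registered on the StreamingContext with: ssc.addStreamingListener(new BatchTimingListener)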
What do I have?
In Kafka we have almost 200,000 messages per second, and each message has around 45 Protobuf fields, each of which I convert to a string. In terms of output, I generate around 1 terabyte of Parquet files per hour in Amazon S3. Our Kafka topic has 60 partitions.
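To put those numbers in context, and assuming the 10-minute batch interval used in the code below, that is roughly 200,000 msg/s × 600 s ≈ 120 million messages per batch, about 2 million messages per Kafka partition per batch, and around 1 TB / 6 ≈ 170 GB of Parquet output per batch.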
What have I tried?
- Different batch intervals (1 second, 10 seconds, 1 minute, 5 minutes, 10 minutes, and so on).
- Scaling the cluster, both vertically and horizontally (I'm using Amazon EMR).
- Different spark-submit configurations (max rate per partition, backpressure, number of executors, and so on); a minimal sketch of these settings is shown right after this list.
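To be concrete, this is roughly what I mean by the rate-limiting settings above (the numeric values are placeholders to tune, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Requests DStream EMR 10 minutes")
  // Let Spark adapt the per-partition ingestion rate to the observed processing speed.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap in records/second/partition; e.g. 4000 * 60 partitions * 600 s
  // bounds a 10-minute batch at roughly 144,000,000 records.
  .set("spark.streaming.kafka.maxRatePerPartition", "4000")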
Finally, can someone recommend a cluster configuration for solving this problem, in terms of the number of nodes and the kind of machines, together with the spark-submit configuration for that cluster? I also don't know whether the Scala code (pasted below) can be optimized, or how to find the root cause of this problem.
Additional information:
This is one of the commands that I tried:
spark-submit --deploy-mode cluster --master yarn \
  --conf spark.dynamicAllocation.enabled=false \
  --num-executors 30 \
  --executor-cores 2 \
  --executor-memory 3G \
  --conf "spark.sql.parquet.mergeSchema=false" \
  --conf "spark.debug.maxToStringFields=100" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --class "RequestsDstream" RequestsDataLake-assembly-0.1.jar
This is the code:
import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Calendar
import com.tap.proto.ProtoMessages
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, date_format}
object SQLContextSingleton {
  // Lazily-initialized singleton SQLContext, shared across all micro-batches.
  @transient private var instance: SQLContext = null

  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
object RequestsDstream {
  // Parse a protobuf-encoded Kafka value into a Map of string fields.
  // Returns Map("error" -> "error") if the message cannot be parsed.
  def message_proto(value: Array[Byte]): Map[String, String] = {
    try {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val requests_proto = ProtoMessages.SspRequest.parseFrom(value)
      val json = Map(
        "version" -> requests_proto.getVersion().toString,
        "adunit" -> requests_proto.getAdunit().toString,
        "adunit_original" -> requests_proto.getAdunitOriginal().toString,
        "brand" -> requests_proto.getBrand().toString,
        "country" -> requests_proto.getCountry().toString,
        "device_connection_type" -> requests_proto.getDeviceConnectionType().toString,
        "device_density" -> requests_proto.getDeviceDensity().toString,
        "device_height" -> requests_proto.getDeviceHeight().toString,
        "device_id" -> requests_proto.getDeviceId().toString,
        "device_type" -> requests_proto.getDeviceType().toString,
        "device_width" -> requests_proto.getDeviceWidth().toString,
        "domain" -> requests_proto.getDomain().toString,
        "endpoint" -> requests_proto.getEndpoint().toString,
        "endpoint_type" -> requests_proto.getEndpointType().toString,
        "endpoint_version" -> requests_proto.getEndpointVersion().toString,
        "external_dfp_id" -> requests_proto.getExternalDfpId().toString,
        "id_req" -> requests_proto.getIdReq().toString,
        "ip" -> requests_proto.getIp().toString,
        "lang" -> requests_proto.getLang().toString,
        "lat" -> requests_proto.getLat().toString,
        "lon" -> requests_proto.getLon().toString,
        "model" -> requests_proto.getModel().toString,
        "ncc" -> requests_proto.getNcc().toString,
        "noc" -> requests_proto.getNoc().toString,
        "non" -> requests_proto.getNon().toString,
        "os" -> requests_proto.getOs().toString,
        "osv" -> requests_proto.getOsv().toString,
        "scc" -> requests_proto.getScc().toString,
        "sim_operator_code" -> requests_proto.getSimOperatorCode().toString,
        "size" -> requests_proto.getSize().toString,
        "soc" -> requests_proto.getSoc().toString,
        "son" -> requests_proto.getSon().toString,
        "source" -> requests_proto.getSource().toString,
        "ts" -> dateFormat.format(new Date(requests_proto.getTs())),
        "user_agent" -> requests_proto.getUserAgent().toString,
        "status" -> requests_proto.getStatus().toString,
        "delivery_network" -> requests_proto.getDeliveryNetwork().toString,
        "delivery_time" -> requests_proto.getDeliveryTime().toString,
        "delivery_status" -> requests_proto.getDeliveryStatus().toString,
        "delivery_network_key" -> requests_proto.getDeliveryNetworkKey().toString,
        "delivery_price" -> requests_proto.getDeliveryPrice().toString,
        "device_id_original" -> requests_proto.getDeviceIdOriginal().toString,
        "tracking_limited" -> requests_proto.getTrackingLimited().toString,
        "from_cache" -> requests_proto.getFromCache().toString
      )
      json
    } catch {
      case e: Exception =>
        print(e)
        Map("error" -> "error")
    }
  }
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Requests DStream EMR 10 minutes")
    // 10-minute batch interval.
    val ssc = new StreamingContext(conf, Seconds(60 * 10))
    val sc = ssc.sparkContext
    ssc.checkpoint("/home/data/checkpoint")

    // Timestamped consumer group, so every run starts from the latest offsets.
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val date = format.format(Calendar.getInstance().getTime())
    val group_id = "Request DStream EMR " + date

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ip.internal:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> group_id,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("ssp.requests")
    val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[String, Array[Byte]](topics, kafkaParams)
    )
    val events = stream

    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    events.foreachRDD(rdd => {
      // Remember the offset ranges of this batch so they can be committed after the write.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //val rdd2 = rdd.repartition(2)

      // Decode each protobuf value into a Map of strings; the resulting DataFrame
      // has columns _1 (the Kafka key) and _2 (the field map).
      val current = rdd.map(record => (record.key, message_proto(record.value)))
      val myDataFrame = current.toDF()

      val query = myDataFrame.select(col("_2").as("value_udf"))
        .select(col("value_udf")("version").as("version"), col("value_udf")("adunit").as("adunit"), col("value_udf")("adunit_original").as("adunit_original"),
          col("value_udf")("brand").as("brand"), col("value_udf")("country").as("country"), col("value_udf")("device_connection_type").as("device_connection_type"),
          col("value_udf")("device_density").as("device_density"), col("value_udf")("device_height").as("device_height"),
          col("value_udf")("device_id").as("device_id"), col("value_udf")("device_type").as("device_type"), col("value_udf")("device_width").as("device_width"),
          col("value_udf")("domain").as("domain"), col("value_udf")("endpoint").as("endpoint"), col("value_udf")("endpoint_type").as("endpoint_type"),
          col("value_udf")("endpoint_version").as("endpoint_version"), col("value_udf")("external_dfp_id").as("external_dfp_id"),
          col("value_udf")("id_req").as("id_req"), col("value_udf")("ip").as("ip"), col("value_udf")("lang").as("lang"), col("value_udf")("lat").as("lat"),
          col("value_udf")("lon").as("lon"), col("value_udf")("model").as("model"), col("value_udf")("ncc").as("ncc"), col("value_udf")("noc").as("noc"),
          col("value_udf")("non").as("non"), col("value_udf")("os").as("os"), col("value_udf")("osv").as("osv"), col("value_udf")("scc").as("scc"),
          col("value_udf")("sim_operator_code").as("sim_operator_code"), col("value_udf")("size").as("size"), col("value_udf")("soc").as("soc"),
          col("value_udf")("son").as("son"), col("value_udf")("source").as("source"), col("value_udf")("ts").cast("timestamp").as("ts"), col("value_udf")("user_agent").as("user_agent"),
          col("value_udf")("status").as("status"), col("value_udf")("delivery_network").as("delivery_network"), col("value_udf")("delivery_time").as("delivery_time"),
          col("value_udf")("delivery_status").as("delivery_status"), col("value_udf")("delivery_network_key").as("delivery_network_key"),
          col("value_udf")("delivery_price").as("delivery_price"), col("value_udf")("device_id_original").as("device_id_original"),
          col("value_udf")("tracking_limited").as("tracking_limited"), col("value_udf")("from_cache").as("from_cache"),
          // Partition columns derived from the event timestamp.
          date_format(col("value_udf")("ts"), "yyyy").as("year"),
          date_format(col("value_udf")("ts"), "MM").as("month"),
          date_format(col("value_udf")("ts"), "dd").as("day"),
          date_format(col("value_udf")("ts"), "HH").as("hour"))

      try {
        // Write this batch to S3 partitioned for Athena, then commit the Kafka
        // offsets only after the write has succeeded.
        query.write.partitionBy("year", "month", "day", "hour")
          .mode(SaveMode.Append).parquet("s3a://tap-datalake/ssp.requests")
        events.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } catch {
        case e: Exception =>
          print(e)
      }
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}