Following is the extract of the scala code written to extract praquet files and print the schema and first few records from the Parquet file.
But nothing is getting printed.
val batchDuration = 2
val inputDir = "file:///home/samplefiles"
val conf = new SparkConf().setAppName("gpParquetStreaming").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("spark.streaming.fileStream.minRememberDuration", "600000")
val ssc = new StreamingContext(sc, Seconds(batchDuration))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](inputDir, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
val dataStream = stream.transform(rdd => rdd.map(tuple => tuple._2)).persist
val countData = dataStream.count
dataStream.transform(rdd => rdd.map(record => record.toString)).foreachRDD((rdd: RDD[String], time: Time) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val dataDF = sqlContext.read.json(rdd)
dataDF.printSchema
dataDF.show
})
countData.print
dataStream.print
stream.print
ssc.start()
ssc.awaitTermination()
}
I am trying to read the existing files also. so have set the parameter minRememberDuration to some big number. But still is does not help.
Any help!
Thanks,
EDIT: dependencies used
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.8.0</version>
</dependency>
EDIT 2: Error:
17/08/25 02:00:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
Serialization stack:
- object not serializable (class: org.apache.avro.generic.GenericData$Record, value: {"firstName": "Abhishek"})
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/08/25 02:00:46 ERROR TaskSetManager: Task 0.0 in stage 8.0 (TID 6) had a not serializable result: org.apache.avro.generic.GenericData$Record