I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")
I have tried:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
The error I got:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
What is the right command to load a CSV file as a DataFrame in Apache Spark?
In Java 1.8, this code snippet works perfectly for reading CSV files.
POM.xml
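A minimal sketch of the Maven dependencies (artifact versions here are assumptions):

<!-- Spark core and SQL modules; versions are assumptions -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
</dependency>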
Java
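A sketch of the read in Java 8 (the app name, master, and input path are assumptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvReader {
    public static void main(String[] args) {
        // Local session for the example; switch master for a cluster (assumed app name)
        SparkSession spark = SparkSession.builder()
                .appName("CsvReader")
                .master("local")
                .getOrCreate();

        // Read the CSV with a header row; the path is an assumption
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", "true")
                .load("hdfs:///csv/file/dir/file.csv");

        df.show();
    }
}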
This is for those on Hadoop 2.6 and Spark 1.6, without the "databricks" package.
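Without that package on Spark 1.6, one approach is to parse the text RDD yourself and build the DataFrame from Rows; a sketch with an all-strings schema (the path and schema are assumptions):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// sc and sqlContext are predefined in spark-shell 1.6
val lines = sc.textFile("hdfs:///csv/file/dir/file.csv")

// Treat the first line as the header and drop it from the data
val header = lines.first()
val data = lines.filter(_ != header)

// Build an all-string schema from the header names
val schema = StructType(header.split(",").map(name => StructField(name, StringType, nullable = true)))

// Naive split: does not handle quoted commas
val rows = data.map(_.split(",", -1)).map(fields => Row(fields: _*))

val df = sqlContext.createDataFrame(rows, schema)
df.registerTempTable("table_name")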
In case you are building a jar with Scala 2.11 and Apache Spark 2.0 or higher:
There is no need to create a sqlContext or sparkContext object; a single SparkSession object suffices for all needs.
In case you are running on a cluster, just change .master("local") to .master("yarn") while defining the sparkBuilder object. The Spark doc covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
Following is my code, which works fine:
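A minimal sketch of that setup (the sparkBuilder name comes from the note above; the app name and input path are assumptions):

import org.apache.spark.sql.SparkSession

// Swap .master("local") for .master("yarn") when running on a cluster
val sparkBuilder = SparkSession
  .builder()
  .appName("csv-to-dataframe") // assumed app name
  .master("local")

val spark = sparkBuilder.getOrCreate()

// Read the CSV with its header row; the HDFS path is taken from the question
val df = spark.read
  .option("header", "true")
  .csv("hdfs:///csv/file/dir/file.csv")

df.createOrReplaceTempView("table_name")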
Penny's Spark 2 example is the way to do it in Spark 2. There's one more trick: have the schema inferred for you by doing an initial scan of the data, by setting the option inferSchema to true.
Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat images which Amazon hosts on S3.
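A sketch of that load (the option set and the public landsat-pds path are assumptions in the spirit of the answer):

// Load the Landsat scene index from S3, letting Spark infer the schema
// (the exact options and the s3a path are assumptions)
val csvData = spark.read
  .options(Map(
    "header" -> "true",
    "inferSchema" -> "true", // triggers the initial scan described below
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")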
The bad news is: this triggers a scan through the file; for something large like this 20+ MB zipped CSV file, that can take 30s over a long-haul connection. Bear that in mind: you are better off manually coding up the schema once you've got it coming in.
(Code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration.)
With Spark 2.0, the following is how you can read CSV.
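A minimal sketch, assuming a local session and the HDFS path from the question (the app name is an assumption):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("csv-reader") // assumed app name
  .master("local")
  .getOrCreate()

val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("hdfs:///csv/file/dir/file.csv")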
Parse CSV as DataFrame/Dataset with Spark 2.x
First, initialize the SparkSession object; by default, it is available in shells as spark.
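A sketch of the initialization (the app name is an assumption):

import org.apache.spark.sql.SparkSession

// In spark-shell this object already exists as spark
val spark = SparkSession.builder()
  .master("local") // change as per your cluster
  .appName("Spark CSV Reader") // assumed app name
  .getOrCreate()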
1. Do it the programmatic way
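Roughly, with the DataFrameReader (the header/mode options and the path are assumptions):

val df = spark.read
  .format("csv")
  .option("header", "true") // first line of the file has headers
  .option("mode", "DROPMALFORMED") // drop lines that fail to parse
  .load("hdfs:///csv/file/dir/file.csv")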
2. You can do it the SQL way as well
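Spark SQL can query a CSV file in place; a sketch using the path from the question:

val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")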
Dependencies:
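Assuming sbt-style coordinates for the 2.x line (versions are illustrative):

// build.sbt (versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0",
  "org.apache.spark" %% "spark-sql" % "2.0.0"
)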
Spark version < 2.0
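Before 2.0, CSV support came from the external spark-csv package, read through sqlContext; a sketch with the path from the question:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load("hdfs:///csv/file/dir/file.csv")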
Dependencies:
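Again assuming sbt-style coordinates (versions are illustrative; spark-csv pulls in the univocity parser transitively):

// build.sbt (versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.0",
  "com.databricks" %% "spark-csv" % "1.5.0"
)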