Write dataframe to csv with datatype map i

Posted 2019-08-24 14:02

Question:

I have a file, file1.snappy.parquet, with a complex data structure that contains maps and arrays. After processing it I get my final result, but when I write that result to CSV I get the following error:

"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."

The code I used:

val conf = new SparkConf().setAppName("student-example").setMaster("local")
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
def sumaggr = udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"), sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")

I tried datadf.toString(), but I still hit the same issue. How can I write the result to CSV?

Answer 1:

The Spark CSV data source supports only atomic types, so you cannot store non-atomic columns such as maps, arrays, or structs.

I think the best option is to convert the column that has the map<string,bigint> datatype to a JSON string and save that in the CSV, as below.

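// Assumes a SparkSession named spark; with the question's SQLContext, import sqlcontext.implicits._ instead
import spark.implicits._
import org.apache.spark.sql.functions._

// Replace the map column with its JSON text so the CSV writer only sees a string
datadf.withColumn("column_name_with_map_type", to_json(struct($"column_name_with_map_type"))).write.csv("outputpath")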
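
To make the JSON approach concrete, here is a minimal, self-contained sketch. It assumes Spark 2.1 or later on the classpath and uses a SparkSession rather than the SQLContext from the question. The toy data, the column name counts, the helper mapColumnToJson, and the output path are all made up for illustration; they are not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object MapToCsvSketch {
  // Hypothetical helper: replace a map-typed column with its JSON text,
  // so the CSV writer only ever sees a plain string column.
  def mapColumnToJson(df: DataFrame, name: String): DataFrame =
    df.withColumn(name, to_json(struct(col(name))))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("map-to-csv-sketch")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // Toy stand-in for the Parquet data: an id plus a map<string,bigint> column.
    val df = Seq(
      ("n1", Map("a" -> 1L, "b" -> 2L)),
      ("n2", Map("a" -> 3L))
    ).toDF("neid", "counts")

    val flat = mapColumnToJson(df, "counts")
    flat.printSchema() // "counts" is now reported as a string column

    // Hypothetical output location; Spark writes a directory of part files here.
    flat.write.mode("overwrite").option("header", "true").csv("output/map_as_json")

    spark.stop()
  }
}
```

Wrapping the column in struct, as the answer does, means each cell holds a JSON object keyed by the column name with the map nested inside it. Whoever reads the CSV later has to parse that string back into a map.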

Hope this helps!



Answer 2:

You are trying to save the output of

val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")

which I guess is a mistake, because the UDF and the aggregation you applied would be wasted if you did so.

So I think you want to save the output of

datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)

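show only prints rows and returns Unit, and datadf itself is left unchanged by the select and filter. So you need to assign the transformed result (without the show call) to a new DataFrame variable and use that variable to save.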

val finalDF = datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0)
finalDF.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")

And you should be fine.