spark-submit fails when case class fields are rese

Posted 2019-08-02 15:13

Question:

I use backticks to escape reserved keywords in case class field names. One example case class is as follows:

case class IPC(
  `type`: String,
  main: Boolean,
  normalized: String,
  section: String,
  `class`: String,
  subClass: String,
  group: String,
  subGroup: String
)

I create the SparkSession as follows:

def run(params: SparkApp.Params): Unit ={

    val sparkSession = SparkSession.builder.master("local[*]").appName("SparkUsptoParser").getOrCreate()

//    val conf = new SparkConf().setAppName("SparkUsptoParser").set("spark.driver.allowMultipleContexts", "true")


    val sc = sparkSession.sparkContext
    sc.setLogLevel("INFO")
    sc.hadoopConfiguration.set("fs.s3a.connection.timeout", "500000")

    val (patentParsedRDD, zipPathRDD) = runLocal(sc, params)

    logger.info(f"Starting to parse files, appending parquet ${params.outputPath}")

    import sparkSession.implicits._

    // parquet(...) returns Unit, so there is nothing useful to bind to a val here
    patentParsedRDD.toDF().write.mode(SaveMode.Append).parquet(params.outputPath)

    logger.info(f"Done parsing and appending parquet")

    // save list of processed archive
    val logPath = params.outputPath + "/log_%s" format java.time.LocalDate.now.toString
    zipPathRDD.coalesce(1).saveAsTextFile(logPath)
    logger.info(f"Log file save to $logPath")

  }

I am trying to run the jar packaged with sbt. However, it fails with the error "`class` is a reserved keyword and cannot be used as field name".

Command used:

./bin/spark-submit /Users/Projects/uspto-parser/target/scala-2.11/uspto-parser-assembly-0.1.jar

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: `class` is a reserved keyword and cannot be used as field name
- array element class: "usptoparser.IPC"
- field (class: "scala.collection.immutable.List", name: "ipcs")
- root class: "usptoparser.PatentDocument"
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:627)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:625)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

Versions:

sparkVersion := "2.3.0"
sbt.version = 0.13.8
scalaVersion := "2.11.2"

Answer 1:

You can work around it by using a field name that is not a reserved Java keyword and then renaming the column with 'as':

scala> case class IPC(name: String, `class`: String)
defined class IPC

scala> val x = Seq(IPC("a", "b"), IPC("d", "e")).toDF
java.lang.UnsupportedOperationException: `class` is a reserved keyword and cannot be used as field name
- root class: "IPC"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:627)
...

scala> case class IPC(name: String, clazz: String)
defined class IPC

scala> val x = Seq(IPC("a", "b"), IPC("d", "e")).toDF
x: org.apache.spark.sql.DataFrame = [name: string, clazz: string]

scala> x.select($"clazz".as("class")).show(false)
+-----+
|class|
+-----+
|b    |
|e    |
+-----+

scala>
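
In the question the offending field is not a top-level column: it sits inside the IPC elements of the ipcs list in PatentDocument, so a plain select with 'as' cannot reach it. One possible way to handle that case, sketched below, is to keep a safe name in the case class and cast the column to an array of structs whose field carries the desired name. The trimmed-down IPC and PatentDocument classes, the id field, the object name RenameNestedField and the helper renameClassField are assumptions made up for this illustration, not the asker's real code. The sketch also assumes that `type` can stay as it is, since the error in the question only names `class` and `type` is a Scala keyword rather than a Java one.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Hypothetical, trimmed-down versions of the question's classes.
// Only `class` is renamed (to clazz); `type` is assumed to be accepted by Spark.
case class IPC(`type`: String, main: Boolean, clazz: String)
case class PatentDocument(id: String, ipcs: List[IPC])

object RenameNestedField {
  // Target element type: same fields in the same order, but with the name "class".
  val ipcType: StructType = StructType(Seq(
    StructField("type", StringType),
    StructField("main", BooleanType),
    StructField("class", StringType)
  ))

  // Casting struct to struct matches fields by position, so only the names change.
  def renameClassField(df: DataFrame): DataFrame =
    df.withColumn("ipcs", col("ipcs").cast(ArrayType(ipcType)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("RenameNestedField")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(PatentDocument("p1", List(IPC("a", true, "b")))).toDF()

    // Inspect the schema to confirm the nested field is now called "class".
    renameClassField(df).printSchema()

    spark.stop()
  }
}
```

In the question's run method, the same cast would go between toDF() and write, so the parquet files carry the field name class while the Scala code keeps using clazz. Because the cast is positional, the target struct has to list every field of IPC in declaration order.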