How to return a case class when using Spark higher-order functions

Posted 2020-04-17 04:19

Question:

I am trying to use Spark's transform function to convert the items of an array from type ClassA into ClassB, as shown below:

import spark.implicits._                    // for toDF and $-columns; assumes a SparkSession named `spark`
import org.apache.spark.sql.functions.expr

case class ClassA(a: String, b: String, c: String)
case class ClassB(a: String, b: String)

val a1 = ClassA("a1", "b1", "c1")
val a2 = ClassA("a2", "b2", "c2")

val df = Seq(Seq(a1, a2)).toDF("ClassA")

df.withColumn("ClassB", expr("transform(ClassA, c -> ClassB(c.a, c.b))")).show(false)

However, the above code fails with the message:

org.apache.spark.sql.AnalysisException: Undefined function: 'ClassB'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.

The only way I could make this work was through struct, as shown next:

df.withColumn("ClassB", expr("transform(ClassA, c -> struct(c.a as string, c.b as string))")).show(false)

// +----------------------------+--------------------+
// |ClassA                      |ClassB              |
// +----------------------------+--------------------+
// |[[a1, b1, c1], [a2, b2, c2]]|[[a1, b1], [a2, b2]]|
// +----------------------------+--------------------+
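
Incidentally, the struct result can at least be decoded back into ClassB values, provided the struct fields are aliased to the case class field names. This is only a sketch, assuming the imports above are in scope:

// Aliasing the fields as "a" and "b" makes the struct line up with ClassB,
// so the column can be decoded through ClassB's implicit encoder.
val classBs: Seq[ClassB] = df
  .withColumn("ClassB", expr("transform(ClassA, c -> struct(c.a as a, c.b as b))"))
  .select($"ClassB".as[Seq[ClassB]])
  .head()
// should yield: List(ClassB(a1,b1), ClassB(a2,b2))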

So the question is: is there any way to return a case class directly from transform, instead of going through struct?

Answer 1:

The transform expression is relational and knows nothing about the case classes ClassA and ClassB. AFAIK, the only option is to register a UDF so you can build your structure (or inject functions), but you will then have to deal with a Row-encoded value instead of a ClassA (Spark SQL is all about encoding :) ), like so:

import org.apache.spark.sql.Row

// The UDF receives each array element as a generic Row and rebuilds it as a ClassB.
sparkSession.udf.register("toB", (a: Row) => ClassB(a.getAs[String]("a"), a.getAs[String]("b")))

df.withColumn("ClassB", expr("transform(ClassA, c -> toB(c))")).show(false)
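
With toB registered, this should produce the same [[a1, b1], [a2, b2]] array as the struct version shown in the question.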

Side note: naming your column "ClassA" might be confusing, since transform reads the column, not the type.
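
For completeness, a typed alternative that needs no UDF registration is to step out of SQL expressions and map over a Dataset, so the case classes are used directly. This is only a sketch, assuming sparkSession.implicits._ is in scope and df is the DataFrame from the question:

import sparkSession.implicits._

// Read the array column as a typed Dataset[Seq[ClassA]], then convert each
// element with plain Scala; encoders for both case classes are derived
// automatically by the implicits.
val result = df
  .select($"ClassA".as[Seq[ClassA]])
  .map(xs => xs.map(a => ClassB(a.a, a.b)))

result.show(false)

The trade-off is the usual one: the typed map goes through deserialize/serialize instead of Catalyst expressions, but it keeps the conversion compile-checked against the case classes.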