Is Spark DataFrame nested structure limited for se

2019-02-03 07:39发布

问题:

I have a json file with some data, I’m able to create DataFrame out of it and the schema for particular part of it I’m interested in looks like following:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root
 |-- attributes: struct (nullable = true)
 |    |-- Address2: array (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- Zip: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |    |-- Zip5: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- value: string (nullable = true)

When I’m trying to just select the deepest field: json.select("attributes.Address2.value.Zip.value.Zip5").collect()

It gives me an exception: org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);

By looking at the resolveGetField method of LogicalPlan I see that it's possible to select from StructType or from ArrayType(StructType), but is there any way to select deeper? How can I select field I need?

Here is the full exception.

    org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40)
        at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
        at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476)
        at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491)
        at com.reltio.analytics.PREDF.test(PREDF.scala:55)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)

回答1:

The problem is the ArrayType -- you can recreate this error very simply:

val df = Seq(Tuple1(Array[String]())).toDF("users")

At which point df.printSchema shows:

root
 |-- users: array (nullable = true)
 |    |-- element: string (containsNull = true)

And now if you try:

df.select($"users.element")

You get the exact same exception -- GetField is not valid...

You have a couple of different options to unwind the Array. You can get at individual items with getItem like this:

df.select($"users".getItem(0))

And since getItem returns another Column, you can dig as deep as you want:

df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...)
// etc

But with an array, you probably want to programmatically unwind the whole Array. If you look at the way Hive handles this, you need to do a LATERAL VIEW. In Spark, you are going to have to use explode to create the equivalent of a Hive LATERAL VIEW:

case class User(name: String)
df.explode($"users"){ case Row(arr: Array[String]) =>  arr.map(User(_)) }

Note that I use a Case Class in my map -- this is what the docs have. If you don't want to create a case class you can just return a Tuple1 (or Tuple2 or Tuple3 etc):

df.explode($"users"){ case Row(arr: Array[String]) =>  arr.map(Tuple1(_)) }