Spark 2.2 Scala DataFrame select from string array

Published 2019-03-04 02:05

Question:

I'm new to SparkSQL/Scala and I'm struggling with a couple of seemingly simple tasks.

I'm trying to build some dynamic SQL from a Scala String Array. I'm trying to re-type some columns in my DataFrame, but I won't know exactly which ones need re-typing until runtime, when I can see the DataFrame's set of columns. So I'm trying to do this:

val cols = df.columns
val typedCols = cols.map( c => getTypedColumn(c) )
df.select(...)  // or df.selectExpr(...) -- how do I invoke this with the values from my String array?

typedCols will end up being an array of strings with values like this:

["a", "cast(b as int) b", "c"]

Do I need to first build one big comma-delimited string from that array?

So, assuming this will work, I'd invoke that select statement, and it would transform my DataFrame into a new DataFrame with my desired types. But some records in the DataFrame will have errors and will fail the attempted re-typing.

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket? Would I need to do a validation pass first before attempting the DataFrame select?

Answer 1:

You can just use variadic arguments:

val df = Seq(("a", "1", "c"), ("foo", "bar", "baz")).toDF("a", "b", "c")
val typedCols = Array("a", "cast(b as int) b", "c")
df.selectExpr(typedCols: _*).show

+---+----+---+
|  a|   b|  c|
+---+----+---+
|  a|   1|  c|
|foo|null|baz|
+---+----+---+

but personally I prefer columns:

import spark.implicits._  // needed for the $"..." column syntax

val typedCols = Array($"a", $"b" cast "int", $"c")
df.select(typedCols: _*).show

How would I get a DataFrame result with all the good records that passed the typing and then throw all the bad records in some kind of error bucket?

Data that failed to cast is NULL. To find the good records, use na.drop:

val result = df.selectExpr(typedCols: _*)
val good = result.na.drop()

To find the bad records, check whether any column is NULL:

import org.apache.spark.sql.functions.col

val bad = result.where(result.columns.map(col(_).isNull).reduce(_ || _))

To get the original rows that failed the cast:

  • If typedCols is a Seq[Column] you can:

    df.where(typedCols.map(_.isNull).reduce(_ || _))  
    
  • If typedCols is a Seq[String] you can:

    import org.apache.spark.sql.functions.expr
    
    df.where(typedCols.map(expr(_).isNull).reduce(_ || _))
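For intuition: cast(b as int) never throws on bad input; it yields NULL per value, much like wrapping toInt in a Try. A non-Spark sketch of the resulting good/bad split, using the same sample rows as the answer above:

```scala
import scala.util.Try

val raw = Seq(("a", "1", "c"), ("foo", "bar", "baz"))

// Mimic cast(b as int): unparseable values become None, like Spark's NULL
val casted = raw.map { case (a, b, c) => (a, Try(b.toInt).toOption, c) }

// Rows where the cast succeeded vs. the "error bucket"
val (good, bad) = casted.partition { case (_, b, _) => b.isDefined }
// good == Seq(("a", Some(1), "c"))
// bad  == Seq(("foo", None, "baz"))
```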