Process all columns / the entire row in a Spark UDF

Posted 2020-01-29 17:25

Question:

For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by converting the dataframe to an RDD (via df.rdd), that is expensive when the next step is simply to convert the RDD back to a dataframe.
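For reference, that round trip would look something like this (a sketch only; df is the mixed-type dataframe in question, and computeHash here is a hypothetical stand-in for a real minhash):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical stand-in for a real minhash over the whole row
def computeHash(row: Row): Int = row.mkString("|").hashCode

// The round trip to avoid: DataFrame -> RDD -> DataFrame
val hashedRdd = df.rdd.map(row => Row.fromSeq(row.toSeq :+ computeHash(row)))
val newSchema = StructType(df.schema.fields :+ StructField("features", IntegerType))
val featurizedDf = spark.createDataFrame(hashedRdd, newSchema)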

So is there a way to do a udf along the following lines:

val wholeRowUdf = udf((row: Row) => computeHash(row))

Row is not a Spark SQL datatype, of course, so this would not work as shown.

Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a Spark SQL statement:

val featurizedDf = spark.sql("""select wholeRowUdf( what goes here? ) as features
                                from mytable""")

Answer 1:

"Row is not a Spark SQL datatype, of course, so this would not work as shown."

I am going to show that you can use Row to pass all the columns, or selected columns, to a udf function by using the built-in struct function.

First, I define a dataframe:

import spark.implicits._   // needed for toDF on a Seq

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+

Then I define a function that joins all the elements of a row into one string separated by ", " (this stands in for your computeHash function):

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")

Then I use it in a udf function (Spark passes a struct column to a Scala UDF as a Row):

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))

Finally, I call the udf from withColumn, using the built-in struct function to combine the selected columns into a single struct column that is passed to the udf:

df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+------------+
//    |col1|col2|col3|concatenated|
//    +----+----+----+------------+
//    |a   |b   |c   |a, b, c     |
//    |a1  |b1  |c1  |a1, b1, c1  |
//    +----+----+----+------------+

So you can see that Row can be used to pass the whole row as an argument.

You can even pass all the columns of the row at once, without naming them:

val columns = df.columns
df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
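The same pattern also lets you exclude columns from the hash input. For example, to leave out a label column (a sketch; treating "col1" as the label is just an assumption for illustration):

val featureColumns = df.columns.filterNot(_ == "col1")
df.withColumn("concatenated", combineUdf(struct(featureColumns.map(col): _*)))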

Updated

You can achieve the same with SQL queries too; you just need to register the udf function:

df.createOrReplaceTempView("tempview")
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview").show(false)

It will give you the same result as above.
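In the question's own terms, what goes inside the udf call is the struct of the columns (here using the registered name combineUdf; the question's wholeRowUdf would be registered the same way):

val featurizedDf = spark.sql("select combineUdf(struct(`col1`, `col2`, `col3`)) as features from tempview")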

Now, if you don't want to hardcode the column names, you can pick whichever columns you need and build the column list as a string:

val columns = df.columns.map(x => "`" + x + "`").mkString(",")
spark.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
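Here columns evaluates to `col1`,`col2`,`col3`, so the generated query is identical to the hard-coded one above.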

I hope the answer is helpful.



Answer 2:

I came up with a workaround: drop the column names into any existing Spark SQL function to generate a new output column:

concat(${df.columns.tail.mkString(",'-',")}) as Features

In this case the first column in the dataframe is a target and was excluded. That is another advantage of this approach: the actual list of columns may be manipulated.
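Expanded into a complete statement, the workaround might look like this (a sketch: the view name mytable comes from the question, and the first column is assumed to be the target, as described above):

val featureCols = df.columns.tail.mkString(",'-',")   // e.g. col2,'-',col3
df.createOrReplaceTempView("mytable")
val featurizedDf = spark.sql(s"select concat($featureCols) as Features from mytable")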

This approach avoids unnecessary restructuring of the RDD/dataframe.