DataFrame user-defined function not applied unless

Posted 2019-07-28 22:42

I want to convert a DataFrame column using a method that I add to DataFrame through an implicit conversion.

I have defined my own DataFrame wrapper type, which contains additional functions:

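class NpDataFrame(df: DataFrame) {
    // Replaces the binary column colName with its string form.
    // The inner bytes2String(x) is a separate Array[Byte] => String helper (not shown here).
    def bytes2String(colName: String): DataFrame = df
       .withColumn(colName + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colName)))
       .drop(colName)
       .withColumnRenamed(colName + "_tmp", colName)
}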
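The `Array[Byte] => String` helper called inside the UDF is not shown in the question. Judging only from the outputs below (16 bytes in, a UUID-style string out), it might look roughly like the following sketch. The object name `UuidBytes`, the method name `bytesToUuidString`, and the big-endian byte order are assumptions made for illustration.

```scala
import java.nio.ByteBuffer
import java.util.UUID

object UuidBytes {
  // Hypothetical helper: reads 16 bytes as two big-endian longs
  // and returns the canonical 8-4-4-4-12 UUID string.
  def bytesToUuidString(bytes: Array[Byte]): String = {
    require(bytes.length == 16, s"expected 16 bytes, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
    val mostSignificant = buf.getLong()
    val leastSignificant = buf.getLong()
    new UUID(mostSignificant, leastSignificant).toString
  }
}
```

Keeping this helper free of any Spark types means it can be unit-tested on its own before it is wrapped in a UDF.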

Then I define my implicit conversion in an object:

object NpDataFrameImplicits {
    implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}

Finally, here is what I do in a small FunSuite unit test:

test("example: call to bytes2String") {
    val df: DataFrame = ...
    df.select("header.ID").show() // (1)
    df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
    df.bytes2String("header.ID").select("header.ID").show() // (3)
}

Show #1

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

Show #2

+------------------------------------+
|id                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

Show #3

+-------------------------------------------------+
|ID                                               |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+

As you can see, the third show (the one without the column renaming) does not work as expected: it displays an unconverted ID column. Does anyone know why?

EDIT:

Output of df.select(col("header.ID") as "ID").bytes2String("ID").show():

+------------------------------------+
|ID                                  |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+

1 Answer
祖国的老花朵
Reply #2 · 2019-07-28 23:16

Let me explain what is happening in your conversion function, using the example below. First, create a DataFrame:

val jsonString: String =
    """{
      | "employee": {
      |   "id": 12345,
      |   "name": "krishnan"
      | },
      | "_id": 1
      |}""".stripMargin

val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))

val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()

Output structure:

root
 |-- _id: long (nullable = true)
 |-- employee: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)

A conversion function similar to yours:

def myConversion(myDf: DataFrame, colName: String): DataFrame = {
    myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
}

Scenario 1: convert a root-level field.

myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()

Result:

+----------------+---+
|        employee|_id|
+----------------+---+
|[12345,krishnan]|  2|
|[12345,krishnan]|  2|
+----------------+---+
+---+
|_id|
+---+
|  2|
|  2|
+---+

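Scenario 2: convert employee.id. Because the name contains a dot, withColumn and withColumnRenamed end up adding a new root-level column literally named employee.id, while drop("employee.id") does nothing because no root-level column has that name. The nested field inside the employee struct is left untouched, and select("employee.id") resolves to that nested field, so it still shows the original value. This is the correct behavior.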

myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()

Result:

+---+----------------+-----------+
|_id|        employee|employee.id|
+---+----------------+-----------+
|  1|[12345,krishnan]|      12346|
|  1|[12345,krishnan]|      12346|
+---+----------------+-----------+
+-----+
|   id|
+-----+
|12345|
|12345|
+-----+
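To make the two different columns in Scenario 2 visible, the sketch below rebuilds a similar DataFrame and selects both the nested field and the root-level column whose name contains a dot. It is an illustration, not the original code: the local SparkSession, the object name `DottedColumnDemo`, and the use of a built-in `+ 1` and `cast` in place of the UDF are assumptions. Quoting the name in backticks tells Spark to treat `employee.id` as a single column name rather than as a path into the struct.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}

object DottedColumnDemo {
  // Local session used only for this sketch
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]").appName("dotted-column-demo").getOrCreate()
  import spark.implicits._

  // Same shape as the sample data: _id plus an employee struct
  val df: DataFrame = Seq((1L, 12345L, "krishnan")).toDF("_id", "id", "name")
    .select(col("_id"), struct(col("id"), col("name")).as("employee"))

  // Same three steps as myConversion, with a built-in expression standing in for the UDF
  val converted: DataFrame = df
    .withColumn("employee.id_tmp", (col("employee.id") + 1).cast("string"))
    .drop("employee.id")
    .withColumnRenamed("employee.id_tmp", "employee.id")

  // Root-level column names after the conversion
  val columnNames: Seq[String] = converted.columns.toSeq

  // Unquoted: resolves to the nested field id inside the employee struct
  val nestedIds: Seq[Long] =
    converted.select("employee.id").collect().map(_.getLong(0)).toSeq

  // Backticks: resolves to the root-level column literally named employee.id
  val topLevelIds: Seq[String] =
    converted.select("`employee.id`").collect().map(_.getString(0)).toSeq
}
```

Under these assumptions, `nestedIds` holds the unconverted value and `topLevelIds` holds the converted one, which mirrors the two result tables above.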

Scenario 3: select the inner field to the root level, then perform the conversion.

myConversion(df.select("employee.id"), "id").show()

Result:

+-----+
|   id|
+-----+
|12346|
|12346|
+-----+

My new conversion function takes a struct-type field, performs the conversion, and stores the result back into the struct-type field itself. Here we pass the employee field and convert only its id field; the change is applied to the root-level employee column.

case class Employee(id: String, name: String)

def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
    myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
      .drop(colName)
      .withColumnRenamed(colName + "_tmp", colName)
}

Your scenario 3, using my new conversion function:

myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()

Result:

+---+----------------+
|_id|        employee|
+---+----------------+
|  1|[12346,krishnan]|
|  1|[12346,krishnan]|
+---+----------------+
+-----+
|   id|
+-----+
|12346|
|12346|
+-----+
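As a possible alternative to the Row-based UDF, the struct can also be rebuilt with the built-in `struct` function, which avoids the extra case class and the positional `getLong(0)` and `getString(1)` calls. This is only a sketch under assumptions: the local SparkSession, the object name `StructRebuildDemo`, and the `+ 1` and `cast` expression standing in for the real conversion are all illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}

object StructRebuildDemo {
  // Local session used only for this sketch
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]").appName("struct-rebuild-demo").getOrCreate()
  import spark.implicits._

  // Same shape as the sample data: _id plus an employee struct
  val df: DataFrame = Seq((1L, 12345L, "krishnan")).toDF("_id", "id", "name")
    .select(col("_id"), struct(col("id"), col("name")).as("employee"))

  // Overwrite the root-level employee column with a rebuilt struct:
  // id is converted, name is copied through unchanged.
  val updated: DataFrame = df.withColumn(
    "employee",
    struct(
      (col("employee.id") + 1).cast("string").as("id"),
      col("employee.name").as("name")
    )
  )

  // The nested path now resolves to the converted value
  val convertedIds: Seq[String] =
    updated.select("employee.id").collect().map(_.getString(0)).toSeq
}
```

Because `withColumn` is given the existing root-level name `employee`, it replaces that column in place, so no temporary column, drop, or rename is needed.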