Spark - How to apply a UDF over a single field in a Seq[Map[String, String]]

Posted 2019-08-02 10:22

Question:

I have a DataFrame with two columns, of types String and Seq[Map[String, String]]. Something like:

Name    Contact
Alan    [Map(number -> 12345, type -> home), Map(number -> 87878787, type -> mobile)]
Ben     [Map(number -> 94837593, type -> job), Map(number -> 346, type -> home)]

So what I need is to apply a UDF to the number field in each Map[String, String] of each element in the array. The UDF basically converts to 0000 any number whose length is less than 6. Something like this:

def valid_num_udf = udf((numb: String) => {
  if (numb.length < 6)
    "0000"
  else
    numb
})

The expected result is something like:

NAME    CONTACT
Alan    [Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]
Ben     [Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]

What I would like is to use another UDF to access each number field and then apply valid_num_udf() to it.

I was trying something like this, but I don't know the correct syntax for it in Scala.

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))

//This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]] { 
    inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))
}

Can anyone tell me how to access just that single field in the map, leaving the other fields of the map untouched?

Update: The Schema of the DataFrame would be

root
 |-- NAME: string (nullable = true)
 |-- CONTACT: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

or

org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))
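For reproducibility, here is a minimal sketch that builds a DataFrame with exactly this schema (a hypothetical local setup; the names Df, NAME and CONTACT are taken from the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Sample rows from the question
val Df = Seq(
  ("Alan", Seq(Map("number" -> "12345", "type" -> "home"),
               Map("number" -> "87878787", "type" -> "mobile"))),
  ("Ben",  Seq(Map("number" -> "94837593", "type" -> "job"),
               Map("number" -> "346", "type" -> "home")))
).toDF("NAME", "CONTACT")

Df.printSchema() // prints the schema shown above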

Answer 1:

A udf function takes columns as arguments, and their values go through serialization and deserialization into primitive (plain Scala) types. So by the time the values reach the udf's body, they are already primitive types, and you can't call another udf function from inside a udf unless you convert those primitives back into column types.

Instead of defining and calling another udf function, you can define a plain Scala function and call that function from inside the udf:

import org.apache.spark.sql.functions._

// Plain Scala function (despite the name, not a udf): zero out short numbers
def valid_num_udf(number: String): String =
  if (number.length < 6) "0000" else number

// The udf rebuilds each map, assuming it has exactly the keys "number" and "type"
def myUdf = udf((inputSeq: Seq[Map[String, String]]) =>
  inputSeq.map(x => Map("number" -> valid_num_udf(x("number")), "type" -> x("type")))
)

and then just call the udf function from the withColumn API:

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"Contact"))
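With the sample rows above, the result should match the expected output from the question (a sketch; the exact Map rendering varies across Spark versions):

newDf.select($"NAME", $"VALID_CONTACT").show(false)
// +----+-----------------------------------------------------------------------------+
// |NAME|VALID_CONTACT                                                                |
// +----+-----------------------------------------------------------------------------+
// |Alan|[Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)] |
// |Ben |[Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]    |
// +----+-----------------------------------------------------------------------------+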


Answer 2:

The signature of your UDF is slightly incorrect. You pass the whole Seq[Map[String, String]] as the input:

val validNumber = udf { (xs: Seq[Map[String, String]]) =>
  xs.map { x =>
    if (x("number").length < 6)
      Map("number" -> "0000", "type" -> x("type"))
    else x
  }
}

df.show(false)
+----+-----------------------------------------------------------------------------+
|name|contact                                                                      |
+----+-----------------------------------------------------------------------------+
|Alan|[Map(number -> 6789, type -> home), Map(number -> 987654321, type -> mobile)]|
+----+-----------------------------------------------------------------------------+


df.select(validNumber($"contact") ).show(false)
+-----------------------------------------------------------------------------+
|UDF(contact)                                                                 |
+-----------------------------------------------------------------------------+
|[Map(number -> 0000, type -> home), Map(number -> 987654321, type -> mobile)]|
+-----------------------------------------------------------------------------+
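Note that this UDF rebuilds each map with only the number and type keys, so any other keys would be dropped. A small variation (hypothetical name validNumberKeepAll) updates the existing map instead and preserves all other keys:

val validNumberKeepAll = udf { (xs: Seq[Map[String, String]]) =>
  xs.map { x =>
    // Only overwrite "number" when it exists and is too short; keep all other keys
    if (x.get("number").exists(_.length < 6)) x + ("number" -> "0000")
    else x
  }
}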


Answer 3:

Instead of creating two separate UDFs, you can use a single one that takes the whole Seq[Map[String, String]] as input and transforms it. This should be both faster and cleaner than splitting the work across two separate UDFs.

val valid_num_udf = udf((seq: Seq[Map[String, String]]) => {
  seq.map{ m => 
    m.get("number") match {
      case Some(number) if number.length < 6 => m + ("number" -> "0000")
      case _ => m 
    }
  } 
})

Using the provided dataframe:

df.withColumn("Contact", valid_num_udf($"Contact"))

will give

+----+----------------------------------------------------------------------------+
|Name|Contact                                                                     |
+----+----------------------------------------------------------------------------+
|Alan|[Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]|
|Ben |[Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]   |
+----+----------------------------------------------------------------------------+

To keep the logic separate from the rest, you don't need to call a separate UDF; simply extract the logic into a method and call that. For example,

def valid_num(number: String) = 
  if (number.length < 6)
    "0000"
  else
    number

val myUdf = udf((seq: Seq[Map[String, String]]) => {
  seq.map{ m => 
    m.get("number") match {
      case Some(number) => m + ("number" -> valid_num(number))
      case _ => m 
    }
  } 
})
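
Usage is unchanged, and the extracted method can be tested on its own:

df.withColumn("Contact", myUdf($"Contact"))

valid_num("346")      // "0000"
valid_num("94837593") // "94837593"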