I have a DataFrame with two columns, of types String and Seq[Map[String, String]], something like:
Name Contact
Alan [(Map(number -> 12345, type -> home)), (Map(number -> 87878787, type -> mobile))]
Ben [(Map(number -> 94837593, type -> job)), (Map(number -> 346, type -> home))]
What I need is to apply a UDF to the "number" field of each Map[String, String] in the array. This UDF converts any number whose length is less than 6 into 0000. Something like this:
import org.apache.spark.sql.functions.udf

def valid_num_udf =
  udf((numb: String) =>
    if (numb.length < 6)
      "0000"
    else
      numb
  )
The expected result is something like:
NAME CONTACT
Alan [(Map(number -> 0000, type -> home)), (Map(number -> 87878787, type -> mobile))]
Ben [(Map(number -> 94837593, type -> job)), (Map(number -> 0000, type -> home))]
What I would like is to use another UDF to access each "number" field and then apply valid_num_udf() to it.
I was trying something like the following, but I don't know the correct syntax for this in Scala:
val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))
//This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]] {
inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))
}
Can anyone tell me how to access just that single field in each map, leaving the other fields of the map untouched?
Update: the schema of the DataFrame is
root
|-- NAME: string (nullable = true)
|-- CONTACT: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
or
org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))
A UDF is applied to Columns, but by the time your function body runs, Spark has already deserialized the column values into plain Scala types (here String and Seq[Map[String, String]]). Inside a UDF you are therefore working with ordinary values, not Columns, so you can't call another UDF there: calling a UDF builds a Column expression rather than computing a value.
Instead of defining and calling another UDF, define a plain Scala function and call it from inside the UDF:
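import org.apache.spark.sql.functions._

// Plain Scala function, not a UDF: numbers shorter than 6 characters become "0000"
def valid_num(number: String): String =
  if (number != null && number.length < 6) "0000" else number

// The UDF receives the already-deserialized Seq[Map[String, String]];
// only the "number" entry is rewritten, every other key is kept
def myUdf = udf((inputSeq: Seq[Map[String, String]]) =>
  inputSeq.map(x =>
    if (x.contains("number")) x + ("number" -> valid_num(x("number"))) else x)
)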
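Because the function passed to udf is ordinary Scala, its logic can be unit-tested without a SparkSession. A minimal sketch, assuming the UDF body is pulled out into a standalone function; the object name ContactFix and the function fixContacts are hypothetical names used only for illustration, and the Spark wrapping is shown as a comment so the block runs without Spark on the classpath:

```scala
object ContactFix {
  // Plain rule: numbers shorter than 6 characters become "0000"
  def valid_num(number: String): String =
    if (number != null && number.length < 6) "0000" else number

  // Hypothetical helper holding the UDF body; in Spark it could be wrapped as
  //   val myUdf = org.apache.spark.sql.functions.udf(fixContacts _)
  def fixContacts(inputSeq: Seq[Map[String, String]]): Seq[Map[String, String]] =
    inputSeq.map { x =>
      // Only touch maps that actually carry a "number" entry
      if (x.contains("number")) x + ("number" -> valid_num(x("number"))) else x
    }
}
```

With Alan's row from the question, ContactFix.fixContacts(Seq(Map("number" -> "12345", "type" -> "home"))) returns Seq(Map("number" -> "0000", "type" -> "home")).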
and then call the UDF from withColumn:
val newDf = Df.withColumn("VALID_CONTACT", myUdf($"Contact"))
The shape of your UDF is slightly off: it should take the whole Seq[Map[String, String]] as its input and map over its elements:
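import org.apache.spark.sql.functions.udf

val validNumber = udf { (xs: Seq[Map[String, String]]) =>
  xs.map { x =>
    // Replace short numbers; keep every other entry of the map as it is
    if (x.get("number").exists(n => n != null && n.length < 6))
      x + ("number" -> "0000")
    else x
  }
}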
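When rewriting a single entry, building a fresh Map("number" -> ..., "type" -> ...) keeps only the keys you list, while updating the existing map with + replaces the "number" entry and leaves every other key in place. A small sketch on plain Scala maps; the extra "label" key is a hypothetical field added only to make the difference visible:

```scala
object RebuildVsUpdate {
  // A contact map with an extra, hypothetical key beyond "number" and "type"
  val contact: Map[String, String] =
    Map("number" -> "6789", "type" -> "home", "label" -> "primary")

  // Rebuilding the map from scratch: the "label" entry is lost
  val rebuilt: Map[String, String] =
    Map("number" -> "0000", "type" -> contact("type"))

  // Updating the existing map: only "number" changes, "label" survives
  val updated: Map[String, String] = contact + ("number" -> "0000")
}
```

With only number and type in each map, as in the question, both forms give the same result; the difference only shows up once the maps carry more keys.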
df.show(false)
+----+-----------------------------------------------------------------------------+
|name|contact |
+----+-----------------------------------------------------------------------------+
|Alan|[Map(number -> 6789, type -> home), Map(number -> 987654321, type -> mobile)]|
+----+-----------------------------------------------------------------------------+
df.select(validNumber($"contact") ).show(false)
+-----------------------------------------------------------------------------+
|UDF(contact) |
+-----------------------------------------------------------------------------+
|[Map(number -> 0000, type -> home), Map(number -> 987654321, type -> mobile)]|
+-----------------------------------------------------------------------------+
Instead of creating two separate UDFs, you can use a single one that takes the whole Seq[Map[String, String]] as input and transforms it. This is simpler and should be faster than two separate UDFs.
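import org.apache.spark.sql.functions.udf

// One UDF over the whole Seq[Map[String, String]]
val valid_num_udf = udf((seq: Seq[Map[String, String]]) => {
  seq.map { m =>
    m.get("number") match {
      case Some(number) if number != null && number.length < 6 => m + ("number" -> "0000")
      case _ => m // no "number" key, or the number is already long enough
    }
  }
})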
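The m.get("number") match above also covers maps that have no "number" key at all: get returns an Option, whereas m("number") throws a NoSuchElementException for a missing key. A plain-Scala sketch of the difference; the object MissingKey and the function fixNumber are hypothetical names for illustration:

```scala
import scala.util.Try

object MissingKey {
  // Same shape as the UDF body above, applied to a single map
  def fixNumber(m: Map[String, String]): Map[String, String] =
    m.get("number") match {
      case Some(number) if number != null && number.length < 6 => m + ("number" -> "0000")
      case _ => m // no "number" key, or a number that is long enough: leave unchanged
    }

  val noNumber: Map[String, String] = Map("type" -> "home")

  // Direct lookup on a missing key fails instead of leaving the map alone
  val directLookup: Try[String] = Try(noNumber("number"))
}
```

Here MissingKey.fixNumber(MissingKey.noNumber) simply returns the map unchanged, while MissingKey.directLookup is a Failure.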
Using the DataFrame from the question,
df.withColumn("Contact", valid_num_udf($"Contact")).show(false)
gives:
+----+----------------------------------------------------------------------------+
|Name|Contact |
+----+----------------------------------------------------------------------------+
|Alan|[Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]|
|Ben |[Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)] |
+----+----------------------------------------------------------------------------+
To keep the logic separate from the rest, you don't need a separate UDF; simply put the logic in a method and call it from the UDF. For example:
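import org.apache.spark.sql.functions.udf

// Plain method holding the business rule
def valid_num(number: String): String =
  if (number != null && number.length < 6) "0000" else number

// The UDF only walks the structure and delegates to valid_num
val myUdf = udf((seq: Seq[Map[String, String]]) => {
  seq.map { m =>
    m.get("number") match {
      case Some(number) => m + ("number" -> valid_num(number))
      case _ => m
    }
  }
})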
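The same separation can be taken one step further to answer the question more generally: a helper that rewrites one named field in every map and leaves everything else untouched. A sketch under stated assumptions: updateField is a hypothetical name, and it passes a null array (allowed by the nullable CONTACT column in the schema) through as null rather than failing:

```scala
object FieldUpdate {
  def valid_num(number: String): String =
    if (number != null && number.length < 6) "0000" else number

  // Hypothetical generic helper: apply f to the value stored under key in every map,
  // leaving maps without that key (and all other keys) unchanged.
  // A null sequence is passed through as null.
  def updateField(seq: Seq[Map[String, String]], key: String)(
      f: String => String): Seq[Map[String, String]] =
    Option(seq).map(_.map { m =>
      m.get(key) match {
        case Some(v) => m + (key -> f(v))
        case None    => m
      }
    }).orNull
}
```

In Spark this could be wrapped as udf((seq: Seq[Map[String, String]]) => FieldUpdate.updateField(seq, "number")(FieldUpdate.valid_num)), so the field name and the rule stay independent of the traversal.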