I am using Spark 1.6 and would like to know how to implement a lookup between DataFrames.
I have two DataFrames, employee and department.
I would like to look up each emp id from the employee table in the department table and get the dept name, so the result set would be:
Emp Id | Dept Name
-------------------
1 | Admin
2 | HR
How do I implement this lookup as a UDF in Spark? I don't want to use a JOIN on the two DataFrames.
As already mentioned in the comments, joining the dataframes is the way to go.
You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup-table into driver memory. Also note that this approach assumes that EmpID is unique:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
  .withColumn("empNames", lookup(lookupMap)($"EmpID"))
My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but this of course does not work, because you cannot have Spark actions (i.e. lookup) within transformations (i.e. the UDF).
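A common variation on the collected-map approach is to wrap the map in a broadcast variable, so that it is shipped to each executor once rather than with every task closure. The following is only a sketch under assumptions: it assumes a Spark 1.6 spark-shell session where `sc` and `sqlContext` already exist, and the names `lookupBc` and `lookupName` are made up for illustration. The lookup table still has to fit in driver memory.

```scala
// Assumes a Spark 1.6 spark-shell session: `sc` and `sqlContext` are predefined.
import org.apache.spark.sql.functions.udf
import sqlContext.implicits._

val emp = Seq((1, "John"), (2, "David"))
val deps = Seq((1, "Admin", 1), (2, "HR", 2))
val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

// Collect the (small) lookup table on the driver, then broadcast it.
// `lookupBc` is a hypothetical name.
val lookupBc = sc.broadcast(sc.parallelize(emp).collectAsMap())

// The UDF reads the broadcast value on the executors.
// Map.get returns an Option, so an unknown EmpID yields None.
val lookupName = udf((empID: Int) => lookupBc.value.get(empID))

val combinedDF = depsDF.withColumn("empNames", lookupName($"EmpID"))
```

No action runs inside the UDF here: the map is collected once, up front, and the UDF only reads from it.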
EDIT:
If your empDf has multiple columns (e.g. Name, Age), you can use this:
val empRdd = empDf.rdd.map { row =>
  (row.getInt(0), (row.getString(1), row.getInt(2)))
}
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap: Map[Int, (String, Int)]) =
  udf((empID: Int) => lookupMap.lift(empID))
depsDF
  .withColumn("lookup", lookup(lookupMap)($"EmpID"))
  .withColumn("empName", $"lookup._1")
  .withColumn("empAge", $"lookup._2")
  .drop($"lookup")
  .show()
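The two snippets above use `get` and `lift` on the map. A small plain-Scala sketch (no Spark needed) of what they return may help; the ages and the missing ID 3 are made-up illustration data. Both calls return an `Option`, and a `None` returned from the UDF should show up as null in the resulting column.

```scala
// Hypothetical lookup data: EmpID -> (Name, Age); the ages are invented for illustration.
val lookupMap: Map[Int, (String, Int)] = Map(1 -> ("John", 30), 2 -> ("David", 41))

// lift turns the map into a function returning Option
val found = lookupMap.lift(1)    // Some((John,30))
val missing = lookupMap.lift(3)  // None

// get behaves the same way for a Map
val viaGet = lookupMap.get(3)    // None
```

So with either call, rows whose EmpID is not in the map are not an error; they simply carry no value.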
Since you say you already have DataFrames, this is fairly easy. Follow these steps:
1) Create a SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2) Register each DataFrame as a temporary table, e.g.:
// Spark 1.6 API; createOrReplaceTempView only exists from Spark 2.0 onwards
EmployeeDataframe.registerTempTable("EmpTable")
3) Query using Spark SQL
val MatchingDetails = sqlContext.sql("SELECT DISTINCT E.EmpID, G.DeptName FROM EmpTable E INNER JOIN DeptTable G ON " +
"E.EmpID = G.EmpID")
Starting with some "lookup" data, there are two approaches:
Method #1 -- using a lookup DataFrame
// use a DataFrame (via a join)
val lookupDF = sc.parallelize(Seq(
("banana", "yellow"),
("apple", "red"),
("grape", "purple"),
("blueberry","blue")
)).toDF("SomeKeys","SomeValues")
Method #2 -- using a map in a UDF
// turn the above DataFrame into a map which a UDF uses
// a single collect keeps each key paired with its value
val KeyValueMap = lookupDF.collect().map(row => row.getString(0) -> row.getString(1)).toMap
def ThingToColor(key: String): String =
  if (key == null) ""
  else {
    val firstword = key.split(" ")(0) // fragile: only the first word is looked up
    KeyValueMap.getOrElse(firstword, "not found!")
  }
val ThingToColorUDF = udf( ThingToColor(_: String): String )
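To make the "fragile" comment concrete, here is a plain-Scala sketch (no Spark needed) comparing the first-word lookup with a variant that tries every word. The names `firstWordColor` and `anyWordColor` and the input "wild blueberry muffin" are hypothetical, and the any-word variant is just one possible alternative, not part of the original approach.

```scala
// Same lookup data as above, written out as a plain map for illustration
val keyValueMap = Map(
  "banana" -> "yellow", "apple" -> "red", "grape" -> "purple", "blueberry" -> "blue")

// First-word lookup, mirroring ThingToColor
def firstWordColor(key: String): String =
  if (key == null) ""
  else keyValueMap.getOrElse(key.split(" ")(0), "not found!")

// Hypothetical variant: use the first word that is a known key
def anyWordColor(key: String): String =
  if (key == null) ""
  else key.split(" ")
    .find(w => keyValueMap.contains(w))
    .map(w => keyValueMap(w))
    .getOrElse("not found!")

val a = firstWordColor("blueberry muffin")       // "blue"
val b = firstWordColor("wild blueberry muffin")  // "not found!"
val c = anyWordColor("wild blueberry muffin")    // "blue"
```

Either function could be wrapped with `udf(...)` in the same way as ThingToColor.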
Take a sample data frame of things that will be looked up:
val thingsDF = sc.parallelize(Seq(
("blueberry muffin"),
("grape nuts"),
("apple pie"),
("rutabaga pudding")
)).toDF("SomeThings")
Method #1 is to join on the lookup DataFrame
Here, rlike does the matching, and null appears where no key matches. Both columns of the lookup DataFrame are added.
val result_1_DF = thingsDF.join(lookupDF, expr("SomeThings rlike SomeKeys"),
"left_outer")
Method #2 is to add a column using the UDF
Here, only one column is added, and the UDF can return a non-null value for rows that have no match. However, if the lookup data is very large, it may fail to serialize when it is sent to the workers in the cluster.
val result_2_DF = thingsDF.withColumn("AddValues",ThingToColorUDF($"SomeThings"))
Which gives you:
In my case I had some lookup data that was over 1 million values, so Method #1 was my only choice.