Lookup in Spark dataframes

Published 2020-02-13 06:00

Question:

I am using Spark 1.6 and I would like to know how to implement a lookup in DataFrames.

I have two DataFrames: employee and department.

  • Employee DataFrame

    -----------------
    Emp Id | Emp Name
    -----------------
    1      | John
    2      | David

  • Department DataFrame

    ----------------------------
    Dept Id | Dept Name | Emp Id
    ----------------------------
    1       | Admin     | 1
    2       | HR        | 2


I would like to look up the Emp Id from the employee table in the department table and get the Dept Name, so the result set would be:

------------------
Emp Id | Dept Name
------------------
1      | Admin
2      | HR

How do I implement this lookup as a UDF in Spark? I don't want to use a JOIN on the two DataFrames.

Answer 1:

As already mentioned in the comments, joining the DataFrames is the way to go.

You can use a lookup, but I think there is no "distributed" solution, i.e. you have to collect the lookup table into driver memory. Also note that this approach assumes that EmpID is unique:

import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map

val emp = Seq((1, "John"), (2, "David"))
val deps = Seq((1, "Admin", 1), (2, "HR", 2))

val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID", "Name", "EmpID")

// Collect the (EmpID, name) pairs into a Map on the driver.
val lookupMap = empRdd.collectAsMap()

// Wrap the map in a UDF; the Option result becomes a nullable column.
def lookup(lookupMap: Map[Int, String]) = udf((empID: Int) => lookupMap.get(empID))

val combinedDF = depsDF
  .withColumn("empNames", lookup(lookupMap)($"EmpID"))
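
For the sample data, combinedDF.show() should yield:

+-----+-----+-----+--------+
|DepID| Name|EmpID|empNames|
+-----+-----+-----+--------+
|    1|Admin|    1|    John|
|    2|   HR|    2|   David|
+-----+-----+-----+--------+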

My initial thought was to pass the empRdd to the UDF and use the lookup method defined on PairRDD, but this of course does not work: you cannot invoke Spark actions (such as lookup) inside transformations (i.e. inside the UDF).
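
If the collected map is sizable, one variation (a sketch, not from the original answer) is to wrap it in a broadcast variable, so each executor receives a single read-only copy instead of having the map serialized into every task closure:

// Broadcast the collected map once; tasks read it locally via lookupBc.value.
val lookupBc = sc.broadcast(empRdd.collectAsMap())

val lookupUdf = udf((empID: Int) => lookupBc.value.get(empID))

val combinedBcDF = depsDF.withColumn("empNames", lookupUdf($"EmpID"))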

EDIT:

If your empDf has multiple columns (e.g. Name, Age), you can use this:

// Key by EmpID; the remaining columns (Name, Age) become a tuple value.
val empRdd = empDf.rdd.map { row =>
  (row.getInt(0), (row.getString(1), row.getInt(2)))
}

val lookupMap = empRdd.collectAsMap()

// A Map is a PartialFunction, so .lift gives an Int => Option[(String, Int)];
// the Option becomes a nullable struct column.
def lookup(lookupMap: Map[Int, (String, Int)]) =
  udf((empID: Int) => lookupMap.lift(empID))

depsDF
  .withColumn("lookup", lookup(lookupMap)($"EmpID"))
  .withColumn("empName", $"lookup._1")
  .withColumn("empAge", $"lookup._2")
  .drop("lookup") // the String overload of drop is available on Spark 1.6
  .show()


Answer 2:

Since you say you already have DataFrames, this is pretty easy; follow these steps:

1) Create a SQLContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

2) Register temporary tables for both DataFrames, e.g.:

// Spark 1.6 uses registerTempTable; createOrReplaceTempView is its Spark 2.x successor.
EmployeeDataframe.registerTempTable("EmpTable")
DepartmentDataframe.registerTempTable("DeptTable") // assuming the department DataFrame is named DepartmentDataframe

3) Query them using Spark SQL

val MatchingDetails = sqlContext.sql(
  "SELECT DISTINCT e.EmpID, d.DeptName " +
  "FROM EmpTable e INNER JOIN DeptTable d ON e.EmpID = d.EmpID")


Answer 3:

Starting with some "lookup" data, there are two approaches:

Method #1 -- using a lookup DataFrame

// use a DataFrame (via a join)
val lookupDF = sc.parallelize(Seq(
  ("banana",   "yellow"),
  ("apple",    "red"),
  ("grape",    "purple"),
  ("blueberry","blue")
)).toDF("SomeKeys","SomeValues")

Method #2 -- using a map in a UDF

// turn the above DataFrame into a map which a UDF uses
val Keys = lookupDF.select("SomeKeys").collect().map(_(0).toString).toList
val Values = lookupDF.select("SomeValues").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap

def ThingToColor(key: String): String = {
  if (key == null) return ""
  val firstword = key.split(" ")(0) // take only the first word -- fragile!
  KeyValueMap.getOrElse(firstword, "not found!")
}

val ThingToColorUDF = udf(ThingToColor(_: String): String)

Take a sample DataFrame of things that will be looked up:

val thingsDF = sc.parallelize(Seq(
  ("blueberry muffin"),
  ("grape nuts"),
  ("apple pie"),
  ("rutabaga pudding")
)).toDF("SomeThings")

Method #1 is to join on the lookup DataFrame

Here, the rlike does the matching, and null appears where there is no match. Both columns of the lookup DataFrame get added.

val result_1_DF = thingsDF.join(lookupDF, expr("SomeThings rlike SomeKeys"), "left_outer")

Method #2 is to add a column using the UDF

Here, only one column is added, and the UDF can return a non-null default. However, if the lookup data is very large, the map may fail to "serialize" as required to send it to the workers in the cluster.

val result_2_DF = thingsDF.withColumn("AddValues",ThingToColorUDF($"SomeThings"))

Which gives you:

+----------------+----------+
|      SomeThings| AddValues|
+----------------+----------+
|blueberry muffin|      blue|
|      grape nuts|    purple|
|       apple pie|       red|
|rutabaga pudding|not found!|
+----------------+----------+

In my case the lookup data ran to over 1 million values, so Method #1 was my only choice.
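
If you do need the UDF route with a large map, one mitigation (a sketch, not part of the original answer) is the same broadcast trick shown in the first answer, so each executor keeps a single copy of the map:

// Broadcast KeyValueMap once; tasks read it via .value instead of
// dragging it into every serialized closure.
val KeyValueMapBc = sc.broadcast(KeyValueMap)

val ThingToColorBcUDF = udf((key: String) =>
  if (key == null) "" else KeyValueMapBc.value.getOrElse(key.split(" ")(0), "not found!"))

val result_3_DF = thingsDF.withColumn("AddValues", ThingToColorBcUDF($"SomeThings"))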