Question:
How do I specify multiple column conditions when joining two DataFrames? For example, I want to run the following:
val Lead_all = Leads.join(Utm_Master,
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left")
I want to join only when these columns match. But the above syntax is not valid, as cols only takes one string. So how do I get what I want?
Answer 1:
There is a Spark column/expression API join for such a case:
Leaddetails.join(
    Utm_Master,
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
      && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
      && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
      && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)
The <=> operator in the example means "equality test that is safe for null values". The main difference from the simple equality test (===) is that the first one is safe to use when one of the columns may have null values.
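To see the difference, here is a minimal sketch (assuming an active SparkSession named spark; the DataFrames are made up purely for illustration):

import spark.implicits._

val left  = Seq((1, Some("a")), (2, None)).toDF("id", "k")
val right = Seq((1, Some("a")), (2, None)).toDF("id2", "k2")

// === evaluates null = null to null, so the row where both sides are null is dropped:
left.join(right, left("k") === right("k2")).show()   // 1 row

// <=> evaluates null <=> null to true, so that row is kept:
left.join(right, left("k") <=> right("k2")).show()   // 2 rows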
Answer 2:
As of Spark version 1.5.0 (which is currently unreleased), you can join on multiple DataFrame columns. Refer to SPARK-7990: Add methods to facilitate equi-join on multiple join keys.
Python
Leads.join(
    Utm_Master,
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)
Scala
The question asked for a Scala answer, but I don't use Scala. Here is my best guess...
Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)
Answer 3:
One thing you can do is to use raw SQL:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

// Build two small DataFrames from local collections.
val bar = sqlContext.createDataFrame(sc.parallelize(
  Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
  Bar(3, 1, 2, "bar") :: Nil))
val foo = sqlContext.createDataFrame(sc.parallelize(
  Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
  Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

// Register both as temporary tables so they can be queried with plain SQL.
foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
  "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
Answer 4:
In PySpark you can simply specify each condition separately:
Lead_all = Leads.join(Utm_Master,
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
Just be sure to use the operators and parentheses correctly: in Python, & binds more tightly than ==, so each comparison must be wrapped in its own parentheses.
Answer 5:
Scala:
Leaddetails.join(
    Utm_Master,
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
      && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
      && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
      && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)
To make it case insensitive, import org.apache.spark.sql.functions.{lower, upper} and then just wrap the columns in lower(...) inside the condition of the join method.
E.g.: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
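Applied to a join rather than a filter, that might look like this (a sketch based on the join above; lower comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.lower

// Case-insensitive variant of the join conditions above:
Leaddetails.join(
    Utm_Master,
    lower(Leaddetails("LeadSource")) === lower(Utm_Master("LeadSource"))
      && lower(Leaddetails("Utm_Source")) === lower(Utm_Master("Utm_Source")),
    "left"
)

Note that lower(...) can be combined with <=> in the same way if null-safe comparison is also needed.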
Answer 6:
The === option gives me duplicated columns, so I use Seq instead.
val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"), "left")
Of course, this only works when the names of the joining columns are the same.
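A minimal sketch of the difference, with two made-up DataFrames that share a column k (assuming an active SparkSession named spark):

import spark.implicits._

val df1 = Seq((1, "x")).toDF("k", "a")
val df2 = Seq((1, "y")).toDF("k", "b")

// Expression join keeps both copies of `k` in the result:
df1.join(df2, df1("k") === df2("k"), "left").columns   // Array(k, a, k, b)

// Seq-of-names join deduplicates the join column:
df1.join(df2, Seq("k"), "left").columns                // Array(k, a, b)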
Answer 7:
Spark SQL supports joining on a tuple of columns when they are in parentheses, like
... WHERE (list_of_columns1) = (list_of_columns2)
which is way shorter than specifying an equality expression (=) for each pair of columns, combined by a set of "AND"s.
For example:
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
instead of
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
)
which is also less readable, especially when the list of columns is big and you want to deal with NULLs easily.
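Applied to the original question, the tuple form might look roughly like this (a sketch; the view names leads and utm_master are made up, and it is worth verifying that your Spark version accepts tuple equality in an ON clause):

Leads.createOrReplaceTempView("leads")
Utm_Master.createOrReplaceTempView("utm_master")

val Lead_all = spark.sql("""
  SELECT *
  FROM leads l
  LEFT JOIN utm_master u
    ON (l.LeadSource, l.Utm_Source, l.Utm_Medium, l.Utm_Campaign)
     = (u.LeadSource, u.Utm_Source, u.Utm_Medium, u.Utm_Campaign)
""")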