How do you perform basic joins of two RDD tables i

Published 2020-01-30 07:37

Question:

How would you perform basic joins in Spark using Python? In R you could use merge() to do this. What is the Python syntax on Spark for:

  1. Inner Join
  2. Left Outer Join
  3. Cross Join

With two tables (RDD) with a single column in each that has a common key.

RDD(1):(key,U)
RDD(2):(key,V)

I think an inner join is something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.

Answer 1:

It can be done using either PairRDDFunctions or Spark DataFrames. Since DataFrame operations benefit from the Catalyst optimizer, the second option is worth considering.

Assuming your data looks as follows:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

With PairRDDs:

Inner join:

rdd1.join(rdd2)

Left outer join:

rdd1.leftOuterJoin(rdd2)

Cartesian product (doesn't require RDD[(T, U)]):

rdd1.cartesian(rdd2)
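To make the result shapes concrete, here is a minimal plain-Python sketch that models the three operations on the sample data. It does not use Spark: the helper functions inner_join, left_outer_join and cartesian are hypothetical stand-ins written for illustration, and in Spark the order of the results is not guaranteed. Note that a pair-RDD join yields (key, (u, v)), not a flat (key, u, v).

```python
# Plain-Python model of the pair-RDD operations above (no Spark required).
# The helper names are illustrative only, not part of any Spark API.
rdd1 = [("foo", 1), ("bar", 2), ("baz", 3)]
rdd2 = [("foo", 4), ("bar", 5), ("bar", 6)]

def inner_join(left, right):
    # Models rdd1.join(rdd2): one (key, (u, v)) per matching pair of records.
    return [(k1, (u, v)) for k1, u in left for k2, v in right if k1 == k2]

def left_outer_join(left, right):
    # Models rdd1.leftOuterJoin(rdd2): unmatched left keys get (u, None).
    out = []
    for k1, u in left:
        matches = [v for k2, v in right if k2 == k1]
        out.extend((k1, (u, v)) for v in (matches or [None]))
    return out

def cartesian(left, right):
    # Models rdd1.cartesian(rdd2): every record paired with every record.
    return [(a, b) for a in left for b in right]

inner_rows = sorted(inner_join(rdd1, rdd2))
left_rows = left_outer_join(rdd1, rdd2)
cross_rows = cartesian(rdd1, rdd2)

print(inner_rows)       # [('bar', (2, 5)), ('bar', (2, 6)), ('foo', (1, 4))]
print(left_rows[-1])    # ('baz', (3, None))
print(len(cross_rows))  # 9
```

The cartesian result pairs whole records, for example (('foo', 1), ('foo', 4)), which is why it does not need keyed input.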

Broadcast join (doesn't require RDD[(T, U)]):

  • see Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
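The idea behind a broadcast (map-side) join can be sketched in plain Python: turn the small side into a lookup table and probe it while scanning the large side, so the large side never has to be shuffled. This is only a model under the assumption that the small side fits in memory. The helper map_side_join is hypothetical, and in PySpark the lookup dict would typically be shipped to executors with sc.broadcast and applied with flatMap.

```python
from collections import defaultdict

large = [("foo", 1), ("bar", 2), ("baz", 3)]  # stands in for rdd1
small = [("foo", 4), ("bar", 5), ("bar", 6)]  # stands in for rdd2

# Build a key -> [values] lookup from the small side, keeping duplicate keys.
lookup = defaultdict(list)
for k, v in small:
    lookup[k].append(v)

def map_side_join(record):
    # Hypothetical helper: emit one (key, (u, v)) per match, nothing otherwise.
    k, u = record
    return [(k, (u, v)) for v in lookup.get(k, [])]

# Equivalent in spirit to calling flatMap(map_side_join) on the large RDD.
joined = [row for rec in large for row in map_side_join(rec)]
print(joined)  # [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]
```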

Finally there is cogroup which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
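One way to see why cogroup is useful: the joins can be derived from its output. The following plain-Python sketch starts from the collected result shown above and rebuilds the inner and left outer joins. It is an illustration of the relationship, not how Spark implements joins internally.

```python
from itertools import product

# Collected output of the cogroup example above.
cogrouped = [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

# Inner join: pair every left value with every right value for the same key.
# Keys with an empty side produce no rows.
inner = [(k, pair) for k, (us, vs) in cogrouped for pair in product(us, vs)]

# Left outer join: keep left values even when the right side is empty.
left_outer = [(k, pair) for k, (us, vs) in cogrouped
              for pair in product(us, vs or [None])]

print(inner)       # [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]
print(left_outer)  # same rows plus ('baz', (3, None))
```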

With Spark Data Frames

You can use either the SQL DSL or execute raw SQL using spark.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary views so the DataFrames can be queried with spark.sql
df1.createTempView('df1')
df2.createTempView('df2')

Inner join:

# inner is the default, so how='inner' can be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

Cross join (Spark 2.x requires either an explicit cross join or a configuration change, namely setting spark.sql.crossJoin.enabled):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

# Implicit cross join (no join condition); in Spark 2.x this requires spark.sql.crossJoin.enabled=true
df1.join(df2)
spark.sql('SELECT * FROM df1 JOIN df2')
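The SQL statements in this section are ordinary SQL, so their row counts on the sample data can be checked without a cluster. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL. That substitution is an assumption made purely for illustration: the queries are simple enough that the rows should match, apart from ordering.

```python
import sqlite3

# In-memory SQLite database standing in for the df1 / df2 temporary views.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE df1 (k TEXT, v1 INTEGER)")
conn.execute("CREATE TABLE df2 (k TEXT, v2 INTEGER)")
conn.executemany("INSERT INTO df1 VALUES (?, ?)",
                 [("foo", 1), ("bar", 2), ("baz", 3)])
conn.executemany("INSERT INTO df2 VALUES (?, ?)",
                 [("foo", 4), ("bar", 5), ("bar", 6)])

inner = conn.execute(
    "SELECT * FROM df1 JOIN df2 ON df1.k = df2.k").fetchall()
left = conn.execute(
    "SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k").fetchall()
cross = conn.execute(
    "SELECT * FROM df1 CROSS JOIN df2").fetchall()

print(len(inner), len(left), len(cross))  # 3 4 9
```

The left outer join is the only one that keeps the unmatched 'baz' row, with NULLs (None in Python) for the df2 columns.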

Since Spark 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also: Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark