How to refer to a broadcast variable in a Spark DataFrame

Posted 2020-08-03 04:06

Question:

I have the following Spark SQL query:

val resultDf = spark.sql("SELECT name, phone, country FROM users")

I'd like to filter returned records by countries which are present in the following collection:

val countries = Seq("Italy", "France", "United States", "Poland", "Spain")

For example, I can create a broadcast variable from the collection:

val countriesBroadcast = sc.broadcast(countries)

but is it possible (and if so, how?) to use the countriesBroadcast variable inside my SQL query?

Answer 1:

In the Spark DataFrame API, we can broadcast an entire (small) table and join it with the target table to get the desired output. Here is some example code.

Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

Code

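val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Load the user data and register it as the "users" view
val df = spark.read.option("header", true).csv("data/user.txt")
df.createOrReplaceTempView("users")

// Turn the small collection into a one-column DataFrame and register it as "countries"
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
spark.sparkContext.parallelize(countries, 1).toDF("country").createOrReplaceTempView("countries")

// Mark the small table for broadcast and join it with "users" on the "country" column
broadcast(spark.table("countries")).join(spark.table("users"), "country").show()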

Contents of the "data/user.txt" file:

 name,phone,country
 a,123,India
 b,234,Italy
 c,526,France
 d,765,India

Code output:

+-------+----+-----+
|country|name|phone|
+-------+----+-----+
|  Italy|   b|  234|
| France|   c|  526|
+-------+----+-----+

Note: code tested with Spark 2.2 and Scala 2.11



Answer 2:

It is not possible, except inside UserDefinedFunctions, UserDefinedAggregateFunctions and Aggregators (i.e. non-declarative code).
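
To illustrate the UserDefinedFunction exception, here is a minimal sketch that wraps the broadcast variable in a UDF and calls it from the SQL query. The UDF name inCountries, the app name and the inline sample rows are assumptions made for this example; they are not part of the original question. The collection is converted to a Set only to make lookups cheap.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("broadcast-udf-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for the real "users" table
Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
  .toDF("name", "phone", "country")
  .createOrReplaceTempView("users")

// Broadcast the collection once; each executor receives a read-only copy
val countries = Set("Italy", "France", "United States", "Poland", "Spain")
val countriesBroadcast = spark.sparkContext.broadcast(countries)

// The UDF closes over the broadcast handle and reads .value when it runs.
// "inCountries" is a name chosen for this sketch.
spark.udf.register("inCountries",
  (c: String) => c != null && countriesBroadcast.value.contains(c))

val resultDf = spark.sql(
  "SELECT name, phone, country FROM users WHERE inCountries(country)")
resultDf.show()
```

Keep in mind that a UDF is opaque to the optimizer, so for a short list like this a join or a plain IN clause is usually the simpler choice.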

To use broadcasting with the DataFrame / SQL API, you should put the collection into a DataFrame and apply the broadcast hint to it in a join (see: Spark SQL broadcast hash join).
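
If you want to stay in SQL, the same hint can be written as a comment-style hint in the query text, which Spark supports from version 2.2 onward. The following is only a sketch: the inline sample rows, the app name and the view name countries are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for the real "users" table
Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"))
  .toDF("name", "phone", "country")
  .createOrReplaceTempView("users")

// Register the small collection as a one-column view
Seq("Italy", "France", "United States", "Poland", "Spain")
  .toDF("country")
  .createOrReplaceTempView("countries")

// The BROADCAST hint asks the planner to broadcast the "countries" side of the join
val hintedDf = spark.sql(
  """SELECT /*+ BROADCAST(countries) */ users.name, users.phone, users.country
    |FROM users JOIN countries ON users.country = countries.country""".stripMargin)
hintedDf.show()
```

Calling hintedDf.explain() lets you check which join strategy the planner actually picked.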