I have the following SparkSQL:
val resultDf = spark.sql("SELECT name, phone, country FROM users")
I'd like to filter the returned records by the countries present in the following collection:
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
For example, I can create a broadcast variable from the collection:
val countriesBroadcast = sc.broadcast(countries)
but is it possible (and if so, how?) to use the countriesBroadcast
variable inside my SQL query?
With the Spark DataFrame API you can broadcast an entire (small) table and join it with the target table to get the desired output. Here is example code.
Imports
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
Code
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read.option("header", true).csv("data/user.txt")
df.createOrReplaceTempView("users")
val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
import spark.implicits._
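// Turn the Scala collection into a one-column DataFrame and expose it to SQL as "countries"
spark.sparkContext.parallelize(countries, 1).toDF("country").createOrReplaceTempView("countries")
// broadcast() marks the small countries table to be shipped to every executor for the join
broadcast(spark.table("countries")).join(spark.table("users"), "country").show()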
"data/user.txt" file contents
name,phone,country
a,123,India
b,234,Italy
c,526,France
d,765,India
Code output:
+-------+----+-----+
|country|name|phone|
+-------+----+-----+
|  Italy|   b|  234|
| France|   c|  526|
+-------+----+-----+
Note: code tested with Spark 2.2 and Scala 2.11
It is not possible, with the exception of UserDefinedFunctions, UserDefinedAggregateFunctions and Aggregators (i.e. non-declarative code).
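As an illustration of the UDF exception, here is a minimal sketch, assuming Spark 2.x or later, in which a UDF registered for SQL reads the broadcast variable from the question on the executors. The UDF name `in_countries` and the object and method names are hypothetical, and the users table is rebuilt in memory from the sample rows of data/user.txt.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastUdfExample {
  // Returns the names of users whose country is in the broadcast collection.
  def filteredNames(): Seq[String] = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-udf").getOrCreate()
    import spark.implicits._
    try {
      // In-memory copy of the sample rows from data/user.txt
      Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"), ("d", "765", "India"))
        .toDF("name", "phone", "country")
        .createOrReplaceTempView("users")

      val countries = Seq("Italy", "France", "United States", "Poland", "Spain")
      // Ship the lookup set to every executor once
      val countriesBroadcast = spark.sparkContext.broadcast(countries.toSet)

      // Hypothetical UDF name; the closure captures the broadcast handle and
      // reads .value on the executors (null countries simply return false)
      spark.udf.register("in_countries",
        (country: String) => countriesBroadcast.value.contains(country))

      spark.sql("SELECT name, phone, country FROM users WHERE in_countries(country)")
        .select("name").as[String].collect().toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(filteredNames().mkString(", "))
}
```

Because a UDF is opaque to the Catalyst optimizer, Spark cannot push this predicate down or choose a join strategy for it, which is why the join-based approach is generally the more idiomatic option.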
To use broadcasting with the DataFrame / SQL API, you should use DataFrames
and a broadcast hint, so that Spark SQL performs a broadcast hash join.
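A minimal sketch of the hint approach in plain SQL, assuming Spark 2.2 or later (where the `/*+ BROADCAST(...) */` hint comment is accepted in SQL queries). The object and method names are hypothetical, and the data is the sample from data/user.txt. For tables this small Spark would usually choose a broadcast join on its own (below `spark.sql.autoBroadcastJoinThreshold`); the hint just makes the choice explicit.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastHintExample {
  // Joins users with a broadcast countries view in SQL and returns the matching names.
  def matchingNames(): Seq[String] = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-hint").getOrCreate()
    import spark.implicits._
    try {
      // Sample rows copied from data/user.txt
      Seq(("a", "123", "India"), ("b", "234", "Italy"), ("c", "526", "France"), ("d", "765", "India"))
        .toDF("name", "phone", "country")
        .createOrReplaceTempView("users")

      // The filter collection becomes a small one-column view
      Seq("Italy", "France", "United States", "Poland", "Spain")
        .toDF("country")
        .createOrReplaceTempView("countries")

      // The hint asks the planner to broadcast the countries view to every executor
      val resultDf = spark.sql(
        """SELECT /*+ BROADCAST(countries) */ users.name, users.phone, users.country
          |FROM users JOIN countries ON users.country = countries.country""".stripMargin)

      resultDf.explain() // the physical plan should show a BroadcastHashJoin
      resultDf.select("name").as[String].collect().toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(matchingNames().mkString(", "))
}
```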