I am using Spark 2.4.3 with the GeoSpark 1.2.0 extension.
I have two tables that I want to join with a distance (range) join. One table (t1) has ~100K rows and a single column, which is a GeoSpark geometry. The other table (t2) has ~30M rows and consists of an Int value and a GeoSpark geometry column.
What I am trying to do is just a simple:
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator

val spark = SparkSession
  .builder()
  // .master("local[*]")
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
  .config("geospark.global.index", "true")
  .config("geospark.global.indextype", "rtree")
  .config("geospark.join.gridtype", "rtree")
  .config("geospark.join.numpartition", 200)
  .config("spark.sql.parquet.filterPushdown", "true")
  // .config("spark.sql.shuffle.partitions", 10000)
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .getOrCreate()

val query =
  """
    |select /*+ BROADCAST(t1) */
    |  t2.cid, ST_Distance(t1.geom, t2.geom) as distance
    |  from t2, t1 where ST_Distance(t1.geom, t2.geom) <= 3218.69""".stripMargin

spark.sql(query)
  .write
  .format("csv") // output format assumed from the .csv path
  .option("path", s"$dataPath/my_output.csv")
  .save()
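To be explicit about the result I expect from the query, here is a tiny plain-Scala sketch of the same join with no Spark or GeoSpark involved. It assumes point geometries and planar Euclidean distance in the units of the coordinates, which is my understanding of what ST_Distance computes. `Pt`, `T2Row` and `distanceJoin` are names made up for this illustration only.

```scala
// Plain-Scala illustration only: no Spark or GeoSpark involved.
// Pt, T2Row and distanceJoin are hypothetical names, not GeoSpark API.
object DistanceJoinSketch {
  final case class Pt(x: Double, y: Double)
  final case class T2Row(cid: Int, geom: Pt)

  // Planar Euclidean distance, in the same units as the coordinates.
  def distance(a: Pt, b: Pt): Double = math.hypot(a.x - b.x, a.y - b.y)

  // Every (t2, t1) pair whose distance is within the radius, as (cid, distance).
  def distanceJoin(t1: Seq[Pt], t2: Seq[T2Row], radius: Double): Seq[(Int, Double)] =
    for {
      r <- t2
      p <- t1
      d = distance(p, r.geom)
      if d <= radius
    } yield (r.cid, d)
}
```

Done naively like this, my tables would need 100K x 30M = 3 x 10^12 distance computations, which is why I am relying on the index and partitioning to prune most of the pairs.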
I tried different configurations, both in local mode and on a local cluster on my laptop (16 GB of memory and 8 cores in total), but without any luck: the program crashes at the "Distinct at Join" stage for GeoSpark, with lots of shuffling. However, I am not able to remove the shuffling through the SparkSQL syntax. I also tried adding an extra id column to the bigger table (for example, the same integer for every 200 rows or so) and repartitioning by it, but that didn't work either.
I was expecting GeoSpark's indexing to come with its own spatial partitioner, but I am not sure it is actually being used.
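To illustrate the kind of pruning I had in mind, here is a minimal plain-Scala sketch of a distance join over a uniform grid. It is a simplification for illustration and not how GeoSpark partitions data; it assumes point geometries, planar distances and a radius greater than zero. `GridJoinSketch`, `cellOf` and `gridDistanceJoin` are hypothetical names.

```scala
// Plain-Scala illustration only; all names here are hypothetical,
// and a uniform grid is a simplification, not GeoSpark's partitioner.
object GridJoinSketch {
  final case class Pt(x: Double, y: Double)

  // Cell coordinates of a point on a uniform grid with the given cell size.
  def cellOf(p: Pt, cellSize: Double): (Long, Long) =
    (math.floor(p.x / cellSize).toLong, math.floor(p.y / cellSize).toLong)

  // Distance join that only compares points in the same or adjacent cells.
  // With cell size equal to the radius, any pair within the radius
  // is at most one cell apart on each axis.
  def gridDistanceJoin(small: Seq[Pt], large: Seq[Pt], radius: Double): Seq[(Pt, Pt, Double)] = {
    val byCell: Map[(Long, Long), Seq[Pt]] = small.groupBy(p => cellOf(p, radius))
    for {
      q <- large
      c = cellOf(q, radius)
      dx <- -1L to 1L
      dy <- -1L to 1L
      p <- byCell.getOrElse((c._1 + dx, c._2 + dy), Seq.empty[Pt])
      d = math.hypot(p.x - q.x, p.y - q.y)
      if d <= radius
    } yield (p, q, d)
  }
}
```

Each row of the large table is compared only with the small-table points in its 3 x 3 neighbourhood of cells rather than with all of them, which is the behaviour I hoped the index and partitioner settings would give me without a full shuffle.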
Any ideas?