I am using Spark 2.4.3 with the extension of GeoSpark 1.2.0.
I have two tables that I want to join on a distance-range condition. One table (t1) has ~100K rows and a single column, a GeoSpark geometry. The other table (t2) has ~30M rows and consists of an Int value and a GeoSpark geometry column.
What I am trying to do is just a simple distance join:
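import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
val spark = SparkSession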
.builder()
// .master("local[*]")
.config("spark.serializer", classOf[KryoSerializer].getName)
.config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
.config("geospark.global.index", "true")
.config("geospark.global.indextype", "rtree")
.config("geospark.join.gridtype", "rtree")
.config("geospark.join.numpartition", 200)
.config("spark.sql.parquet.filterPushdown", "true")
// .config("spark.sql.shuffle.partitions", 10000)
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.appName("PropertyMaster.foodDistanceEatout")
.getOrCreate()
GeoSparkSQLRegistrator.registerAll(spark)
spark.sparkContext.setLogLevel("ERROR")
spark.read
.load(s"$dataPath/t2")
.repartition(200)
.createOrReplaceTempView("t2")
spark.read
.load(s"$dataPath/t1")
.repartition(200)
.cache()
.createOrReplaceTempView("t1")
val query =
"""
|select /*+ BROADCAST(t1) */
| t2.cid, ST_Distance(t1.geom, t2.geom) as distance
| from t2, t1 where ST_Distance(t1.geom, t2.geom) <= 3218.69""".stripMargin
spark.sql(query)
.repartition(200)
.write.mode(SaveMode.Append)
.option("path", s"$dataPath/my_output.csv")
.format("csv").save()
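To put the query in perspective, here is a quick back-of-the-envelope check in plain Scala. Two assumptions are involved. First, the threshold 3218.69 appears to be 2 miles expressed in meters (2 × 1609.344 m), but that reading only holds if the geometries are in a meter-based projected CRS: GeoSpark's ST_Distance is a planar distance in the units of the coordinate system, and the question does not state the CRS. Second, the pair count is what a join that compares every t1 row with every t2 row (no spatial partitioning) would have to evaluate. The object name JoinSizeEstimate is hypothetical.

```scala
object JoinSizeEstimate {
  // International mile in meters (exact by definition).
  val MetersPerMile: Double = 1609.344

  // Assumption: the query radius is 2 miles in a meter-based CRS.
  val radiusMeters: Double = 2 * MetersPerMile // 3218.688, i.e. ~3218.69

  // Approximate row counts taken from the question.
  val t1Rows: Long = 100000L
  val t2Rows: Long = 30000000L

  // A join without spatial partitioning evaluates ST_Distance for every pair.
  val candidatePairs: Long = t1Rows * t2Rows

  def main(args: Array[String]): Unit = {
    println(s"radius (m) = $radiusMeters")
    println(s"candidate pairs = $candidatePairs")
  }
}
```

Three trillion distance evaluations is far beyond what a 16 GB, 8-core laptop can reasonably handle, which is why the join has to prune candidates spatially rather than compare everything.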
I tried different configurations, both when running locally and on a local cluster on my laptop (16 GB total memory, 8 cores), but without any luck: the program crashes at the "Distinct at Join" stage of GeoSpark, with a lot of shuffling. However, I am not able to avoid the shuffling with SparkSQL syntax. I also tried adding an extra id column to the larger table (for example, the same integer for every ~200 rows) and repartitioning by it, but that didn't help either.
I expected GeoSpark to apply its spatial partitioner together with the index, but I am not sure that is actually happening.
Any ideas?
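To illustrate what a spatial partitioner is meant to achieve, and why bucketing t2 by row number does not help (rows with nearby ids are not necessarily nearby in space), here is a minimal pure-Scala sketch of a grid-based distance join. It is not GeoSpark's implementation: GeoSpark offers spatial partitioning schemes such as quadtree and KDB-tree plus local indexes, whereas this sketch swaps in a uniform grid and assumes point geometries with Euclidean distance. The names Pt and GridDistanceJoin are hypothetical. Because the cell side equals the radius, any match for a left point must lie in its own cell or one of the 8 neighbouring cells, so only those buckets are scanned.

```scala
object GridDistanceJoin {
  // Hypothetical 2-D point standing in for the geometries (assumption: points only).
  final case class Pt(id: Int, x: Double, y: Double)

  // Grid cell of a point on a square grid with the given cell side.
  private def cell(p: Pt, side: Double): (Long, Long) =
    (math.floor(p.x / side).toLong, math.floor(p.y / side).toLong)

  // Returns (leftId, rightId, distance) for every pair within `radius` (radius > 0).
  def join(left: Seq[Pt], right: Seq[Pt], radius: Double): Seq[(Int, Int, Double)] = {
    // Bucket the right side by cell: the analogue of spatially partitioning t2.
    val buckets: Map[(Long, Long), Seq[Pt]] = right.groupBy(p => cell(p, radius))
    for {
      l <- left
      (cx, cy) = cell(l, radius)
      dx <- -1L to 1L // scan only the 3x3 neighbourhood of cells
      dy <- -1L to 1L
      r <- buckets.getOrElse((cx + dx, cy + dy), Seq.empty)
      d = math.hypot(l.x - r.x, l.y - r.y)
      if d <= radius
    } yield (l.id, r.id, d)
  }
}
```

Each right point lives in exactly one bucket and the nine neighbouring cells are distinct, so the result matches a brute-force nested loop without duplicates while comparing far fewer pairs when the data is spread out.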
I have found an answer myself. The GC overhead problem was caused by the partitioning, but also by the memory needed by GeoSpark's index-based partitioner, and the timeouts were caused by long geo-query computations. Both were solved by adding the following parameters, as suggested on the GeoSpark website itself: