The following code raises "Detected cartesian product for INNER join" exception:
first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])
second_df = second_df.withColumn("second_id", F.lit("1"))
# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()
result_df = first_df.join(second_df,
first_df.first_id == second_df.second_id,
'inner')
data = result_df.collect()
result_df.explain()
and shows me that the logical plan is as shown below:
Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
It looks like for a reason there is no a column existing in the JOIN condition for those logical plans when RuleExecutor applies optimization rule set called CheckCartesianProducts (see https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).
But, if I use "persist" method before JOIN it works and the Physical Plan is:
*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(first_id#0, 10)
: +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(second_id#4, 10)
+- InMemoryTableScan [some_value#2, second_id#4]
+- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
+- *(1) Project [some_value#2, 1 AS second_id#4]
+- Scan ExistingRDD[some_value#2]
So, may be someone can explain internal leading to such results, because persisting the data frame does not look as a solution.