Detected cartesian product for INNER join on liter

Posted 2019-02-18 11:51

Question:

The following code raises a "Detected cartesian product for INNER join" exception:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

# second_id is a literal column, i.e. the same constant for every row.
second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()

The exception message shows the logical plans involved:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

It looks like, for some reason, no column remains in the JOIN condition for those logical plans by the time RuleExecutor applies the CheckCartesianProducts optimizer rule (see https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).

But if I call persist() before the JOIN, it works, and the physical plan is:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

Can someone explain the internals that lead to these results? Persisting the data frame does not look like a proper solution.

Answer 1:

The problem is that once you persist your data, second_id is incorporated into the cached table and is no longer considered constant. As a result, the planner can no longer infer that the query should be expressed as a Cartesian product, and it uses a standard SortMergeJoin on the hash-partitioned second_id.
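To make the inference concrete, here is a small pure-Python sketch (no Spark involved) of the rewrite that the plans in the error message suggest: when second_id is the constant "1", the equi-join is equivalent to filtering the first table on first_id == "1" and taking an unconditioned Cartesian product with the second. The row data below is hypothetical and only illustrates the equivalence; it is not how Catalyst is implemented.

```python
from itertools import product

# Hypothetical toy rows; second_id is the same constant "1" on every row.
first_rows = [{"first_id": "1"}, {"first_id": "1"}, {"first_id": "2"}]
second_rows = [
    {"some_value": "a", "second_id": "1"},
    {"some_value": "b", "second_id": "1"},
]

# The join as written: keep the pairs whose keys match.
joined = [
    {**f, **s}
    for f, s in product(first_rows, second_rows)
    if f["first_id"] == s["second_id"]
]

# The rewritten form: push the constant into a filter on the first table,
# which leaves a join with no condition at all (a Cartesian product).
filtered = [f for f in first_rows if f["first_id"] == "1"]
rewritten = [{**f, **s} for f, s in product(filtered, second_rows)]

print(joined == rewritten)
```

Once the condition has been turned into a filter, nothing is left to join on, which is the situation the "Join condition is missing or trivial" message describes.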

It would be trivial to get the same SortMergeJoin plan without persistence by using a udf:

from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

# A pass-through pandas UDF: wrapping the literal in it hides
# the constant from the optimizer.
@pandas_udf('integer', PandasUDFType.SCALAR)
def identity(x):
    return x

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                         first_df.first_id == second_df.second_id,
                         'inner')

result_df.explain()

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

However, SortMergeJoin is not what you should try to achieve here. With a constant key, it would result in extreme data skew and would likely fail on anything but toy data.
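The skew can be illustrated with a rough pure-Python simulation of hash partitioning. This is only a sketch: it uses Python's built-in hash() as a stand-in for Spark's actual hash function, and partition_sizes is a hypothetical helper. The partition count of 200 matches the hashpartitioning(..., 200) shown in the plan above.

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition under hash partitioning."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

constant_keys = [1] * 1000        # every row carries the same join key
varied_keys = list(range(1000))   # keys spread over many distinct values

skewed = partition_sizes(constant_keys, 200)
balanced = partition_sizes(varied_keys, 200)

# Largest partition, and number of non-empty partitions, for each case.
print(max(skewed), sum(1 for n in skewed if n))
print(max(balanced), sum(1 for n in balanced if n))
```

With a constant key, every row hashes to the same partition, so a single task ends up doing all of the sort and merge work while the other partitions stay empty.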

A Cartesian product, however, as expensive as it is, won't suffer from this issue and should be preferred here. So I would recommend enabling cross joins (spark.sql.crossJoin.enabled for Spark 2.x) or using explicit cross join syntax, and moving on.

One open question is how to prevent the undesired behavior when data is cached. Unfortunately, I don't have an answer ready for that. I am fairly sure it is possible to use custom optimizer rules, but this is not something that can be done with Python alone.