Why does Spark fail with “Detected cartesian product for INNER join between logical plans”?


Question:

I am using Spark 2.1.0.

When I execute the following code, Spark reports an error. Why does it happen, and how can I fix it?

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")
weights.join(input, "sourceId").show

Error:

scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided

Answer 1:

You can allow the inner join to go through by turning on the flag:

spark.conf.set("spark.sql.crossJoin.enabled", "true")

You could also use an explicit cross join:

weights.crossJoin(input)

or pass the join condition and the "cross" join type explicitly:

weights.join(input, input("sourceId")===weights("sourceId"), "cross")
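Putting the first option together with the DataFrames from the question, a minimal sketch (the flag only needs to be set once per SparkSession, here assumed to be the spark-shell's `spark`):

spark.conf.set("spark.sql.crossJoin.enabled", "true")
// With the flag on, the original equi-join runs even though the optimizer
// has pushed the join keys down as filters (see the plans in the error above)
// and then treats the remaining join condition as trivial.
weights.join(input, "sourceId").show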

You can find more about this in the issue SPARK-6459, which is said to be fixed in Spark 2.1.1.

Since you are still on 2.1.0, upgrading to 2.1.1 should also make the issue go away.

Hope this helps!



Answer 2:

tl;dr Upgrade to Spark 2.1.1. It's an issue in Spark that was fixed.

(I really wish I could also show you the exact change that fixed it in 2.1.1.)



Answer 3:

For me:

Dataset<Row> ds1 = sparkSession.read().load("/tmp/data");
Dataset<Row> ds2 = ds1;
ds1.join(ds2, ds1.col("name").equalTo(ds2.col("name"))); // fails with "Detected cartesian product for INNER join between logical plans"

Dataset<Row> ds1 = sparkSession.read().load("/tmp/data");
Dataset<Row> ds2 = sparkSession.read().load("/tmp/data");
ds1.join(ds2, ds1.col("name").equalTo(ds2.col("name"))); // runs without errors

I'm using Spark 2.1.0.
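
The same workaround translated to Scala, as a sketch (the "/tmp/data" path and "name" column are just placeholders from the example above): reading the data twice gives Spark two independent logical plans, so the self-join condition is not reduced to a trivial one.

// Load the same data twice instead of joining a Dataset with itself.
val left  = spark.read.load("/tmp/data")
val right = spark.read.load("/tmp/data")
left.join(right, left("name") === right("name")).show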



Answer 4:

I got this error with Spark version 2.3.0.cloudera3.

I solved it by aliasing the DataFrames.

E.g. re-assign the failing DataFrame to a new value and attach an alias to it:

val dataFrame = inDataFrame.alias("dataFrame")
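
As a sketch of how that can look for a self-join (assuming a DataFrame inDataFrame with a "name" column, as in the earlier answer; the alias names are arbitrary):

import org.apache.spark.sql.functions.col

// Two aliases of the same DataFrame let each side of the join be
// referenced unambiguously, so the join condition is not simplified away.
val left  = inDataFrame.alias("l")
val right = inDataFrame.alias("r")
left.join(right, col("l.name") === col("r.name")).show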

Hope this helps.