Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?

Posted 2019-03-20 14:29

Question:

I have an RDD that is too large to consistently perform a distinct operation on without hitting spurious errors (e.g. SparkException: stage failed 4 times, ExecutorLostFailure, HDFS Filesystem closed, Max number of executor failures reached, Stage cancelled because SparkContext was shut down, etc.).

I am trying to count distinct IDs in a particular column, for example:

print(myRDD.map(a => a._2._1._2).distinct.count())

Is there an easy, consistent, less shuffle-intensive way to do what the command above does, possibly using mapPartitions, reduceByKey, flatMap, or other operations that shuffle less than distinct?

See also What are the Spark transformations that causes a Shuffle?

Answer 1:

It might be better to figure out whether there is another underlying issue, but the code below will do what you want. It is a rather roundabout way to do it, but it sounds like it will fit the bill:

myRDD.map(a => (a._2._1._2, a._2._1._2))   // use the ID as both key and value
  .aggregateByKey(Set[YourType]())(        // start each key with an empty Set
    (agg, value) => agg + value,           // within a partition, add each value to the Set
    (agg1, agg2) => agg1 ++ agg2)          // across partitions, union the Sets
  .keys
  .count
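
Since the question mentions reduceByKey: a leaner sketch of the same idea pairs each ID with a unit value and collapses duplicates with reduceByKey, so no Set has to be built per key. Note this is roughly what distinct already does internally (it reduces (x, null) pairs), so it does not avoid the shuffle, but it carries less state per record. This assumes the same myRDD and nested tuple shape as in the question:

myRDD.map(a => (a._2._1._2, ()))   // pair each ID with a unit value
  .reduceByKey((x, _) => x)        // keep one of the identical values per key
  .keys
  .count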

The variant below also seems to work, even though its functions are neither associative nor commutative. It works because of how Spark's internals behave, but I might be missing a case, so while it is simpler, I'm not sure I trust it:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x, y) => y, (x, y) => x)  // arbitrarily keep one value per key
  .keys.count
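
Since the question also asks about mapPartitions: if each partition contains many repeated IDs, you can deduplicate locally within each partition before shuffling, which shrinks the data that distinct still has to shuffle. A minimal sketch along those lines, assuming the same myRDD shape from the question and that the distinct IDs of a single partition fit in memory:

myRDD.map(a => a._2._1._2)
  .mapPartitions(iter => iter.toSet.iterator)  // drop duplicates within each partition first
  .distinct()                                  // then deduplicate across partitions
  .count()

If a per-partition Set is too large, repartitioning to more (smaller) partitions first keeps the local deduplication cheap.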