Why does RDD.foreach fail with “SparkException: Th

Posted 2019-01-29 00:10

Question:

I have a dataset (as an RDD) that I divide into 4 RDDs by using different filter operators.

 val RSet = datasetRdd.
   flatMap(x => RSetForAttr(x, alLevel, hieDict)).
   map(x => (x, 1)).
   reduceByKey((x, y) => x + y)
 val Rp:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rp"))
 val Rc:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rc"))
 val RpSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RpSv"))
 val RcSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RcSv"))

I pass Rp and RpSv to the following function, calculateEntropy:

    def calculateEntropy(Rx: RDD[(String, Int)], RxSv: RDD[(String, Int)]): Map[Int, Map[String, Double]] = {
      RxSv.foreach { item =>
        val string = item._1.split(",")
        val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))
        ...
      }
    }

I have two questions:

1- When I loop over RxSv as follows:

    RxSv.foreach { item => ... }

it collects the items of all partitions, but I only want the partition I am in. If you suggest using a map function instead: I don't change anything in the RDD.

So when I run the code on a cluster with 4 workers and a driver, the dataset is divided into 4 partitions and each worker runs the code. But when I use the foreach loop shown in the code above, the driver collects all the data from the workers.

2- I have encountered a problem with this code:

    val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))

The error:

org.apache.spark.SparkException: This RDD lacks a SparkContext.


It could happen in the following cases:

(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations;
for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
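Case (1) is the pattern in the question. The usual fix is to run the inner action on the driver first and capture only its plain result in the closure. A minimal sketch, assuming Spark 2.x or later running in local mode; the object name `HoistAction` and the sample data are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; runs Spark in local mode with 2 threads.
object HoistAction {
  def scaled(): Array[Long] = {
    val sc = new SparkContext(new SparkConf().setAppName("hoist-action").setMaster("local[2]"))
    try {
      val rdd1 = sc.parallelize(Seq(1L, 2L, 3L))
      val rdd2 = sc.parallelize(Seq(("a", 10), ("b", 20)))
      // Invalid: rdd1.map(x => rdd2.values.count() * x) would use rdd2 inside a task.
      // Valid: run the count action on the driver, so n is just a Long.
      val n: Long = rdd2.values.count()
      // The closure captures only n, which is serialized to the executors with each task.
      rdd1.map(x => n * x).collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(scaled().mkString(", ")) // prints 2, 4, 6
}
```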

Answer 1:

First of all, I'd highly recommend caching the first RDD using the cache operator.

    RSet.cache()

That will avoid scanning and transforming your dataset every time you filter for the other RDDs: Rp, Rc, RpSv and RcSv.

Quoting the scaladoc of cache:

cache() Persist this RDD with the default storage level (MEMORY_ONLY).

Performance should increase.
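The caching advice can be sketched as follows, assuming Spark 2.x or later in local mode. The sample keys are hypothetical stand-ins for what `RSetForAttr` (not shown in the question) produces, and `CacheBeforeFilter` is an illustrative name:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; the sample keys stand in for the output of RSetForAttr.
object CacheBeforeFilter {
  // Returns RSet's storage level and the number of elements in each filtered RDD.
  def run(): (StorageLevel, Map[String, Long]) = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-before-filter").setMaster("local[2]"))
    try {
      val records = sc.parallelize(Seq("Rp,x,1", "Rp,x,1", "Rc,x,1", "RpSv,x,1", "RpSv,y,2", "RcSv,y,2"))
      val rSet: RDD[(String, Int)] = records.map(x => (x, 1)).reduceByKey(_ + _)
      // cache() only marks RSet; the first action computes and stores it, and the
      // remaining filters read the cached partitions instead of re-running reduceByKey.
      rSet.cache()
      val tags = Seq("Rp", "Rc", "RpSv", "RcSv")
      val sizes = tags.map { tag =>
        val subset = rSet.filter(_._1.split(",")(0) == tag)
        tag -> subset.count()
      }.toMap
      (rSet.getStorageLevel, sizes)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (level, sizes) = run()
    println(s"$level $sizes")
  }
}
```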


Secondly, I'd be very careful using the term "partition" to refer to a filtered RDD since the term has a special meaning in Spark.

Partitions determine how many tasks Spark executes for an action. They are hints for Spark so that you, a Spark developer, can fine-tune your distributed pipeline.

The pipeline is distributed across cluster nodes, with one or many Spark executors, according to the partitioning scheme. If you decide to have a single partition in an RDD, then once you execute an action on that RDD you'll have one task on one executor.

The filter transformation does not change the number of partitions (in other words, it preserves partitioning). The number of partitions of Rp, Rc, RpSv and RcSv, i.e. the number of tasks an action on each of them runs, is exactly the number of partitions of RSet.
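A quick way to check that `filter` preserves partitioning is to compare `getNumPartitions` before and after it. A sketch under the same assumptions (local mode, hypothetical object name and data):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; local mode with 2 threads.
object FilterKeepsPartitions {
  // Returns (partitions before filter, partitions after filter).
  def partitionCounts(): (Int, Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("filter-partitions").setMaster("local[2]"))
    try {
      // Explicitly ask for 4 partitions, like a dataset split across 4 workers.
      val data = sc.parallelize(1 to 100, numSlices = 4)
      // filter is a narrow transformation: each output partition comes from exactly one input partition.
      val filtered = data.filter(_ % 2 == 0)
      (data.getNumPartitions, filtered.getNumPartitions)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(partitionCounts()) // prints (4,4)
}
```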


1- When I loop over RxSv, it collects the items of all partitions, but I only want the partition I am in

You are. Don't worry about it as Spark will execute the task on executors where the data lives. foreach is an action that does not collect items but describes a computation that runs on executors with the data distributed across the cluster (as partitions).
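The sketch below illustrates that `foreach` runs inside tasks and returns nothing to the driver. It assumes Spark 2.0+ for `longAccumulator`; the object name and the (key, count) pairs are hypothetical. `TaskContext.getPartitionId()` reports the partition of the task that is processing the current element:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical demo object; sample pairs mimic RxSv's (key, count) layout.
object ForeachOnExecutors {
  def total(): Long = {
    val sc = new SparkContext(new SparkConf().setAppName("foreach-executors").setMaster("local[2]"))
    try {
      val rxSv = sc.parallelize(Seq(("RpSv,a,1", 3), ("RpSv,b,2", 5), ("RpSv,c,1", 7)), 3)
      // foreach returns Unit; only accumulator updates are sent back to the driver.
      val acc = sc.longAccumulator("sum of counts")
      rxSv.foreach { case (key, count) =>
        // Executed inside the task for this element's partition. On a cluster this
        // println appears in the executor's stdout log, not on the driver console.
        println(s"partition ${TaskContext.getPartitionId()}: $key")
        acc.add(count)
      }
      acc.sum
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(total()) // prints 15
}
```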

If you want to process all the items of a partition at once, use foreachPartition:

foreachPartition Applies a function f to each partition of this RDD.
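A minimal `foreachPartition` sketch, assuming Spark 2.0+ (for `collectionAccumulator`) and local mode; the names and data are hypothetical. The function receives an `Iterator` over one partition and runs once per partition:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.JavaConverters._

// Hypothetical demo object; local mode with 2 threads.
object ForeachPartitionSketch {
  // Returns (partition id, number of elements) for every partition, sorted by id.
  def partitionSizes(): Seq[(Int, Int)] = {
    val sc = new SparkContext(new SparkConf().setAppName("foreach-partition").setMaster("local[2]"))
    try {
      val data = sc.parallelize(1 to 10, 2)
      val sizes = sc.collectionAccumulator[(Int, Int)]("partition sizes")
      data.foreachPartition { items =>
        // Called once per partition with an Iterator over that partition only, so
        // per-partition setup (e.g. opening one connection) would go here.
        val n = items.size
        sizes.add((TaskContext.getPartitionId(), n))
      }
      sizes.value.asScala.toList.sortBy(_._1)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(partitionSizes()) // prints List((0,5), (1,5))
}
```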


2- I have encountered a problem with this code

In the following lines of the code:

    RxSv.foreach{item => {
           val string = item._1.split(",")
           val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))

you are executing the foreach action, which in turn uses Rx, an RDD[(String, Int)], inside its closure. This is not allowed (and ideally such code would not even compile).
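One common way to restructure this, which is not part of the original answer, is to express "for each RxSv item, find the Rx entries with the same third field" as a `join` declared on the driver. A sketch assuming the `tag,a,b` key layout from the question; the object name and sample data are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical demo object; sample data follows the "tag,a,b" key layout from the question.
object JoinInsteadOfNestedRdd {
  // Pairs every RxSv key with each Rx key that has the same third field.
  def matches(): Array[(String, (String, String))] = {
    val sc = new SparkContext(new SparkConf().setAppName("join-not-nested").setMaster("local[2]"))
    try {
      val rx: RDD[(String, Int)] = sc.parallelize(Seq(("Rp,x,1", 2), ("Rp,y,2", 1), ("Rp,z,3", 4)))
      val rxSv: RDD[(String, Int)] = sc.parallelize(Seq(("RpSv,x,1", 1), ("RpSv,y,3", 5)))
      // Re-key both sides by field 2, the value the original filter compared.
      val rxByField = rx.map { case (k, _) => (k.split(",")(2), k) }
      val svByField = rxSv.map { case (k, _) => (k.split(",")(2), k) }
      // A single join, declared on the driver, replaces the nested Rx.filter inside foreach.
      svByField.join(rxByField).collect().sortBy(_._1)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    matches().foreach(println)
}
```

The entropy computation itself is not shown in the question, so it is left out here; grouping the joined pairs by key (for example with `groupByKey`) would give each RxSv item its matching Rx entries.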

The reason for the behaviour is that an RDD is a data structure that just describes what happens with the dataset when an action is executed and lives on the driver (the orchestrator). The driver uses the data structure to track the data sources, transformations and the number of partitions.

An RDD as an entity is gone when the driver spawns tasks on executors: an RDD referenced inside a task's closure arrives on the executor without its SparkContext, which is a transient field that is not serialized.

When the tasks run, nothing is available to tell them how to compute the RDDs that are part of their work, hence the error. Spark detects this misuse and fails with the explicit SparkException shown in the question rather than letting it surface as a more obscure problem.
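If Rx is small enough to fit in memory, another option (again an assumption, not from the answer) is to collect it to the driver and broadcast it as plain data, so that no closure ever references an RDD. A sketch with a hypothetical object name and data, in local mode:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; assumes Rx is small enough to collect to the driver.
object BroadcastSmallRx {
  // For every RxSv key, sums the counts of the Rx entries sharing its third field.
  def matchedCounts(): Map[String, Int] = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-rx").setMaster("local[2]"))
    try {
      val rx = sc.parallelize(Seq(("Rp,x,1", 2), ("Rp,y,2", 1), ("Rp,w,1", 4)))
      val rxSv = sc.parallelize(Seq(("RpSv,x,1", 1), ("RpSv,y,2", 5)))
      // collect() is an action run by the driver; the result is a plain local Map, not an RDD.
      val rxIndex: Map[String, Array[(String, Int)]] = rx.collect().groupBy(_._1.split(",")(2))
      // Ship that plain data to each executor once instead of with every task.
      val rxBc = sc.broadcast(rxIndex)
      rxSv.map { case (k, _) =>
        // Only broadcast data is used here; the Rx RDD itself never enters the closure.
        val matching = rxBc.value.getOrElse(k.split(",")(2), Array.empty[(String, Int)])
        (k, matching.map(_._2).sum)
      }.collect().toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(matchedCounts())
}
```

When Rx is large, the join-based approach is the safer choice, since collecting Rx to the driver would then become the bottleneck.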