I have a dataset (as an RDD) that I divide into 4 RDDs by using different filter operators.
val RSet = datasetRdd.
flatMap(x => RSetForAttr(x, alLevel, hieDict)).
map(x => (x, 1)).
reduceByKey((x, y) => x + y)
val Rp:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rp"))
val Rc:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rc"))
val RpSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RpSv"))
val RcSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RcSv"))
I pass Rp and RpSv to the following function, calculateEntropy:
def calculateEntropy(Rx: RDD[(String, Int)], RxSv: RDD[(String, Int)]): Map[Int, Map[String, Double]] = {
RxSv.foreach{item => {
val string = item._1.split(",")
val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))
.
.
}
}
I have two questions:
1- When I loop over RxSv as:
RxSv.foreach{item=> { ... }}
it collects all the items from all partitions, but I only want to work with the partition I am currently in. You may suggest using the map function, but I am not changing anything in the RDD.
So when I run the code on a cluster with 4 workers and a driver, the dataset is divided into 4 partitions and each worker runs the code. But when I use a foreach loop as shown in the code, the driver collects all the data from the workers.
2- I have encountered a problem with this code:
val t = Rx.filter(x => x._1.split(",")(2).equals(abc(2)))
The error:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
First of all, I'd highly recommend caching the first RDD using the cache operator.
RSet.cache
That will avoid scanning and transforming your dataset every time you filter for the other RDDs: Rp, Rc, RpSv and RcSv.
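As a minimal sketch of the caching advice above: the sample records, the helper name byTag, and the local[2] master are assumptions made for illustration and are not part of the original code. The parallelize/map/reduceByKey chain only stands in for the real pipeline from the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CacheSketch {
  // Hypothetical helper: keeps the records whose first comma-separated field equals `tag`.
  def byTag(rset: RDD[(String, Int)], tag: String): RDD[(String, Int)] =
    rset.filter { case (key, _) => key.split(",")(0) == tag }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch runs without a cluster.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cache-sketch"))
    try {
      // Stand-in for the flatMap/map/reduceByKey pipeline in the question.
      val rset = sc.parallelize(Seq("Rp,a,x", "Rp,a,x", "Rc,b,y", "RpSv,a,x", "RcSv,b,y"))
        .map(key => (key, 1))
        .reduceByKey(_ + _)
        .cache() // only marks the RDD for caching; nothing is computed yet

      val rp = byTag(rset, "Rp")
      val rpSv = byTag(rset, "RpSv")

      // The first action computes and caches rset; the second can reuse the cached partitions.
      println(rp.collect().toList)
      println(rpSv.collect().toList)
    } finally sc.stop()
  }
}
```

Without the cache() call, each of the four filtered RDDs would recompute the whole upstream pipeline whenever an action runs on it.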
Quoting the scaladoc of cache:
cache() Persist this RDD with the default storage level (MEMORY_ONLY).
Performance should increase.
Secondly, I'd be very careful using the term "partition" to refer to a filtered RDD since the term has a special meaning in Spark.
Partitions say how many tasks Spark executes for an action. They are hints for Spark so you, a Spark developer, could fine-tune your distributed pipeline.
The pipeline is distributed across cluster nodes with one or many Spark executors per the partitioning scheme. If you decide to have a single partition in an RDD, once you execute an action on that RDD, you'll have one task on one executor.
The filter transformation does not change the number of partitions (in other words, it preserves partitioning). The number of partitions of each filtered RDD, i.e. the number of tasks, is exactly the number of partitions of RSet.
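The claim about filter can be checked with a short sketch. The sample data, the function name partitionCounts, and the choice of 4 slices are assumptions for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionCountSketch {
  // Returns the partition count of a small RDD before and after a filter.
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    // Hypothetical stand-in for RSet, explicitly split into 4 partitions.
    val rset = sc.parallelize(
      Seq(("Rp,a,x", 2), ("Rc,b,y", 1), ("RpSv,a,x", 1), ("RcSv,b,y", 1)),
      numSlices = 4)
    val rp = rset.filter { case (key, _) => key.split(",")(0) == "Rp" }
    // filter drops records but keeps the same partitions, some of which may now be empty.
    (rset.getNumPartitions, rp.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("partition-count-sketch"))
    try println(partitionCounts(sc))
    finally sc.stop()
  }
}
```

So an action on Rp still schedules one task per partition of RSet, even for partitions that end up empty after the filter.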
1- When I loop over RxSv it collects all the items from all partitions, but I only want to work with the partition I am currently in
You are. Don't worry about it, as Spark will execute the tasks on the executors where the data lives. foreach is an action that does not collect items; it describes a computation that runs on the executors, with the data distributed across the cluster (as partitions).
If you want to process all items at once per partition, use foreachPartition:
foreachPartition Applies a function f to each partition of this RDD.
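A minimal sketch of foreachPartition follows. It assumes Spark 2.x or later for longAccumulator; the accumulators, the function name visit, and the sample data are introduced here for illustration. The function passed to foreachPartition runs once per partition, on the executor that holds it, and receives an iterator over that partition's items only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ForeachPartitionSketch {
  // Returns (number of partitions visited, number of items visited).
  def visit(rdd: RDD[(String, Int)]): (Long, Long) = {
    val sc = rdd.sparkContext
    // Accumulators let the driver observe what the tasks did without collecting the data.
    val partitionsSeen = sc.longAccumulator("partitionsSeen")
    val itemsSeen = sc.longAccumulator("itemsSeen")
    rdd.foreachPartition { items =>
      partitionsSeen.add(1L)                 // once per partition
      items.foreach(_ => itemsSeen.add(1L))  // once per item in this partition
    }
    (partitionsSeen.sum, itemsSeen.sum)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("foreach-partition-sketch"))
    try {
      val rxSv = sc.parallelize(
        Seq(("RpSv,a,x", 1), ("RpSv,b,y", 1), ("RpSv,c,z", 1), ("RpSv,d,w", 1)),
        numSlices = 2)
      println(visit(rxSv))
    } finally sc.stop()
  }
}
```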
2- I have encountered a problem with this code
In the following lines of the code:
RxSv.foreach{item => {
val string = item._1.split(",")
val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))
you are executing the foreach action, which in turn uses Rx, which is an RDD[(String, Int)]. This is not allowed (and, were it possible to detect at compile time, the code should not have compiled).
The reason for the behaviour is that an RDD is a data structure that just describes what happens with the dataset when an action is executed and lives on the driver (the orchestrator). The driver uses the data structure to track the data sources, transformations and the number of partitions.
An RDD as an entity is gone (= disappears) when the driver spawns tasks on executors.
When the tasks run, nothing is available to tell them how to evaluate other RDDs as part of their work, hence the error. Spark is very cautious about this and checks for such anomalies before they can cause issues once the tasks are running.
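One common way to avoid the nested RDD is to express the lookup as a join driven from the driver. The sketch below is not the original calculateEntropy; it only shows how the per-item filter on the third field could be replaced. The names thirdField and matchOnThirdField and the sample data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object NestedRddSketch {
  // Third comma-separated field of the key, the field compared in the question.
  private def thirdField(record: (String, Int)): String = record._1.split(",")(2)

  // Pairs every RxSv record with all Rx records that share its third field.
  // Note: the inner join drops RxSv records that have no match in Rx.
  def matchOnThirdField(
      rx: RDD[(String, Int)],
      rxSv: RDD[(String, Int)]): RDD[((String, Int), Iterable[(String, Int)])] = {
    val rxByField = rx.keyBy(thirdField).groupByKey()
    rxSv.keyBy(thirdField).join(rxByField).values
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("nested-rdd-sketch"))
    try {
      val rp = sc.parallelize(Seq(("Rp,a,x", 2), ("Rp,b,y", 1)))
      val rpSv = sc.parallelize(Seq(("RpSv,a,x", 1)))
      matchOnThirdField(rp, rpSv).collect().foreach { case (sv, matches) =>
        println(s"$sv -> ${matches.toList}")
      }
    } finally sc.stop()
  }
}
```

Both RDDs are referenced only on the driver here, so no task ever needs to touch another RDD. If Rx is small enough to fit in memory, collecting it on the driver and sharing it with the tasks through a broadcast variable is another option.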