How does lineage get passed down in RDDs in Apache Spark?

Posted 2019-05-20 18:55

Does each RDD point to the same lineage graph,

or

when a parent RDD gives its lineage to a new RDD, does the child copy the lineage graph as well, so that the parent and the child each hold a separate graph? If so, isn't that memory intensive?

2 Answers
Anthone
#2 · 2019-05-20 19:06

When a transformation (such as map or filter) is called, Spark does not execute it immediately; instead, it records the transformation in the RDD's lineage. The lineage keeps track of all the transformations that have to be applied to that RDD, including the location from which the data has to be read.

For example, consider the following:

val myRdd = sc.textFile("spam.txt")
val filteredRdd = myRdd.filter(line => line.contains("wonder"))
filteredRdd.count()

sc.textFile() and myRdd.filter() are not executed immediately; they run only when an action is called on the RDD, here filteredRdd.count().
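To make the deferred execution concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark's implementation: ToyRdd, Source, Filtered and the reads counter are hypothetical names invented for illustration. It only shows that a transformation wraps its parent, and that the data source is touched when an action runs.

```scala
// Toy stand-in for an RDD (hypothetical, not Spark's API): transformations
// only wrap the parent; work happens when an action such as count() runs.
sealed trait ToyRdd[A] {
  def compute(): Iterator[A]                                      // how to produce the data
  def filter(p: A => Boolean): ToyRdd[A] = new Filtered(this, p)  // lazy: no data is read
  def count(): Long = compute().size.toLong                       // action: triggers compute()
}

// Leaf node: stands in for sc.textFile(...).
final class Source[A](load: () => Seq[A]) extends ToyRdd[A] {
  def compute(): Iterator[A] = load().iterator
}

// Child node: keeps a reference to its parent plus the predicate to apply.
final class Filtered[A](val parent: ToyRdd[A], p: A => Boolean) extends ToyRdd[A] {
  def compute(): Iterator[A] = parent.compute().filter(p)
}

object LazyDemo {
  var reads = 0 // counts how often the "file" is actually read

  val myRdd = new Source(() => { reads += 1; Seq("no wonder", "spam", "wonderful") })
  val filteredRdd = myRdd.filter(_.contains("wonder"))
}
```

In this sketch, LazyDemo.reads stays at 0 after filteredRdd is defined and becomes 1 only once LazyDemo.filteredRdd.count() is called.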

An action is used either to save the result to some location or to display it. The lineage of an RDD can also be printed with filteredRdd.toDebugString (filteredRdd is the RDD here). The DAG visualization also shows the complete graph in a very intuitive manner:

[Image: DAG visualization]

狗以群分
#3 · 2019-05-20 19:28

Each RDD maintains a pointer to one or more parents, along with metadata about the type of relationship it has with each parent. For example, when we call val b = a.map() on an RDD a, the RDD b just keeps a reference to its parent a (it never copies it); that reference is the lineage.
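The parent pointer can be inspected through the RDD's public dependencies method. The following is a sketch, assuming Spark is on the classpath and a local master is acceptable; the object name LineageRefs and the helpers kind and run are made up for this example.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LineageRefs {
  // Describes the relationship an RDD has with its first parent.
  def kind(rdd: RDD[_]): String = rdd.dependencies.headOption match {
    case Some(_: ShuffleDependency[_, _, _]) => "shuffle"
    case Some(_: NarrowDependency[_])        => "narrow"
    case Some(_)                             => "other"
    case None                                => "no parent"
  }

  def run(sc: SparkContext): (Boolean, String, String) = {
    val a = sc.parallelize(1 to 10)
    val b = a.map(_ * 2)
    val c = b.map(x => (x % 3, x)).reduceByKey(_ + _)

    // Reference equality: b's dependency holds the very same object as a.
    val sameObject = b.dependencies.head.rdd eq a
    (sameObject, kind(b), kind(c))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lineage-refs").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

On a local run this should report that b references a directly, with a narrow dependency for the map and a shuffle dependency for the reduceByKey. Because children only hold references, several RDDs derived from the same parent share that part of the graph rather than duplicating it.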

When the driver submits a job, the RDD graph is serialized and sent to the worker nodes, so that each worker applies the series of transformations (such as map and filter) to different partitions. This lineage is also used to recompute the data if a failure occurs.

To display the lineage of an RDD, Spark provides the debug method toDebugString.

Consider the following example:
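Recomputation from lineage can be illustrated with a small plain-Scala toy. This is not how Spark is implemented internally; Node, Input, Mapped, cache and get are hypothetical names, and the "lost partition" is simulated by removing an entry from a map.

```scala
// Toy model (hypothetical names, not Spark's API): each node knows its parent
// and a per-partition function, so a lost partition can be rebuilt on demand.
object RecomputeDemo {
  sealed trait Node { def compute(partition: Int): Seq[Int] }

  // Leaf: stands in for reading one partition of the input.
  final case class Input(partitions: Vector[Seq[Int]]) extends Node {
    def compute(partition: Int): Seq[Int] = partitions(partition)
  }

  // Child: holds a reference to its parent plus the function to apply.
  final case class Mapped(parent: Node, f: Int => Int) extends Node {
    def compute(partition: Int): Seq[Int] = parent.compute(partition).map(f)
  }

  val input   = Input(Vector(Seq(1, 2), Seq(3, 4)))
  val doubled = Mapped(input, _ * 2)

  // Pretend both partitions were materialised, then partition 1 was lost.
  val cache = scala.collection.mutable.Map(0 -> doubled.compute(0), 1 -> doubled.compute(1))
  cache.remove(1)

  // Recovery: only the missing partition is recomputed by walking the lineage.
  def get(partition: Int): Seq[Int] =
    cache.getOrElseUpdate(partition, doubled.compute(partition))
}
```

Calling RecomputeDemo.get(1) rebuilds the missing partition from the input through the recorded map step, while partition 0 is served from the surviving copy.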

val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
                    .map(words => (words(0), 1))
                    .reduceByKey{(a,b) => a + b}

Calling toDebugString on the splitedLines RDD outputs the following:

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[5] at map at <console>:24 []
    |  MapPartitionsRDD[4] at map at <console>:23 []
    |  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
    |  log.txt HadoopRDD[0] at textFile at <console>:21 []

For more information about how Spark works internally, please read my other post.
