Spark GraphX: time cost increases steadily per round

Posted 2019-09-15 10:06

Question:

I use the GraphX API in an iterative algorithm. Although I carefully cache and unpersist the RDDs and keep an eye on the number of vertex partitions, the time cost still increases every round, following a linear trend. Below is a simplified version of my code, which shows the same problem:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object ComputingTimeProblem extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[1]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    var graph = GraphGenerators
        .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
        .cache
    graph.vertices.take(10).foreach(println)

    val maxIter = 50

    var preGraph: Graph[Double, Int] = null
    var allTime: ArrayBuffer[Double] = ArrayBuffer()
    for (i <- 1 to maxIter) {

        val begin = System.currentTimeMillis()

        preGraph = graph

        val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
        graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
        graph.vertices.take(10)

        preGraph.unpersist()

        val end = System.currentTimeMillis()

        val duration = (end - begin) / (60 * 1000d)
        allTime += duration
        println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
            duration, graph.vertices.getNumPartitions))
    }

    graph.vertices.take(10).foreach(println)

    val avgTime = allTime.sum / allTime.size
    println(s"Average Time = ${avgTime}")

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
    timeCostDiffs
        .zipWithIndex
        .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1))
        .foreach(println)

    println("tc\n"+allTime.mkString("\n"))
}

Time cost trend (the original plot is not preserved):
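To put a single number on this trend instead of reading it off the round-to-round differences, one option is an ordinary least-squares slope of the per-round times against the round index. This is a plain Scala sketch (no Spark needed); `TrendCheck` and `slope` are hypothetical names, not part of any library. A clearly positive slope (minutes per round) indicates steady growth, while a slope near zero means the rounds cost about the same.

```scala
object TrendCheck {
  // Ordinary least-squares slope of y against x = 1, 2, ..., n.
  // Takes scala.collection.Seq so a mutable ArrayBuffer is accepted on Scala 2.12 and 2.13.
  def slope(y: scala.collection.Seq[Double]): Double = {
    require(y.size >= 2, "need at least two rounds")
    val n = y.size
    val xs = (1 to n).map(_.toDouble)
    val xMean = xs.sum / n
    val yMean = y.sum / n
    // Covariance of (x, y) and variance of x, both left unnormalized (the factor cancels).
    val cov = xs.zip(y).map { case (x, v) => (x - xMean) * (v - yMean) }.sum
    val varX = xs.map(x => (x - xMean) * (x - xMean)).sum
    cov / varX
  }
}
```

With the question's code, `TrendCheck.slope(allTime)` could be printed after the loop alongside the existing average.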

I have not changed the index of the graph object, and GraphX joins the vertices with the leftZipJoin method, which does not require a shuffle. So why does the time cost still increase every round? Can anybody offer some constructive suggestions? Thanks!

Answer 1:

It is still a lineage problem, as I have just found out. A Graph object holds two RDDs: a vertex RDD and an edge RDD. In the code above, I only materialized the vertex RDD, not the edge RDD, so every round recomputes the previous edges again. Materializing both RDDs through the triplets object solves the problem, as follows:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object ComputingTimeProblem extends App {

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[1]").setAppName("test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    var graph = GraphGenerators
        .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
        //        .partitionBy(PartitionStrategy.RandomVertexCut,8)
        .cache
    graph.vertices.take(10).foreach(println)

    val maxIter = 50

    var preGraph: Graph[Double, Int] = null
    var allTime: ArrayBuffer[Double] = ArrayBuffer()
    for (i <- 1 to maxIter) {
        val begin = System.currentTimeMillis()

        preGraph = graph

        val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
        graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
        graph.triplets.count() // materialize both the vertex and edge RDDs; count() evaluates every partition, whereas take(10) may stop after the first one
        // graph.vertices.take(10)

        preGraph.unpersist()

        val end = System.currentTimeMillis()

        val duration = (end - begin) / (60 * 1000d)
        allTime += duration
        println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
            duration, graph.vertices.getNumPartitions))
    }

    graph.vertices.take(10).foreach(println)

    val avgTime = allTime.sum / allTime.size
    println(s"Average Time = ${avgTime}")

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
    timeCostDiffs
        .zipWithIndex
        .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1))
        .foreach(println)


    println("tc\n" + allTime.mkString("\n"))

}
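The explanation above can be illustrated with a toy model. This sketch is plain Scala, not Spark: it only counts how many datasets must be (re)computed per round. Its dependency structure is a simplifying assumption: each round's vertices come from the old vertices plus a shuffle over the old graph's triplets, the new edges are derived from the old edges plus the new vertex values shipped through another shuffle, shuffle outputs are treated as reusable across jobs (Spark can skip stages whose shuffle output already exists), and `unpersist()` drops cached data for good. `Node` and `LineageToyModel` are hypothetical names.

```scala
// Toy model of cached datasets and their lineage (NOT Spark code).
final class Node(val name: String, val parents: Seq[Node], val isShuffle: Boolean = false) {
  private var keep = isShuffle   // shuffle outputs are kept; cached nodes are kept until unpersisted
  private var available = false  // true once computed and kept

  def cache(): Node = { keep = true; this }
  def unpersist(): Unit = { keep = false; available = false }

  // Number of nodes (re)computed to make this node available.
  def compute(): Int =
    if (available) 0
    else {
      val cost = 1 + parents.map(_.compute()).sum
      if (keep) available = true
      cost
    }

  override def toString: String = name
}

object LineageToyModel {
  // materializeEdges = false mirrors `graph.vertices.take(10)` in the question;
  // materializeEdges = true mirrors materializing the triplets in the answer.
  def run(rounds: Int, materializeEdges: Boolean): Seq[Int] = {
    var v = new Node("V0", Nil).cache()
    var e = new Node("E0", Nil).cache()
    v.compute(); e.compute() // initial graph fully materialized (simplification)
    (1 to rounds).map { i =>
      val (prevV, prevE) = (v, e)
      // reduceByKey over the previous graph's triplets: a shuffle
      val msgs = new Node(s"T$i", Seq(prevV, prevE), isShuffle = true)
      v = new Node(s"V$i", Seq(prevV, msgs)).cache()
      // new vertex values shipped to the edge partitions: another shuffle
      val shipped = new Node(s"S$i", Seq(v), isShuffle = true)
      e = new Node(s"E$i", Seq(prevE, shipped)).cache() // new edges derived from old edges
      val cost = v.compute() + (if (materializeEdges) e.compute() else 0)
      prevV.unpersist(); prevE.unpersist() // preGraph.unpersist()
      cost
    }
  }
}
```

In this model, `LineageToyModel.run(10, materializeEdges = false)` gives 2, 5, 6, 7, ..., 13: because the old edges are unpersisted before the new edges have ever been computed, each round has to walk one step further back along the edge chain. With `materializeEdges = true`, every round costs 4.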