Recursive method call in Apache Spark

Published 2020-06-23 07:24

Question:

I'm building a family tree from a database on Apache Spark, using a recursive search to find the ultimate parent (i.e. the person at the top of the family tree) for each person in the DB.

It is assumed that the first person returned when looking up an id is the correct one.

val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if(person.id == "0" || person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => findUltimateParentId(f.parentId))

It is giving the following error:

"Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063."

I understand from reading other similar questions that the problem is that I'm calling findUltimateParentId from within the foreach loop. If I call the method from the shell with a person's id, it returns the correct ultimate parent id.
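In other words (a minimal sketch of what I think is happening, using the same names as above), the lookup ends up being invoked on an executor inside the foreach, which is exactly the nested pattern SPARK-5063 describes:

// simplified: this is the shape of the code that triggers the exception -
// peopleById.lookup() is an RDD action, but it runs inside peopleRDD.foreach,
// i.e. on an executor instead of the driver
peopleRDD.foreach { f =>
  val personSeq = peopleById.lookup(f.parentId)   // nested RDD access -> SparkException
  // ... walk up the chain from personSeq(0)
}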

However, none of the other suggested solutions work for me, or at least I can't see how to implement them in my program. Can anyone help?

Answer 1:

If I understood you correctly, here's a solution that works for any size of input (although performance might not be great): it performs N passes over the RDD, where N is the depth of the "deepest family" (largest distance from an ultimate ancestor to its furthest descendant) in the input:

import org.apache.spark.rdd.RDD

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {

    // all persons keyed by id, cached because it is joined against once per generation
    val byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)

      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - seed the climb with each person as its own "ancestor" (only if it has a parent at all):
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
  }

}

Here's a test to better understand how this works:

/**
  *     Example input tree:
  *
  *            1             5
  *            |             |
  *      ----- 2 -----       6
  *      |           |
  *      3           4
  *
  */

val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}

It should be easy to map your input into these Person objects, and to map the output WithAncestor objects into whatever it is you need. Note that this code assumes that if any person has parentId X, another person with that id actually exists in the input.
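For example, a conversion from the question's model (String ids, with "0" or a self-referencing parentId marking a root) might look roughly like the sketch below; the field names come from the question, and the toInt calls assume the ids are numeric strings, so adjust as needed:

// sketch only: adapt the question's peopleRDD to the Person / WithAncestor model above
val persons = peopleRDD.map { p =>
  val parent =
    if (p.parentId == null || p.parentId.isEmpty || p.parentId == "0" || p.parentId == p.id)
      None                     // treat empty / "0" / self-reference as "no parent"
    else
      Some(p.parentId.toInt)   // assumes ids are numeric strings
  Person(p.id.toInt, parent)
}

// map the result back to (person id, ultimate parent id); roots map to themselves,
// matching the behaviour of the question's findUltimateParentId
val ultimateParentIds = RecursiveParentLookup.findUltimateParent(persons)
  .map(w => (w.person.id, w.ancestor.map(_.id).getOrElse(w.person.id)))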



Answer 2:

Fixed this by using SparkContext.broadcast:

val peopleById = peopleRDD.keyBy(f => f.id)
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personOption = broadcastedPeople.value.get(personId)
    if(personOption.isEmpty) {

        return "0"

    }
    val person = personOption.get
    if(person.id == "0" || person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => findUltimateParentId(f.parentId))

working great now!
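One small caveat worth adding: foreach returns Unit, so the last line above does not actually keep the ids it computes. If you want them back as an RDD of (person id, ultimate parent id) pairs, something along these lines should work with the same broadcast lookup (take(10) is only for inspection):

// keep the results instead of discarding them
val ultimateParentIds = peopleRDD.map(f => (f.id, findUltimateParentId(f.parentId)))
ultimateParentIds.take(10).foreach(println)   // peek at a few results on the driver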