What's the meaning of “Locality Level”on Spark

2020-02-02 05:45发布

What's the meaning of the title "Locality Level" and the 5 status Data local --> process local --> node local --> rack local --> Any?

enter image description here

2条回答
家丑人穷心不美
2楼-- · 2020-02-02 06:29

The locality level as far as I know indicates which type of access to data has been performed. When a node finishes all its work and its CPU become idle, Spark may decide to start other pending task that require obtaining data from other places. So ideally, all your tasks should be process local as it is associated with lower data access latency.

You can configure the wait time before moving to other locality levels using:

spark.locality.wait

More information about the parameters can be found in the Spark Configuration docs

With respect to the different levels PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, or ANY I think the methods findTask and findSpeculativeTask in org.apache.spark.scheduler.TaskSetManager illustrate how Spark chooses tasks based on their locality level. It first will check for PROCESS_LOCAL tasks which are going to be launched in the same executor process. If not, it will check for NODE_LOCAL tasks that may be in other executors in the same node or it need to be retrieved from systems like HDFS, cached, etc. RACK_LOCAL means that data is in another node and therefore it need to be transferred prior execution. And finally, ANY is just to take any pending task that may run in the current node.

  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   */
  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }

    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }

    // Finally, if all else has failed, find a speculative task
    findSpeculativeTask(execId, host, locality)
  }
查看更多
兄弟一词,经得起流年.
3楼-- · 2020-02-02 06:40

Here are my two cents and I summarized mostly from spark official guide.

Firstly, I want to add one more locality level which is NO_PREF which has been discussed at this thread.
Then, let's put those levels together into a single table,

enter image description here

It's noted that specific level can be skipped as per guide from spark configuration.

For instance, if you want to skip NODE_LOCAL, just set spark.locality.wait.node to 0.

查看更多
登录 后发表回答