I run my Spark application on a YARN cluster. In my code I use the number of cores available to the queue to set the number of partitions of my dataset:
Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
My question: how can I get the number of cores available to the queue programmatically rather than from configuration?
There are ways to get both the number of executors and the number of cores in a cluster from Spark. Here is a bit of Scala utility code that I've used in the past. You should easily be able to adapt it to Java. There are two key ideas:
The number of workers is the number of executors minus one, or sc.getExecutorStorageStatus.length - 1.
The number of cores per worker can be obtained by executing java.lang.Runtime.getRuntime.availableProcessors on a worker.
The rest of the code is boilerplate for adding convenience methods to SparkContext using Scala implicits. I wrote the code for Spark 1.x years ago, which is why it does not use SparkSession.
One final point: it is often a good idea to coalesce to a multiple of your cores as this can improve performance in the case of skewed data. In practice, I use anywhere between 1.5x and 4x, depending on the size of data and whether the job is running on a shared cluster or not.
import org.apache.spark.SparkContext
import scala.language.implicitConversions

class RichSparkContext(val sc: SparkContext) {
  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  // Overload for callers that already know the cores per executor.
  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor
}

object RichSparkContext {
  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  // Cached after the first lookup; 0 means "not measured yet".
  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        // Run a one-element job so availableProcessors is evaluated inside a task.
        _coresPerExecutor = sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      _coresPerExecutor
    }
}
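
Since the question is in Java, here is a minimal sketch of the arithmetic behind the two key ideas and the "multiple of your cores" advice. It is plain Java with no Spark dependency. The class PartitionSizing and its method names are hypothetical and not part of Spark; the number of registered entries and the cores per executor are assumed to be obtained from Spark as in the Scala code above. Note that availableProcessors reports what the JVM sees on its host, which may differ from the cores YARN actually allocated to an executor.

```java
// Hypothetical helper, not a Spark API: turns an executor count and a
// cores-per-executor figure into a target partition count.
final class PartitionSizing {
    private PartitionSizing() {}

    // Spark's list of registered entries includes the driver, so subtract one.
    // Clamped at zero in case nothing has registered yet.
    public static int workerCount(int registeredEntries) {
        return Math.max(registeredEntries - 1, 0);
    }

    // Total cores across all worker executors.
    public static int coreCount(int registeredEntries, int coresPerExecutor) {
        return workerCount(registeredEntries) * coresPerExecutor;
    }

    // Partition count as a multiple of the core count, rounded up,
    // and never less than one partition.
    public static int targetPartitions(int cores, double multiplier) {
        if (multiplier <= 0) {
            throw new IllegalArgumentException("multiplier must be positive");
        }
        return Math.max(1, (int) Math.ceil(cores * multiplier));
    }

    public static void main(String[] args) {
        // Locally this is the driver machine's core count; on a cluster the
        // value would have to be measured inside a task instead.
        int coresPerExecutor = Runtime.getRuntime().availableProcessors();
        int cores = coreCount(5, coresPerExecutor); // assumes 4 executors + 1 driver
        System.out.println("target partitions: " + targetPartitions(cores, 2.0));
    }
}
```

The result of targetPartitions would then be passed to coalesce (or repartition) in place of config.getNumberOfCores(). Keep in mind that coalesce returns a new Dataset, so the return value has to be assigned.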