I am looking for a way to log additional data while code executes on the Apache Spark nodes, so that I can later investigate issues that might appear during execution. Trying a traditional solution such as com.typesafe.scalalogging.LazyLogging fails because the logger instance cannot be serialized in a distributed environment like Apache Spark.
I've investigated this problem, and for now the only solution I have found is to use the org.apache.spark.Logging trait, like this:
class SparkExample extends Logging {
  val someRDD = ...
  someRDD.map { rddElement =>
    logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}
However, the Logging trait does not look like a permanent solution for Apache Spark, because it is marked as @DeveloperApi and its class documentation warns:

This will likely be changed or removed in future releases.

I am wondering: is there any known logging solution that I can use and that will allow me to log data when the RDDs are executed on the Apache Spark nodes?
Later edit: Some of the comments below suggest using Log4J. I've tried it, but I'm still having issues when using the logger from a Scala class (and not from a Scala object). Here is my full code:
import org.apache.log4j.Logger
import org.apache.spark._

object Main {
  def main(args: Array[String]) {
    new LoggingTestWithRDD().doTest()
  }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
    val spark = new SparkContext(conf)
    val someRdd = spark.parallelize(List(1, 2, 3))
    someRdd.map { element =>
      log.info(s"$element will be processed")
      element + 1
    }
    spark.stop()
  }
}
The exception that I'm seeing is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.log4j.Logger
If you need some code to be executed before and after a map, filter or another RDD function, try to use mapPartitions, where the underlying iterator is passed explicitly (a sketch is given below). Every basic RDD function is implemented with mapPartitions under the hood. Make sure to handle the partitioner explicitly and not to lose it: see the Scaladoc for the preservesPartitioning parameter; this is critical for performance.
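A minimal sketch of that rewrite, applied to the RDD from the question (creating the Log4j logger once per partition on the executor is my choice of "code executed before the elements", not a quote from the original answer):

import org.apache.log4j.Logger

val processed = someRdd.mapPartitions({ iter =>
  // Runs once per partition, on the executor: nothing non-serializable
  // has to be captured by the closure.
  val log = Logger.getLogger("rdd.partition.logger") // illustrative logger name
  iter.map { element =>
    log.info(s"$element will be processed")
    element + 1
  }
}, preservesPartitioning = true)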
This is an old post, but I want to provide my working solution, which I got only after struggling a lot and which may still be useful for others:

I wanted to print the RDD contents inside an rdd.map function but kept getting a Task not serializable error. My solution for this problem is to use a Scala object (a static singleton) extending java.io.Serializable, sketched below:
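A minimal sketch of that idea, assuming a hypothetical holder object (the name LogHolder and the @transient lazy val are my additions, not quoted from the original answer). Because a Scala object is a singleton that is initialized independently in each executor JVM, referencing the logger through it keeps the logger out of the serialized closure:

import org.apache.log4j.Logger

// Hypothetical holder object; name and @transient lazy val are illustrative.
object LogHolder extends Serializable {
  // Re-created lazily in each executor JVM instead of being shipped over the network.
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
}

val result = someRdd.map { element =>
  LogHolder.log.info(s"$element will be processed")
  element + 1
}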
Use Log4j 2.x. The core logger has been made serializable. Problem solved.
Jira discussion: https://issues.apache.org/jira/browse/LOG4J2-801
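A minimal sketch of what that can look like, reusing the class from the question (the process method name is illustrative, and this assumes a Log4j 2.x version where the Logger is indeed serializable, as the answer states):

import org.apache.logging.log4j.{LogManager, Logger}
import org.apache.spark.rdd.RDD

class LoggingTestWithRDD extends Serializable {
  // With Log4j 2.x the Logger can be serialized along with the closure (see LOG4J2-801).
  val log: Logger = LogManager.getLogger(getClass.getName)

  def process(someRdd: RDD[Int]): RDD[Int] =
    someRdd.map { element =>
      log.info(s"$element will be processed")
      element + 1
    }
}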
You can use "log" to write logs . Also if you need change logger properties you need to have log4j.properties in /conf folder. By default we will have a template in that location.
Here is my solution:

I am using SLF4J (with the Log4j binding). In the base class of every Spark job I declare the logger, and just before the place where I use LOG in distributed functional code, I copy the logger reference to a local constant (see the sketch after this paragraph). It worked for me!
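A minimal sketch of that pattern, with hypothetical class names (SparkJobBase and WordCountJob are not from the original answer). SLF4J logger adapters are generally serializable and re-resolve themselves by name on deserialization, and copying the field into a local val keeps the enclosing job instance out of the closure:

import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.rdd.RDD

// Hypothetical base class for Spark jobs.
abstract class SparkJobBase {
  protected val LOG: Logger = LoggerFactory.getLogger(getClass)
}

class WordCountJob extends SparkJobBase {
  def run(lines: RDD[String]): Long = {
    // Copy the logger reference to a local constant so the closure
    // captures only the SLF4J logger, not `this`.
    val log = LOG
    lines.map { line =>
      log.info(s"processing line: $line")
      line.length
    }.count()
  }
}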
You can use Akhil's solution proposed in https://www.mail-archive.com/user@spark.apache.org/msg29010.html. I have used it myself and it works.