Apache Spark logging within Scala

2019-01-21 16:19发布

I am looking for a solution to be able to log additional data when executing code on Apache Spark Nodes that could help investigate later some issues that might appear during execution. Trying to use a traditional solution like for example com.typesafe.scalalogging.LazyLogging fails because the log instance cannot be serialized on a distributed environment like Apache Spark.

I've investigated this problem and for now the solution that I found was to use the org.apache.spark.Logging trait like this :

class SparkExample with Logging {
  val someRDD = ...
  someRDD.map {
    rddElement => logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}

However it looks like the Logging trait is not a permanent solution for Apache Spark because it's marked as @DeveloperApi and the class documentation mentions:

This will likely be changed or removed in future releases.

I am wondering - are they any known logging solution that I can use and will allow me to log data when the RDDs are executed on Apache Spark nodes ?

@Later Edit : Some of the comments from below suggest to use Log4J. I've tried using Log4J but I'm still having issues when using logger from a Scala class (and not a Scala object). Here is my full code :

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
 def main(args: Array[String]) {
  new LoggingTestWithRDD().doTest()
 }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map {
     element =>
       log.info(s"$element will be processed")
       element + 1
    }
   spark.stop()
 }

}

The exception that I'm seeing is :

Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger

6条回答
我命由我不由天
2楼-- · 2019-01-21 16:38

If you need some code to be executed before and after a map, filter or other RDD function, try to use mapPartition, where the underlying iterator is passed explicitely.

Example:

val log = ??? // this gets captured and produced serialization error
rdd.map { x =>
  log.info(x)
  x+1
}

Becomes:

rdd.mapPartition { it =>
  val log = ??? // this is freshly initialized in worker nodes
  it.map { x =>
    log.info(x)
    x + 1
  }
}

Every basic RDD function is always implemented with a mapPartition.

Make sure to handle the partitioner explicitly and not to loose it: see Scaladoc, preservesPartitioning parameter, this is critical for performances.

查看更多
相关推荐>>
3楼-- · 2019-01-21 16:38

This is an old post but I want to provide my working solution which I just got after struggling a lot and still can be useful for others:

I want to print rdd contents inside rdd.map function but getting Task Not Serializalable Error. This is my solution for this problem using scala static object which is extending java.io.Serializable:

import org.apache.log4j.Level

object MyClass extends Serializable{

val log = org.apache.log4j.LogManager.getLogger("name of my spark log")

log.setLevel(Level.INFO)

def main(args:Array[String])
{

rdd.map(t=>

//Using object's logger here

val log =MyClass.log

log.INFO("count"+rdd.count)
)
}

}
查看更多
男人必须洒脱
4楼-- · 2019-01-21 16:43

Use Log4j 2.x. The core logger has been made serializable. Problem solved.

Jira discussion: https://issues.apache.org/jira/browse/LOG4J2-801

"org.apache.logging.log4j" % "log4j-api" % "2.x.x"

"org.apache.logging.log4j" % "log4j-core" % "2.x.x"

"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
查看更多
女痞
5楼-- · 2019-01-21 16:48
val log = Logger.getLogger(getClass.getName),

You can use "log" to write logs . Also if you need change logger properties you need to have log4j.properties in /conf folder. By default we will have a template in that location.

查看更多
何必那么认真
6楼-- · 2019-01-21 16:51

Here is my solution :

I am using SLF4j (with Log4j binding), in my base class of every spark job I have something like this:

import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass) 

Just before the place where I use LOG in distributed functional code, I copy logger reference to a local constant.

val LOG = this.LOG

It worked for me!

查看更多
Explosion°爆炸
7楼-- · 2019-01-21 16:58

You can use Akhil's solution proposed in
https://www.mail-archive.com/user@spark.apache.org/msg29010.html. I have used by myself and it works.

Akhil Das Mon, 25 May 2015 08:20:40 -0700
Try this way:

object Holder extends Serializable {      
   @transient lazy val log = Logger.getLogger(getClass.getName)    
}


val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element =>
   Holder.log.info(element)
}
查看更多
登录 后发表回答