We use Redis on Spark to cache our key-value pairs. This is the code:
import com.redis.RedisClient

val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
  val arr = x.split(" ")
  val readId = arr(0).toInt
  val refId = arr(1).toInt
  val start = arr(2).toInt
  val end = arr(3).toInt
  val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
  val readStr = r.hmget("readStr", readId).get(readId)
  val realend = if (end > refStr.length - 1) refStr.length - 1 else end
  val refOneStr = refStr.substring(start, realend)
  (readStr, refOneStr, refId, start, realend, readId)
})
But when I submit the job, it fails with this exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.ynu.App$.main(App.scala:511)
at com.ynu.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
Could somebody tell me how to serialize the data I get from Redis? Thanks a lot.
You're trying to serialize the client. You have one RedisClient, r, that you're trying to use inside the map that will be run across different cluster nodes. Either get the data you want out of Redis separately before doing a cluster task, or create the client individually for each cluster task inside your map block (perhaps by using mapPartitions rather than map, as creating a new Redis client for each individual row is probably a bad idea).

In Spark, the functions on RDDs (like map here) are serialized and sent to the executors for processing. This implies that all elements contained within those operations should be serializable.

The Redis connection here is not serializable, as it opens TCP connections to the target DB that are bound to the machine where it's created.
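To see the failure without a cluster, here is a minimal sketch in plain Scala that Java-serializes two function values, which is roughly what the closure check in the stack trace does. FakeClient, ClosureDemo and trySerialize are hypothetical names made up for this illustration; FakeClient only stands in for a connection-holding client and is not the real RedisClient.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a client that owns a socket; deliberately NOT Serializable.
class FakeClient(val host: String) {
  def lookup(key: Int): String = s"value-$key"
}

object ClosureDemo {
  // Java-serializes a function object and reports either the offending class or the byte count.
  def trySerialize(f: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(f)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val client = new FakeClient("192.168.1.101")

    // Captures `client`, so serializing the function drags the client along and fails.
    val capturing: Int => String = id => client.lookup(id)

    // Creates its client inside the function body, so nothing non-serializable is captured.
    val selfContained: Int => String = id => new FakeClient("192.168.1.101").lookup(id)

    println(trySerialize(capturing))     // expected to be a Left naming the client class
    println(trySerialize(selfContained)) // expected to be a Right with the serialized size
  }
}
```

The second function is only there to show the contrast; creating a client per record would be wasteful in a real job.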
The solution is to create those connections on the executors, in the local execution context. There are a few ways to do that. Two that come to mind are:
- rdd.mapPartitions: lets you process a whole partition at once, and therefore amortize the cost of creating connections
- a singleton connection manager: modeled with an object that holds a lazy reference to a connection (note: a mutable ref will also work)

mapPartitions is easier, as all it requires is a small change to the program structure.
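A minimal sketch of the mapPartitions variant, reusing the parsing code and the Redis calls from the question unchanged. The wrapper names PerHit and perHit are made up for the example, and the use of disconnect assumes that method exists in your version of the scala-redis client; check the library you are on.

```scala
import com.redis.RedisClient
import org.apache.spark.rdd.RDD

object PerHit {
  // Assumed input: lines of the form "readId refId start end", as in the question.
  def perHit(perhitFile: RDD[String]) =
    perhitFile.mapPartitions { partition =>
      // Created on the executor, once per partition, so no client is ever serialized.
      val r = new RedisClient("192.168.1.101", 6379)
      try {
        partition.map { x =>
          val arr = x.split(" ")
          val readId = arr(0).toInt
          val refId = arr(1).toInt
          val start = arr(2).toInt
          val end = arr(3).toInt
          val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
          val readStr = r.hmget("readStr", readId).get(readId)
          val realend = if (end > refStr.length - 1) refStr.length - 1 else end
          val refOneStr = refStr.substring(start, realend)
          (readStr, refOneStr, refId, start, realend, readId)
        }.toList.iterator // force evaluation while the connection is still open
      } finally {
        r.disconnect // assumption: available in your scala-redis version
      }
    }
}
```

Note that partition.map is lazy: without the toList the connection would be closed before any row is processed. Materializing the partition costs memory, so for very large partitions you may prefer to leave the iterator lazy and close the client some other way.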
This singleton object can then be used to instantiate one connection per worker JVM, and is used as a Serializable object in an operation closure.

The advantage of using the singleton object is less overhead, as the connection is created only once per JVM (as opposed to once per RDD partition).
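A sketch of what such a singleton could look like, under the same host and port as the question. RedisConnection, PerHitSingleton and readStrings are illustrative names, not part of Spark or of the Redis client, and only one of the question's lookups is shown to keep it short.

```scala
import com.redis.RedisClient
import org.apache.spark.rdd.RDD

// One instance per JVM: the lazy val is initialised on first use on each executor.
object RedisConnection extends Serializable {
  lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

object PerHitSingleton {
  // Assumed input: lines of the form "readId refId start end", as in the question.
  def readStrings(perhitFile: RDD[String]) =
    perhitFile.map { x =>
      val readId = x.split(" ")(0).toInt
      // The closure refers to the object by name, so no client instance is captured or shipped.
      val readStr = RedisConnection.conn.hmget("readStr", readId).get(readId)
      (readId, readStr)
    }
}
```

An executor usually runs several tasks at the same time inside one JVM, so the shared client will be called from multiple threads. Whether a single client tolerates that depends on the library; verify it, or guard the connection, before relying on this pattern.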
There're also some disadvantages:
(*) code provided for illustration purposes. Not compiled or tested.