I have a Weka model stored in S3 that is around 400 MB. I have a set of records on which I want to run the model to make predictions.
To perform the predictions, I have tried the following:
1. Download and load the model on the driver as a static object, broadcast it to all executors, and perform a map operation on the prediction RDD. Result: not working, because Weka needs to modify the model object to perform a prediction, while a broadcast variable is meant to be a read-only copy.
2. Download and load the model on the driver as a static object and send it to the executors in each map operation. Result: working, but not efficient, because the 400 MB object is passed along with every map operation.
3. Download the model on the driver, then load it on each executor and cache it there. (I don't know how to do that.)
Does anyone know how I can load the model once on each executor and cache it, so that I don't load it again for other records?
Here's what worked for me, even better than the lazy initializer. I created an object-level pointer initialized to null and let each executor initialize it. In the initialization block you can put run-once code. Note that each processing batch resets local variables but not object-level ones.
This approach creates exactly one big object per executor, rather than one big object per partition as with other approaches.
If you put `var bigObject: BigObject = null` inside the main function's namespace, it behaves differently: the `bigObject` constructor then runs at the beginning of each partition (i.e., each batch). If you have a memory leak, this will eventually kill the executor, and the garbage collector also has more work to do.
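A minimal sketch of the object-level pointer approach, written in plain Scala so it runs without a Spark cluster. `BigObject`, `BigObjectHolder`, `Predictor` and the placeholder `predict` logic are hypothetical stand-ins for the Weka model and its S3 loading code. The double-checked locking is an assumption added here, not part of the answer above: an executor runs several tasks concurrently in separate threads, and an unsynchronized null check could build the 400 MB object twice. `processPartitionLocal` mimics the variant where the variable lives inside the function and is therefore rebuilt for every partition.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the ~400 MB Weka model; this is NOT Weka's API.
final class BigObject {
  def predict(record: Double): Double = record * 2 // placeholder prediction logic
}

object BigObjectHolder {
  // Object-level pointer: one per JVM (i.e. per executor), shared by every task on it.
  @volatile private var bigObject: BigObject = null
  // Counts constructions so the run-once behaviour can be checked.
  val constructions = new AtomicInteger(0)

  def get(): BigObject = {
    if (bigObject == null) {
      synchronized {
        // Re-check under the lock: another task thread may have initialized it meanwhile.
        if (bigObject == null) {
          constructions.incrementAndGet()
          bigObject = new BigObject // run-once code, e.g. download from S3 and deserialize
        }
      }
    }
    bigObject
  }
}

object Predictor {
  // Simulates the function passed to rdd.mapPartitions: called once per partition.
  def processPartition(records: Iterator[Double]): Iterator[Double] = {
    val model = BigObjectHolder.get()
    records.map(r => model.predict(r))
  }

  // Counts constructions in the "local variable" variant.
  val localConstructions = new AtomicInteger(0)

  // Variant with the variable inside the function: it starts out null on every call,
  // so the constructor runs once per partition.
  def processPartitionLocal(records: Iterator[Double]): Iterator[Double] = {
    var bigObject: BigObject = null
    if (bigObject == null) {
      localConstructions.incrementAndGet()
      bigObject = new BigObject
    }
    val model = bigObject
    records.map(r => model.predict(r))
  }
}
```

In Spark the holder would be used as `rdd.mapPartitions(iter => Predictor.processPartition(iter))`. Because `BigObjectHolder` is a Scala `object`, the closure refers to it statically instead of serializing it, so each executor JVM builds its own copy on first access.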
Here is what we usually do:
1. Define a singleton client that does this kind of work, so that only one client exists per executor.
2. Add a `getOrCreate` method that creates or fetches the client. For example, if you have a common serving platform that serves several different models, you can use a `ConcurrentMap` with `computeIfAbsent` to keep it thread-safe.
3. Call `getOrCreate` inside RDD-level operations such as `transform` or `foreachPartition`, so that initialization happens on the executor.
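A sketch of the singleton client with `getOrCreate`, again in plain Scala. `ServedModel`, `ModelClient` and `loadModel` are hypothetical names; a real `loadModel` would fetch and deserialize the model from S3. `ConcurrentHashMap.computeIfAbsent` applies the loading function at most once per key, even when several task threads request the same model at the same time.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model type standing in for a loaded Weka classifier.
final class ServedModel(val name: String) {
  def predict(x: Double): String = s"$name:$x" // placeholder prediction logic
}

// Singleton client: a Scala object exists once per executor JVM.
object ModelClient {
  private val models = new ConcurrentHashMap[String, ServedModel]()
  // Counts how many times a model was actually loaded.
  val loads = new AtomicInteger(0)

  // Hypothetical loader; in practice, download from S3 and deserialize.
  private def loadModel(name: String): ServedModel = {
    loads.incrementAndGet()
    new ServedModel(name)
  }

  // Loads the model on the first request for a name, returns the cached instance afterwards.
  def getOrCreate(name: String): ServedModel =
    models.computeIfAbsent(name, new java.util.function.Function[String, ServedModel] {
      override def apply(n: String): ServedModel = loadModel(n)
    })
}
```

Inside Spark, the call would go in an executor-side operation, for example `rdd.mapPartitions { iter => val m = ModelClient.getOrCreate("modelA"); iter.map(x => m.predict(x)) }`, where `"modelA"` is a hypothetical model name.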
You have two options:
1. Create a singleton object with a `lazy val` representing the data.
Then you can use the `lazy val` in your `map` function. The `lazy val` ensures that each worker JVM initializes its own instance of the data. No serialization or broadcast is performed for `data`.
Advantages:
Disadvantages:
2. Use the `mapPartitions` (or `foreachPartition`) method on the RDD instead of just `map`. This allows you to initialize whatever you need for the entire partition.
Advantages:
Disadvantages:
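The two options can be sketched side by side in plain Scala. `LoadedModel`, `ModelData`, `Options` and the placeholder prediction are hypothetical; in the question the model would be downloaded from S3 and deserialized. Scala's `lazy val` initialization is synchronized, so concurrent tasks in one executor JVM still build the model only once.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the deserialized Weka model.
final class LoadedModel {
  def predict(x: Double): Double = x + 1.0 // placeholder prediction logic
}

// Option 1: singleton object with a lazy val.
object ModelData {
  val loads = new AtomicInteger(0)
  // Initialized on first access in each JVM, then reused by every record and task.
  lazy val data: LoadedModel = {
    loads.incrementAndGet()
    new LoadedModel // in practice: download from S3 and deserialize
  }
}

object Options {
  // Option 1: the function you would pass to rdd.map; it reads ModelData.data per record,
  // but the model is built only once per JVM.
  def predictOne(x: Double): Double = ModelData.data.predict(x)

  // Option 2: the function you would pass to rdd.mapPartitions; setup runs once per partition.
  val partitionSetups = new AtomicInteger(0)
  def predictPartition(records: Iterator[Double]): Iterator[Double] = {
    partitionSetups.incrementAndGet()
    val model = new LoadedModel // per-partition initialization (hypothetical)
    records.map(r => model.predict(r))
  }
}
```

With Spark, option 1 would be `rdd.map(x => Options.predictOne(x))` and option 2 `rdd.mapPartitions(iter => Options.predictPartition(iter))`. Option 2 repeats the model load for every partition; the two can also be combined by reading `ModelData.data` inside `mapPartitions`.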