I am wondering if someone can explain how the distributed cache works in Hadoop. I am running a job many times, and after each run I notice that the local distributed cache folder on each node is growing in size.
Is there a way for multiple jobs to re-use the same file in the distributed cache? Or is the distributed cache only valid for the lifetime of any individual job?
The reason I am confused is that the Hadoop documentation mentions that "DistributedCache tracks modification timestamps of the cache files", so this leads me to believe that if the time stamp hasn't changed, then it should not need to re-cache or re-copy the files to the nodes.
I am adding files successfully to the distributed cache using:
DistributedCache.addFileToClassPath(hdfsPath, conf);
If you look closely at what this book says, is that there is a limit of what can be stored in Distributed Cache. By default it's 10GB (configurable). There can be multiple different jobs running in the cluster concurrently. Furthermore, Hadoop kind of guarantees the files stay available in the cache for a single job as it is maintained by reference count done by the tasktracker for different tasks accessing the files in cache. In your case, for subsequent Jobs, the files may not be there as they are already marked for deletion.
Please correct me if you disagree anywhere. I'll be glad to discuss this further.
According to this: http://www.datasalt.com/2011/05/handling-dependencies-and-configuration-in-java-hadoop-projects-efficiently/
You should be able to do this via DistributedCache API instead of "-libjars"
DistributedCache uses reference counting to manage the caches.
org.apache.hadoop.filecache.TrackerDistributedCacheManager.CleanupThread
is in charge of cleaning up the CacheDirs whose reference count is 0. It will check every minute (default period is 1 minute, you can set it by "mapreduce.tasktracker.distributedcache.checkperiod").When a Job finishes or fails, JobTracker will send a
org.apache.hadoop.mapred.KillJobAction
to the TaskTrackers. Then if a TaskTracker receives a KillJobAction, it puts the action to tasksToCleanup. In the TaskTracker, there is a background Thread called taskCleanupThread which takes the action from tasksToCleanup and do the cleanup work. For a KillJobAction, it will invoke purgeJob to clean up the Job. In this method, it will decrease the reference count used by this Job (rjob.distCacheMgr.release();
).The above analysis bases on
hadoop-core-2.0.0-mr1-cdh4.2.1-sources.jar
. I also checked thehadoop-core-0.20.2-cdh3u1-sources.jar
and found there was a litte difference between this two versions. For example, there was not aorg.apache.hadoop.filecache.TrackerDistributedCacheManager.CleanupThread
in0.20.2-cdh3u1
. When initializing a Job, TrackerDistributedCacheManager will check if there is enough space to put the new caches files for this Job. If not, it will delete the caches which have 0 reference count.If you are using cdh4.2.1, you can increase "mapreduce.tasktracker.distributedcache.checkperiod" to let the clean up work delay. Then the probability that multiple Jobs use the same distributed cache is increased.
If you are using cdh3u1, you can increase the limitation of the cache size("local.cache.size", default is 10G) and the max directories for caches("mapreduce.tasktracker.cache.local.numberdirectories", default is 10000). This can be also applied to cdh4.2.1.