I am trying to understand data locality as it relates to Hadoop's Map/Reduce framework. In particular, I am trying to understand which component handles data locality (is it the input format?).
Yahoo's Developer Network page states: "The Hadoop framework then schedules these processes in proximity to the location of data/records using knowledge from the distributed file system." This seems to imply that the HDFS input format might query the NameNode to determine which nodes contain the desired data, and then start the map tasks on those nodes if possible. One could imagine a similar approach being taken with HBase, by querying which regions are serving certain records.
If a developer writes their own input format, would they be responsible for implementing data locality?
You're right. If you look at the FileInputFormat class and its getSplits() method, you'll see that it looks up the block locations:
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
This is a query against the FileSystem, and it happens inside the JobClient. The results are written into a SequenceFile (actually it's just raw bytes), which the JobTracker reads later on while initializing the job; at that point it is pretty much just assigning a task to each input split. But the distribution of the data itself is the NameNode's job.
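A toy model of locality-aware assignment may help here: when a TaskTracker on some host asks for work, prefer a pending split whose locations include that host, and fall back to any split otherwise. LocalityScheduler, Split, and assign are hypothetical names; this is a simplification under those assumptions, not the actual JobTracker code, which also distinguishes rack-local from off-rack tasks before falling back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of locality-aware task assignment. Not the real JobTracker code:
// it only distinguishes "data-local" from "anything else".
public class LocalityScheduler {

    // A pending input split and the hosts that store its data.
    static final class Split {
        final String name;
        final List<String> hosts;
        Split(String name, String... hosts) {
            this.name = name; this.hosts = Arrays.asList(hosts);
        }
    }

    private final List<Split> pending;

    LocalityScheduler(List<Split> splits) {
        this.pending = new ArrayList<>(splits);
    }

    // Returns the split to run on `host`, or null if nothing is pending.
    Split assign(String host) {
        for (int i = 0; i < pending.size(); i++) {
            if (pending.get(i).hosts.contains(host)) {
                return pending.remove(i); // data-local: no network read needed
            }
        }
        // No local split left: fall back to any pending split (remote read).
        return pending.isEmpty() ? null : pending.remove(0);
    }

    public static void main(String[] args) {
        LocalityScheduler s = new LocalityScheduler(Arrays.asList(
            new Split("split-0", "dn1", "dn2"),
            new Split("split-1", "dn2", "dn3"),
            new Split("split-2", "dn3", "dn1")));
        System.out.println(s.assign("dn3").name);
        System.out.println(s.assign("dn4").name);
    }
}
```

With the three splits in main, a TaskTracker on dn3 gets split-1 (the first pending split stored on dn3), while one on dn4 holds none of the data and gets split-0 as a remote read.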
To your question now: normally you extend FileInputFormat, so you are forced to return a list of InputSplits, and each split has to have its location set when it is initialized. For example, a FileSplit is constructed with the file path, the start offset, the length, and an array of the hosts that store that range. So you don't actually implement data locality yourself; you just tell the framework on which hosts the split can be found, and that information is easy to query through the FileSystem interface.
My understanding is that data locality is jointly determined by HDFS and the InputFormat. The former decides (using rack awareness) where HDFS blocks are placed across datanodes and records those locations, while the latter determines which blocks are associated with which split. The JobTracker then tries to optimize which splits are handed to which map tasks (one split per map task) by making sure that the blocks associated with each split are local to the TaskTracker that runs it.
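For a custom input format, such as the HBase case raised in the question, the locality-related work comes down to what the first answer describes: attach host names to each split and let the scheduler use them. Below is a hypothetical sketch in which each region's start key maps to the server hosting it; KeyRangeSplit, RegionSplitter, and the host names are illustrative assumptions, not HBase's actual TableInputFormat code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Hadoop-free sketch: a custom input format only *reports* where each split
// lives; it never decides where map tasks run. All names are hypothetical.
public class RegionSplitter {

    // Hypothetical split covering the row keys [startKey, endKey).
    static final class KeyRangeSplit {
        final String startKey, endKey;
        final String[] locations; // host hint handed to the scheduler
        KeyRangeSplit(String startKey, String endKey, String host) {
            this.startKey = startKey; this.endKey = endKey;
            this.locations = new String[] { host };
        }
    }

    // regionStarts maps each region's start key to the server hosting it, in
    // key order; a region ends where the next one starts ("" = unbounded).
    static List<KeyRangeSplit> getSplits(LinkedHashMap<String, String> regionStarts) {
        List<String> keys = new ArrayList<>(regionStarts.keySet());
        List<KeyRangeSplit> splits = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String end = (i + 1 < keys.size()) ? keys.get(i + 1) : "";
            splits.add(new KeyRangeSplit(keys.get(i), end, regionStarts.get(keys.get(i))));
        }
        return splits;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> regions = new LinkedHashMap<>();
        regions.put("", "rs1.example.com");
        regions.put("m", "rs2.example.com");
        for (KeyRangeSplit s : getSplits(regions)) {
            System.out.println("[" + s.startKey + ", " + s.endKey + ") -> " + s.locations[0]);
        }
    }
}
```

Given regions starting at "" and "m", getSplits returns the ranges [, m) on rs1.example.com and [m, ) on rs2.example.com; choosing where the map tasks actually run is left entirely to the framework.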
Unfortunately, this approach to preserving locality works in homogeneous clusters but can break down in heterogeneous ones, i.e. clusters whose datanodes have hard disks of different sizes. If you want to dig deeper, read the paper "Improving MapReduce performance through data placement in heterogeneous Hadoop clusters", which also touches on several topics related to your question.