Hadoop - How to extract a taskId from mapred.JobCo

2019-09-10 23:07发布

问题:

Is it possible to create a valid *mapreduce*.TaskAttemptID from *mapred*.JobConf?

The background

I need to write a FileInputFormatAdapter for an ExistingFileInputFormat. The problem is that the Adapter needs to extend mapred.InputFormat and the Existing format extends mapreduce.InputFormat.

I need to build a mapreduce.TaskAttemptContextImpl, so that I can instantiate the ExistingRecordReader. However, I can't create a valid TaskId...the taskId comes out as null.

So How can I get the taskId, jobId, etc from mapred.JobConf.

In particular in the Adapter's getRecordReader I need to do something like:

public org.apache.hadoop.mapred.RecordReader<NullWritable, MyWritable> getRecordReader(
        org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException {

    SplitAdapter splitAdapter = (SplitAdapter) split;

    final Configuration conf = job;

    /*************************************************/
    //The problem is here, "mapred.task.id" is not in the conf
    /*************************************************/
    final TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));

    final TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
    try {
        return new RecordReaderAdapter(new ExistingRecordReader(
                splitAdapter.getMapRedeuceSplit(),
                context));
    } catch (InterruptedException e) {
        throw new RuntimeException("Failed to create record-reader.", e);
    }
}

This code throws an exception:

Caused by: java.lang.NullPointerException
    at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:44)
    at org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.<init>(TaskAttemptContextImpl.java:39)

'super(conf, taskId.getJobID());' is throwing the exception, most likely because taskId is null.

回答1:

I found the answer by looking through HiveHbaseTableInputFormat. Since my solution is targeted for hive, this works perfectly.

 TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
        job.getConfiguration(), reporter);