How to directly send the output of a mapper-reducer to another mapper-reducer

Posted 2020-05-28 04:29

Question:

Problem solved eventually; see my solution at the bottom.


Recently I have been trying to run the recommender example in chapter 6 (listings 6.1 ~ 6.4) of Mahout in Action. I ran into a problem, and although I have googled around I can't find a solution.

Here is the problem: I have a mapper-reducer pair

public final class WikipediaToItemPrefsMapper extends
        Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);

        // The first number on the line is the user ID
        m.find();
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));

        // Every following number is an item ID the user refers to
        VarLongWritable itemID = new VarLongWritable();
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}

public class WikipediaToUserVectorReducer extends
        Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

    @Override
    public void reduce(VarLongWritable userID,
            Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // Collect all of the user's item IDs into one sparse preference vector
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f);
        }
        context.write(userID, new VectorWritable(userVector));
    }
}

The reducer outputs a userID and a userVector, and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

Then I want to use another mapper-reducer pair to process this data:

public class UserVectorSplitterMapper extends
        Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    @Override
    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();

        // Emit one <itemIndex, (userID, preference)> pair
        // per non-zero entry of the user vector
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemIndexWritable = new IntWritable();
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemIndexWritable.set(itemIndex);
            context.write(itemIndexWritable,
                    new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}

When I try to run the job, it throws a cast error:

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

The first mapper-reducer writes its output to HDFS, and the second mapper-reducer then tries to read that output. The second mapper can cast 98955 to VarLongWritable, but it cannot convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable. So I am wondering: is there a way to make the first mapper-reducer send its output directly to the second pair, so that no data conversion is needed? I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there doesn't seem to be a way to do that. Any suggestions?


Problem solved

Solution: by using SequenceFileOutputFormat, we can write the reduce output of the first MapReduce workflow to HDFS, and the second MapReduce workflow can then read that temporary file as its input by passing SequenceFileInputFormat as the input format class when creating the mapper. Since the vector is saved in a binary sequence file with a specific format, SequenceFileInputFormat can read it and turn it back into a vector.

Here is some example code:

confFactory ToItemPrefsWorkFlow = new confFactory(
        new Path("/dbout"),                 // input file path
        new Path("/mahout/output.txt"),     // output file path
        TextInputFormat.class,              // input format
        VarLongWritable.class,              // mapper key format
        Item_Score_Writable.class,          // mapper value format
        VarLongWritable.class,              // reducer key format
        VectorWritable.class,               // reducer value format
        SequenceFileOutputFormat.class      // the reducer output format
);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();

confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
        new Path("/mahout/output.txt"),
        new Path("/mahout/UserVectorToCooccurrence"),
        SequenceFileInputFormat.class,      // note: the mapper input format of the second workflow is now SequenceFileInputFormat.class
        IntWritable.class,
        IntWritable.class,
        IntWritable.class,
        VectorWritable.class,
        SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

JobClient.runJob(conf1);
JobClient.runJob(conf2);
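
For reference, confFactory here is just a small wrapper class of my own, so below is a rough sketch of what the same two-job chaining looks like with the plain Hadoop Job API (the new org.apache.hadoop.mapreduce API; the temporary path, job names, and the map-only second job are illustrative assumptions, and on Hadoop 0.20/1.x you would use new Job(conf, name) instead of Job.getInstance):

// Job 1: parse the text input and write <VarLongWritable, VectorWritable>
// as a binary SequenceFile that job 2 can read back without any parsing
Configuration conf = new Configuration();
Path tempDir = new Path("/tmp/user-vectors");   // intermediate path, name is arbitrary

Job job1 = Job.getInstance(conf, "wikipedia to user vectors");
job1.setJarByClass(WikipediaToItemPrefsMapper.class);
job1.setMapperClass(WikipediaToItemPrefsMapper.class);
job1.setReducerClass(WikipediaToUserVectorReducer.class);
job1.setMapOutputKeyClass(VarLongWritable.class);
job1.setMapOutputValueClass(VarLongWritable.class);
job1.setOutputKeyClass(VarLongWritable.class);
job1.setOutputValueClass(VectorWritable.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job1, new Path("/dbout"));
FileOutputFormat.setOutputPath(job1, tempDir);
job1.waitForCompletion(true);

// Job 2: read the SequenceFile back as <VarLongWritable, VectorWritable>,
// so no Text-to-VectorWritable conversion is needed
Job job2 = Job.getInstance(conf, "split user vectors");
job2.setJarByClass(UserVectorSplitterMapper.class);
job2.setMapperClass(UserVectorSplitterMapper.class);
job2.setNumReduceTasks(0);                      // map-only, just for illustration
job2.setInputFormatClass(SequenceFileInputFormat.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(VectorOrPrefWritable.class);
FileInputFormat.addInputPath(job2, tempDir);
FileOutputFormat.setOutputPath(job2, new Path("/tmp/item-vectors"));
job2.waitForCompletion(true);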

If you have any problem with this, please feel free to contact me.

Answer 1:

You need to explicitly configure the output of the first job to use the SequenceFileOutputFormat and define the output key and value classes:

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputValueClass(VectorWritable.class);

Without seeing your driver code, I'm guessing you're using TextOutputFormat as the output format of the first job and TextInputFormat as the input format of the second. That input format sends pairs of <Text, Text> to the second mapper, which is why the cast fails.
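
If that is the case, the corresponding fix on the input side of the second job would look something like this (a minimal sketch using the old "mapred" API to match the snippet above; job2 is assumed to be the second job's JobConf):

// Read the first job's SequenceFile back so the second mapper receives
// <VarLongWritable, VectorWritable> instead of text lines
job2.setInputFormat(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job2, new Path("/mahout/output.txt")); // path the first job wrote to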



Answer 2:

I am a beginner in Hadoop, so this is just my guess at an answer; please bear with me and point it out if it seems naive.

I think it's not reasonable to send data from a reducer to the next mapper without saving it on HDFS, because the decision of which split of data goes to which mapper is deliberately designed to satisfy the locality criterion (each split goes to a mapper on a node that stores that data locally).

If you don't store the intermediate output on HDFS, most likely all of the data would have to be transmitted over the network, which is slow and may cause bandwidth problems.



Answer 3:

You have to temporarily save the output of the first map-reduce so that the second one can use it.

This might help you understand how the output of the first map-reduce is passed to the second one (it is based on Generator.java from Apache Nutch).

This is the temporary dir for the output of the first map-reduce:

Path tempDir =
  new Path(getConf().get("mapred.temp.dir", ".")
           + "/job1-temp-"
           + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

Setting up the first map-reduce job:

JobConf job1 = new JobConf(getConf());
job1.setJobName("job 1");
FileInputFormat.addInputPath(...);
job1.setMapperClass(...);

FileOutputFormat.setOutputPath(job1, tempDir);
job1.setOutputFormat(SequenceFileOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(...);
JobClient.runJob(job1);

Observe that the output dir of the first job is set in its configuration. Use the same path as the input of the second job:

JobConf job2 = new JobConf(getConf());
job2.setInputFormat(SequenceFileInputFormat.class); // read the SequenceFile written by job 1
FileInputFormat.addInputPath(job2, tempDir);
job2.setReducerClass(...);
JobClient.runJob(job2);

Remember to clean up the temp dirs after you are done:

// clean up
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);

Hope this helps.



Tags: hadoop mahout