Problem solved eventually; see my solution at the bottom.
Recently I have been trying to run the recommender example from chapter 6 (listings 6.1 to 6.4) of Mahout in Action, but I ran into a problem. I have googled around and can't find a solution.
Here is the problem. I have a mapper-reducer pair:
public final class WikipediaToItemPrefsMapper extends
Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
Matcher m = NUMBERS.matcher(line);
if (!m.find()) {
return; // line has no numbers, so there is no user ID to emit
}
VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
VarLongWritable itemID = new VarLongWritable();
while (m.find()) {
itemID.set(Long.parseLong(m.group()));
context.write(userID, itemID);
}
}
}
public class WikipediaToUserVectorReducer
extends
Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
public void reduce(VarLongWritable userID,
Iterable<VarLongWritable> itemPrefs, Context context)
throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (VarLongWritable itemPref : itemPrefs) {
userVector.set((int) itemPref.get(), 1.0f);
}
context.write(userID, new VectorWritable(userVector));
}
}
The reducer outputs a userID and a userVector, which looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
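To make the data flow concrete, here is a minimal plain-Java sketch (no Hadoop or Mahout involved) of what this first mapper-reducer pair does to a single input line. The class name UserVectorSketch, the method names, and the sample line format "98955: 590 22 9059 3 2 1" are assumptions for illustration only; a TreeMap stands in for the sparse vector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone class; not part of Mahout or Hadoop.
class UserVectorSketch {
    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    // Mimics the mapper: the first number is the user ID, the rest are item IDs.
    // Each returned array is one emitted (userID, itemID) pair.
    static List<long[]> mapLine(String line) {
        List<long[]> pairs = new ArrayList<>();
        Matcher m = NUMBERS.matcher(line);
        if (!m.find()) {
            return pairs; // no user ID on this line, emit nothing
        }
        long userID = Long.parseLong(m.group());
        while (m.find()) {
            pairs.add(new long[] {userID, Long.parseLong(m.group())});
        }
        return pairs;
    }

    // Mimics the reducer: every item seen for a user gets preference 1.0.
    static Map<Integer, Double> reduce(List<long[]> pairs) {
        Map<Integer, Double> userVector = new TreeMap<>();
        for (long[] pair : pairs) {
            userVector.put((int) pair[1], 1.0);
        }
        return userVector;
    }

    public static void main(String[] args) {
        // Assumed input format: user ID, then the item IDs.
        List<long[]> pairs = mapLine("98955: 590 22 9059 3 2 1");
        System.out.println(pairs.get(0)[0] + " " + reduce(pairs));
    }
}
```

The point of the sketch is that the reducer's value is a structured object (index to preference), not a string. The text form shown above is only what you get when that object is printed.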
Then I want to use another mapper-reducer pair to process this data:
public class UserVectorSplitterMapper
extends
Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
public void map(VarLongWritable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
long userID = key.get();
Vector userVector = value.get();
Iterator<Vector.Element> it = userVector.iterateNonZero();
IntWritable itemIndexWritable = new IntWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
int itemIndex = e.index();
float preferenceValue = (float) e.get();
itemIndexWritable.set(itemIndex);
context.write(itemIndexWritable,
new VectorOrPrefWritable(userID, preferenceValue));
}
}
}
When I try to run the job, it fails with a cast error:
org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable
The first mapper-reducer pair writes its output to HDFS, and the second pair tries to read that output. The mapper can cast 98955 to VarLongWritable, but it can't convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable. So I am wondering whether there is a way to make the first pair send its output directly to the second pair, so that no data conversion is needed. I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there seems to be no way to do that. Any suggestions?
Problem solved
Solution: By using SequenceFileOutputFormat, the first MapReduce workflow can save its reduce output on the DFS as a binary sequence file. The second MapReduce workflow can then read that temporary file as input by specifying SequenceFileInputFormat as its input format. Because the vector is stored in the sequence file's binary format rather than as text, SequenceFileInputFormat can read it and turn it back into a VectorWritable.
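To show why the binary route works where text does not, here is a small plain-Java sketch of the round trip. It does not use Hadoop at all: SparseVectorRecord is a hypothetical stand-in for VectorWritable, and its write and readFields methods only imitate the shape of Hadoop's Writable interface (write(DataOutput) and readFields(DataInput)). The actual byte layout Mahout uses is not shown here and is certainly different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a Writable value; not the real VectorWritable.
class SparseVectorRecord {
    final Map<Integer, Double> entries = new TreeMap<>();

    // Same shape as Writable.write(DataOutput): emit typed binary fields.
    void write(DataOutputStream out) throws IOException {
        out.writeInt(entries.size());
        for (Map.Entry<Integer, Double> e : entries.entrySet()) {
            out.writeInt(e.getKey());
            out.writeDouble(e.getValue());
        }
    }

    // Same shape as Writable.readFields(DataInput): rebuild the object.
    void readFields(DataInputStream in) throws IOException {
        entries.clear();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            int index = in.readInt();
            double value = in.readDouble();
            entries.put(index, value);
        }
    }

    // Serializes to an in-memory buffer and reads it back, the way a
    // sequence file writer and reader would do via a file on the DFS.
    static SparseVectorRecord roundTrip(SparseVectorRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            SparseVectorRecord copy = new SparseVectorRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With text output, the second job only ever sees a line of characters, so the value arrives as Text and the cast to VectorWritable fails. With the binary route, the reader gets typed fields back and can rebuild an equal object, which is what the sketch's roundTrip method demonstrates.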
Here is some example code:
confFactory ToItemPrefsWorkFlow = new confFactory
(new Path("/dbout"), //input file path
new Path("/mahout/output.txt"), //output file path
TextInputFormat.class, //input format
VarLongWritable.class, //mapper key format
Item_Score_Writable.class, //mapper value format
VarLongWritable.class, //reducer key format
VectorWritable.class, //reducer value format
SequenceFileOutputFormat.class //the reducer output format (binary sequence file, not text)
);
ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();
confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
(new Path("/mahout/output.txt"),
new Path("/mahout/UserVectorToCooccurrence"),
SequenceFileInputFormat.class, //note: the second workflow's input format is now SequenceFileInputFormat.class
//UserVectorToCooccurrenceMapper.class,
IntWritable.class,
IntWritable.class,
IntWritable.class,
VectorWritable.class,
SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();
JobClient.runJob(conf1);
JobClient.runJob(conf2);
If you have any problems with this, please feel free to contact me.