我写了下面的程序
using namespace std;
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include "libpq-fe.h"
extern "C" {
#include "traverser.h"
}
/**
 * Mapper that batches the paths found in one input value.
 *
 * The input value is split into individual paths on CR and LF characters.
 * The paths are divided, in their original order, into at most four
 * contiguous groups whose sizes differ by no more than one. Each group is
 * joined with single spaces and emitted as a key with the constant value "0".
 * Nothing is emitted when the input value contains no paths.
 */
class IndexMap:public HadoopPipes::Mapper {
public:
    IndexMap(HadoopPipes::TaskContext & context) { }

    /**
     * Splits the input value into paths and emits up to four space-joined
     * groups of them.
     *
     * @param context map context supplying the input value and receiving the
     *                emitted (group, "0") pairs.
     */
    void map(HadoopPipes::MapContext & context) {
        typedef std::vector<std::string>::size_type size_type;

        // Each character of the separator acts as a delimiter, so the
        // escapes matter: "rn" would cut paths at every 'r' and 'n'.
        // NOTE(review): with a line-oriented record reader each value may
        // already be a single line — confirm the expected input layout.
        const std::vector<std::string> paths =
            HadoopUtils::splitString(context.getInputValue(), "\r\n");
        if (paths.empty()) {
            return;  // nothing to group; avoids indexing an empty vector
        }

        const size_type maxGroups = 4;
        const size_type total = paths.size();
        // Never create more groups than there are paths, so no key is empty.
        const size_type groups = total < maxGroups ? total : maxGroups;
        const size_type baseLen = total / groups;
        const size_type extra = total % groups;  // first `extra` groups get one more path

        size_type next = 0;
        for (size_type g = 0; g < groups; ++g) {
            const size_type len = baseLen + (g < extra ? 1 : 0);
            std::string joined = paths[next];
            for (size_type j = 1; j < len; ++j) {
                joined += " ";
                joined += paths[next + j];
            }
            next += len;
            context.emit(joined, "0");
        }
    }
};
/**
 * Reducer that runs Traverser over every path packed into a key.
 *
 * The key is expected to be a space-separated list of paths (the form
 * IndexMap emits). Each path is handed to Traverser in turn, and a single
 * pair (last path, result of the last Traverser call) is emitted. Nothing is
 * emitted when the key contains no paths.
 */
class IndexReduce:public HadoopPipes::Reducer {
public:
    IndexReduce(HadoopPipes::TaskContext & context) { }

    /**
     * Processes the paths contained in the current key.
     *
     * @param context reduce context supplying the key and receiving the
     *                emitted (path, count) pair.
     */
    void reduce(HadoopPipes::ReduceContext & context) {
        // The paths travel in the key; the value is only the "0" placeholder
        // and is not valid to read before nextValue() has been called.
        // Paths that themselves contain spaces will be split here.
        const std::vector<std::string> processedpaths =
            HadoopUtils::splitString(context.getInputKey(), " ");
        if (processedpaths.empty()) {
            return;  // avoids size() - 1 underflow on an empty list
        }

        // NOTE(review): Configure's return value is ignored, as it was
        // before; its success/failure convention is not visible here.
        Configure("/export/hadoop-1.0.1/src/c++/hadoopc++/src/nsindexer.conf");

        // NOTE(review): count keeps only the last Traverser result; if a
        // running total across all paths is intended, accumulate instead.
        long int count = 0;
        for (std::vector<std::string>::size_type i = 0; i < processedpaths.size(); ++i) {
            count = Traverser(processedpaths[i].c_str());
        }
        context.emit(processedpaths.back(), HadoopUtils::toString(count));
    }
};
int main(int argc, char *argv[])
{
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<IndexMap, IndexReduce> ());
}
我用下面的命令运行它：
root@one: /export/hadoop-1.0.1/bin# ./hadoop pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -input paths.txt -output out.txt -program bin/parindex
（以上命令是按照该文档的说明编写的。）
我得到了以下错误
12/08/29 08:02:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 12/08/29 08:02:10 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 12/08/29 08:02:10 INFO mapred.JobClient: Cleaning up the staging area file:/tmp/hadoop-root/mapred/staging/root-2093574148/.staging/job_local_0001 12/08/29 08:02:10 ERROR security.UserGroupInformation: PriviledgedActionException as:root cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/export/hadoop-1.0.1/bin/out.txt already exists Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/export/hadoop-1.0.1/bin/out.txt already exists at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs (FileOutputFormat.java:121) at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:891) at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093) at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850) at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824) at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261) at org.apache.hadoop.mapred.pipes.Submitter.runJob(Submitter.java:248) at org.apache.hadoop.mapred.pipes.Submitter.run(Submitter.java:479) at org.apache.hadoop.mapred.pipes.Submitter.main(Submitter.java:494)