Running MapReduce on Hbase Exported Table thorws C

2019-08-16 06:33发布

问题:

I have taken the Hbase table backup using Hbase Export utility tool .

hbase org.apache.hadoop.hbase.mapreduce.Export "FinancialLineItem" "/project/fricadev/ESGTRF/EXPORT"

This has kicked in mapreduce and transferred all my table data into Output folder . As per the document the file format will of the ouotput file is sequence file . So i ran below code to extract my key and value from the file .

Now i want to run mapreduce to read the key value from the output file but getting below exception

java.lang.Exception: java.io.IOException: Could not find a deserializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization. at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406) Caused by: java.io.IOException: Could not find a deserializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization. at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1964) at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1811) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1760) at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1774) at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.initialize(SequenceFileRecordReader.java:50) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)

Here is my driver code

package SEQ;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SeqDriver extends Configured implements Tool 
{
    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new SeqDriver(), args);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s needs two arguments   files\n",
                    getClass().getSimpleName());
            return -1;
        }
        String outputPath = args[1];

        FileSystem hfs = FileSystem.get(getConf());
        Job job = new Job();
        job.setJarByClass(SeqDriver.class);
        job.setJobName("SequenceFileReader");

        HDFSUtil.removeHdfsSubDirIfExists(hfs, new Path(outputPath), true);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Result.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);

        job.setMapperClass(MySeqMapper.class);

        job.setNumReduceTasks(0);


        int returnValue = job.waitForCompletion(true) ? 0:1;

        if(job.isSuccessful()) {
            System.out.println("Job was successful");
        } else if(!job.isSuccessful()) {
            System.out.println("Job was not successful");           
        }

        return returnValue;
    }
}

Here is my mapper code

package SEQ;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MySeqMapper extends Mapper <ImmutableBytesWritable, Result, Text, Text>{

    @Override
    public void map(ImmutableBytesWritable row, Result value,Context context)
    throws IOException, InterruptedException {
    }
  }

回答1:

So i will answer my question here is what was needed to make it work

Because we use HBase to store our data and this reducer outputs its result to HBase table, Hadoop is telling us that he doesn’t know how to serialize our data. That is why we need to help it. Inside setUp set the io.serializations variable

hbaseConf.setStrings("io.serializations", new String[]{hbaseConf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});