How to implement the HBase secure bulk load

Posted 2019-03-05 03:07

I have already implemented a bulk load into HBase on a Kerberos cluster, with a driver class similar to this one (it works):

public static void main(String[] args) {        
    try {
        int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);            
        if(response == 0) {             
            System.out.println("Job is successfully completed...");
        } else {
            System.out.println("Job failed...");
        }
    } catch(Exception exception) {
        exception.printStackTrace();
    }
}

@Override
public int run(String[] args) throws Exception {
    int result=0;

    final String inputPath = args[0];   
    final String outputPath = args[1];      
    final String keytab = args[2];  

    Configuration configuration = getConf();        


    configuration.set("data.seperator", DATA_SEPERATOR);        
    configuration.set("hbase.table.name",TABLE_NAME);
   // configuration.set("INTRO",COLUMN_FAMILY_INTRO);
    configuration.set("hbase.zookeeper.quorum","zk_quorum");
    configuration.set("hbase.zookeeper.property.clientPort","2181");
    configuration.set("hbase.master","master:port");
    configuration.set("hadoop.security.authentication", "Kerberos");
    configuration.set("hbase.security.authentication", "kerberos");

        //configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);     
    Job job = new Job(configuration);       
    // job configuration
    job.setJarByClass(HBaseBulkLoadDriver.class);       
    job.setJobName("Bulk Loading HBase Table:"+TABLE_NAME);     
    job.setInputFormatClass(TextInputFormat.class);     
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    //mapper class
    job.setMapperClass(HBaseBulkLoadMapper.class);      
    FileInputFormat.addInputPaths(job,inputPath);   
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);      
    FileOutputFormat.setOutputPath(job, new Path(outputPath));      
    job.setMapOutputValueClass(Put.class);      
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));  

    job.waitForCompletion(true);         

    System.out.println("Output written to folder :" + outputPath);

    System.out.println("To proceed loading files user: hbase:hbase must own recursivly the folder!");

    System.out.println("Is hbase user owing the folder?press Y to load the data , press N and job will fail");

    String IsHbaseOwnerOftheFolder = System.console().readLine();

    if (job.isSuccessful() && IsHbaseOwnerOftheFolder.equals("Y")) {
        HBaseBulkLoad.doBulkLoad(outputPath, keytab, TABLE_NAME);
    } else {
        result = -1;
    }
    return result;
}

Now I would like to implement the secure bulk load, but it seems that this must be implemented using the coprocessor framework (HBase 1.0.0). Can someone give me a complete example of how to use the secureBulkLoadHFiles method? Thanks for the help.

Tags: hbase
1 Answer
Answered 2019-03-05 03:39

I will answer my own question:

For this answer to work, a table must already exist in HBase; moreover, the HFiles have to be already generated for the import.
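
Since the target table is a prerequisite rather than part of the bulk load itself, here is a minimal sketch of creating it up front with the Java Admin API; the table name tb_name and the family name INTRO are taken from the snippets in this post, everything else is placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

//Sketch only: create the target table with one column family before generating/loading the HFiles.
//On a Kerberos cluster, log in first with UserGroupInformation.loginUserFromKeytab, as shown further below.
Configuration conf = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin()) {
    TableName tn = TableName.valueOf("tb_name");
    if (!admin.tableExists(tn)) {
        HTableDescriptor descriptor = new HTableDescriptor(tn);
        descriptor.addFamily(new HColumnDescriptor("INTRO"));
        admin.createTable(descriptor);
    }
}

With the table in place, the secure bulk load itself looks like this: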

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;

String keyTab = "pathtokeytabfile";
String tableName = "tb_name";
String pathToHFile = "/tmp/tmpfiles/";
Configuration configuration = new Configuration();  

configuration.set("hbase.zookeeper.quorum","ZK_QUORUM");
configuration.set("hbase.zookeeper"+ ".property.clientPort","2181");
configuration.set("hbase.master","MASTER:60000");
configuration.set("hadoop.security.authentication", "Kerberos");
configuration.set("hbase.security.authentication", "kerberos");


//Obtaining kerberos authentication 

UserGroupInformation.setConfiguration(configuration);

UserGroupInformation.loginUserFromKeytab("here keytab", path to the key tab);

HBaseAdmin.checkHBaseAvailable(configuration);

System.out.println("HBase is running!");

HBaseConfiguration.addHbaseResources(configuration);    

Connection conn = ConnectionFactory.createConnection(configuration);

Table table = conn.getTable(TableName.valueOf(tableName));

HRegionInfo tbInfo = new HRegionInfo(table.getName());


//path to the HFiles that need to be loaded 

Path hfofDir = new Path(pathToHFile);

//acquiring user token for authentication 

UserProvider up = UserProvider.instantiate(configuration);

//the second argument is the name of the renewer of the delegation token
FsDelegationToken fsDelegationToken = new FsDelegationToken(up, "name of the key tab user");

fsDelegationToken.acquireDelegationToken(hfofDir.getFileSystem(configuration));

//preparing  for the bulk load

SecureBulkLoadClient secureBulkLoadClient = new SecureBulkLoadClient(table);

String bulkToken = secureBulkLoadClient.prepareBulkLoad(table.getName());

System.out.println(bulkToken);

//creating the family list (list of family names and path to the hfile corresponding to the family name)

final List<Pair<byte[], String>> famPaths = new ArrayList<>();

Pair<byte[], String> p = new Pair<>();

//name of the family 
p.setFirst("nameofthefamily".getBytes());

//path to the HFile (HFile are organized in folder with the name of the family)
p.setSecond("/tmp/tmpfiles/INTRO/nameofthefilehere");

famPaths.add(p);

//bulk loading ,using the secure bulk load client

secureBulkLoadClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), bulkToken, tbInfo.getStartKey());

System.out.println("Bulk Load Completed..");    