How to read from HBase using Spark

Posted 2019-01-05 22:38

The code below reads from HBase, converts each row to a JSON string, and then converts the result to a SchemaRDD. The problem is that I collect the JSON strings in a `List` on the driver and then pass that list to `parallelize` to build a `JavaRDD`, so for about 100 GB of data the master has to hold everything in memory. What is the right way to load the data from HBase, transform it, and then convert it to a `JavaRDD` without pulling it all onto the driver?

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.regex.Pattern;

import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}

Tags:
0 answers
Log in to post an answer