PHOENIX SPARK - Load Table as DataFrame

Published 2020-07-13 09:51

Question:

I have created a DataFrame from an HBase table (via Phoenix) which has 500 million rows. From the DataFrame I created a pair RDD keyed by ID and use it for joining with data from a file.

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import scala.Tuple2;

Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);

// Load the Phoenix table as a DataFrame
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load();
JavaRDD<Row> tableRows = df.toJavaRDD();

// Key each row by ID so it can be joined with the data from the file
JavaPairRDD<String, String> dbData = tableRows.mapToPair(
new PairFunction<Row, String, String>()
{
    @Override
    public Tuple2<String, String> call(Row row) throws Exception
    {
        return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME"));
    }
});

Now my question - let's say the file has 2 million unique entries matching the table. Will the entire table be loaded into memory as an RDD, or will only the matching 2 million records from the table be loaded into memory as an RDD?

Answer 1:

Your statement

DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap)
.load();

will load the entire table into memory. You have not provided any filter for Phoenix to push down into HBase, which would reduce the number of rows read.
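For example (a minimal sketch - the column name ID and the literal values are placeholders, and Column.isin requires Spark 1.5+), filtering the DataFrame before converting it to an RDD gives Phoenix a predicate it can push down to HBase:

// Hypothetical example: restrict the read to a small set of IDs.
// Phoenix can push this predicate down to HBase, so far fewer rows
// are scanned and shipped to Spark.
DataFrame filtered = df.filter(df.col("ID").isin("id1", "id2", "id3"));
JavaRDD<Row> filteredRows = filtered.toJavaRDD();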

If you do a join with a non-HBase data source - e.g. a flat file - then all of the records from the HBase table would first need to be read in. The records not matching the secondary data source would not be kept in the new DataFrame, but the initial read would still have happened.
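To make that concrete, such a join might look like the sketch below (the file path, its comma-separated layout, and the sc variable are assumptions for illustration). The join keeps only the matching keys, but when an action runs, every row of the Phoenix table is still read in order to build dbData:

JavaPairRDD<String, String> fileData = sc.textFile("hdfs:///path/to/file")
    .mapToPair(new PairFunction<String, String, String>()
    {
        @Override
        public Tuple2<String, String> call(String line) throws Exception
        {
            // Assumed layout: one "id,value" pair per line
            String[] parts = line.split(",");
            return new Tuple2<String, String>(parts[0], parts[1]);
        }
    });

// Only keys present on both sides appear in the result, but the full
// table scan behind dbData is not avoided by the join itself.
JavaPairRDD<String, Tuple2<String, String>> joined = dbData.join(fileData);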

Update: A potential approach would be to pre-process the file - i.e. extract the IDs you want - and store the results in a new HBase table. Then perform the join directly in HBase via Phoenix, not Spark.

The rationale for that approach is to move the computation to the data. The bulk of the data resides in HBase, so move the small data (the IDs in the file) to it.

I am not directly familiar with Phoenix beyond the fact that it provides a SQL layer on top of HBase. Presumably it would be capable of performing such a join and storing the result in a separate HBase table? That separate table could then be loaded into Spark and used in your subsequent computations.
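A minimal sketch of that idea via Phoenix's JDBC driver, assuming the file's IDs have already been upserted into a Phoenix table named FILE_IDS and that RESULT_TABLE exists (all table and column names here are hypothetical):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// The join runs inside Phoenix/HBase, so only the (much smaller) result
// table needs to be loaded back into Spark afterwards.
try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkURL);
     Statement stmt = conn.createStatement())
{
    stmt.executeUpdate(
        "UPSERT INTO RESULT_TABLE (ID, NAME) " +
        "SELECT t.ID, t.NAME FROM " + tableName + " t " +
        "JOIN FILE_IDS f ON t.ID = f.ID");
    conn.commit();
}

The result table could then be loaded into Spark with the same org.apache.phoenix.spark format shown above.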