Optimize the join performance with Hive partition

Posted 2019-07-14 07:13

Question:

I have a Hive ORC table, test_dev_db.TransactionUpdateTable, with some sample data. It holds incremental data that needs to be applied to the main table (test_dev_db.TransactionMainHistoryTable), which is partitioned on the columns Country and Tran_date.

Hive incremental load table schema (it holds 19 rows which need to be merged):

CREATE TABLE IF NOT EXISTS test_dev_db.TransactionUpdateTable 
(
Transaction_date timestamp,
Product       string,
Price         int,
Payment_Type  string,
Name          string, 
City          string,
State         string,
Country       string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS orc
;

Hive main table schema (total row count: 77):

CREATE TABLE IF NOT EXISTS test_dev_db.TransactionMainHistoryTable
(
Transaction_date timestamp,
Product       string,
Price         int,
Payment_Type  string,
Name          string,
City          string,
State         string
)
PARTITIONED BY (Country string,Tran_date string) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS orc
;

I am running the query below to merge the incremental data with the main table.

SELECT
  case when i.transaction_date is not null then cast(substring(current_timestamp(),0,19) as timestamp)  
  else t.transaction_date   end as transaction_date,
  t.product,
  case when i.price is not null then i.price else t.price end as price,
  t.payment_type,
  t.name,
  t.city,
  t.state,
  t.country,
  case when i.transaction_date is not null then substring(current_timestamp(),0,10) 
  else t.tran_date end as tran_date
  from
test_dev_db.TransactionMainHistoryTable t
full join test_dev_db.TransactionUpdateTable i on (t.Name=i.Name)
;

Example partition directories of the main table:

/hdfs/path/database/test_dev_db.db/transactionmainhistorytable/country=Australia/tran_date=2009-03-01
/hdfs/path/database/test_dev_db.db/transactionmainhistorytable/country=Australia/tran_date=2009-05-01

I am also running the query below to select only the specific partitions which need to be merged, to avoid rewriting partitions that have no updates.

SELECT
  case when i.transaction_date is not null then cast(substring(current_timestamp(),0,19) as timestamp)  
  else t.transaction_date   end as transaction_date,
  t.product,
  case when i.price is not null then i.price else t.price end as price,
  t.payment_type,
  t.name,
  t.city,
  t.state,
  t.country,
  case when i.transaction_date is not null then substring(current_timestamp(),0,10) else t.tran_date end as tran_date
  from
(SELECT 
  *
  FROM 
test_dev_db.TransactionMainHistoryTable
where Tran_date in
(select distinct  from_unixtime(to_unix_timestamp (Transaction_date,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd') from test_dev_db.TransactionUpdateTable
))t
full join test_dev_db.TransactionUpdateTable i on (t.Name=i.Name)
;

Only Transaction_date, Price and the partition column tran_date need to be updated in both cases. Both queries run fine, though the latter takes longer to execute.

Execution plan for the partitioned table:

 Stage: Stage-5
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: transactionmainhistorytable
            filterExpr: tran_date is not null (type: boolean)
            Statistics: Num rows: 77 Data size: 39151 Basic stats: COMPLETE Column stats: COMPLETE
            Map Join Operator
              condition map:
                   Left Semi Join 0 to 1
              keys:
                0 tran_date (type: string)
                1 _col0 (type: string)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8

Am I doing something wrong with the second query? Do I need to use both partition columns for better pruning? Any help or advice is greatly appreciated.

Answer 1:

Maybe this is not a complete answer but I hope these thoughts will be useful.

where tran_date IN (select ... )

is actually the same as

LEFT SEMI JOIN (SELECT ...)
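
Written out in full, the partition filter from the question might look like the sketch below. It is only an illustration of the equivalence, not a faster variant; the aliases s and upd_date are hypothetical names introduced here.

```sql
-- Sketch: the "WHERE tran_date IN (subquery)" filter from the question,
-- expressed as an explicit LEFT SEMI JOIN.
-- The aliases s and upd_date are hypothetical, chosen for this example.
SELECT t.*
FROM test_dev_db.TransactionMainHistoryTable t
LEFT SEMI JOIN (
  -- Same date expression as in the question's subquery
  SELECT DISTINCT
    from_unixtime(to_unix_timestamp(Transaction_date, 'yyyy-MM-dd HH:mm'), 'yyyy-MM-dd') AS upd_date
  FROM test_dev_db.TransactionUpdateTable
) s ON (t.tran_date = s.upd_date);
```

Note that with LEFT SEMI JOIN the right-hand side can only be referenced in the ON clause, which is why only columns of t are selected.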

And this is reflected in the plan:

Map Join Operator
              condition map:
                   Left Semi Join 0 to 1
              keys:
                0 tran_date (type: string)
                1 _col0 (type: string) 

And it is executed as a map join. First the subquery dataset is selected; then it is placed in the distributed cache and loaded into memory to be used in the map join. All these steps (select, load into memory, map join) are slower than reading and overwriting the whole table, because the table is so small and over-partitioned. The statistics say Num rows: 77 Data size: 39151, which is too small to be partitioned by two columns, and even too small to be partitioned at all. Try a bigger table and use EXPLAIN EXTENDED to check what is really being scanned.
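
A minimal sketch of how the scan could be inspected is shown below. EXPLAIN EXTENDED is the suggestion above; EXPLAIN DEPENDENCY is an extra option added here (it is not part of the original suggestion) that reports the input tables and partitions of a query as JSON.

```sql
-- Show the extended plan for the partition-filtering subquery from the question.
EXPLAIN EXTENDED
SELECT *
FROM test_dev_db.TransactionMainHistoryTable
WHERE tran_date IN (
  SELECT DISTINCT
    from_unixtime(to_unix_timestamp(Transaction_date, 'yyyy-MM-dd HH:mm'), 'yyyy-MM-dd')
  FROM test_dev_db.TransactionUpdateTable
);

-- Addition (not in the original answer): list the tables and partitions
-- the same query depends on, under "input_tables" and "input_partitions".
EXPLAIN DEPENDENCY
SELECT *
FROM test_dev_db.TransactionMainHistoryTable
WHERE tran_date IN (
  SELECT DISTINCT
    from_unixtime(to_unix_timestamp(Transaction_date, 'yyyy-MM-dd HH:mm'), 'yyyy-MM-dd')
  FROM test_dev_db.TransactionUpdateTable
);
```

If every partition of the main table shows up in the output, the IN subquery is not pruning partitions at planning time.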

Also, replace this:

from_unixtime(to_unix_timestamp (Transaction_date,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd')

with substr(Transaction_date,0,10) or date(Transaction_date)

And replace substring(current_timestamp,0,10) with current_date, just to simplify the code a bit.
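
With both substitutions applied, the second query from the question might look like the sketch below. The explicit casts to string are an assumption added here to keep the types unambiguous (tran_date is a string column); they are not part of the suggestion above.

```sql
-- Sketch: second query from the question with the simplified date expressions.
SELECT
  CASE WHEN i.transaction_date IS NOT NULL
       THEN cast(substring(current_timestamp(), 0, 19) AS timestamp)
       ELSE t.transaction_date END AS transaction_date,
  t.product,
  CASE WHEN i.price IS NOT NULL THEN i.price ELSE t.price END AS price,
  t.payment_type,
  t.name,
  t.city,
  t.state,
  t.country,
  -- current_date replaces substring(current_timestamp(), 0, 10);
  -- the cast to string is an assumption so both CASE branches are strings
  CASE WHEN i.transaction_date IS NOT NULL
       THEN cast(current_date AS string)
       ELSE t.tran_date END AS tran_date
FROM (
  SELECT *
  FROM test_dev_db.TransactionMainHistoryTable
  WHERE tran_date IN (
    -- substr replaces from_unixtime(to_unix_timestamp(...), 'yyyy-MM-dd');
    -- the timestamp is cast to string explicitly (assumption, for clarity)
    SELECT DISTINCT substr(cast(Transaction_date AS string), 0, 10)
    FROM test_dev_db.TransactionUpdateTable
  )
) t
FULL JOIN test_dev_db.TransactionUpdateTable i ON (t.Name = i.Name);
```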

If you want the partition filter displayed in the plan, try passing the filter as a literal list of partitions: select the list in a separate session and use the shell to pass it into the WHERE clause. See this answer: https://stackoverflow.com/a/56963448/2700344
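
A rough sketch of that two-step approach, using Hive variable substitution, follows. The variable name partitions and the script name merge_partitions.hql are hypothetical; the two date values are taken from the partition paths shown in the question.

```sql
-- Step 1 (separate session): build a quoted, comma-separated list of the
-- dates present in the incremental table.
SELECT concat_ws(',', collect_set(concat("'", substr(cast(Transaction_date AS string), 0, 10), "'")))
FROM test_dev_db.TransactionUpdateTable;

-- Step 2: pass that list in from the shell, for example (hypothetical names):
--   hive --hivevar partitions="'2009-03-01','2009-05-01'" -f merge_partitions.hql
-- and use it as a literal IN list, so the filter is known at compile time.
SELECT *
FROM test_dev_db.TransactionMainHistoryTable
WHERE tran_date IN (${hivevar:partitions});
```

Because the IN list is made of constants after substitution, the optimizer can prune partitions while planning instead of evaluating a semi join at run time.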