How to avoid a cross join in Hive?

Posted 2020-07-20 03:21

Question:

I have two tables. One has 1 million records; the other has 20 million records.


    table 1
    value
    (1, 1)
    (2, 2)
    (3, 3)
    (4, 4)
    (5, 4)
    ....

    table 2
    value
    (55, 11)
    (33, 22)
    (44, 66)
    (22, 11)
    (11, 33)
    ....

I need to multiply each value in table 1 by each value in table 2, rank the results, and keep the top 5 for each value in table 1. The result would look like:


    value from table 1, top 5 for each value in table 1
    (1, 1), 1*44 + 1*66 = 110
    (1, 1), 1*55 + 1*11 = 66
    (1, 1), 1*33 + 1*22 = 55
    (1, 1), 1*11 + 1*33 = 44
    (1, 1), 1*22 + 1*11 = 33
    .....

I tried to use a cross join in Hive, but it always fails because the tables are too large.

Answer 1:

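Select the top 5 rows from table 2 first, then cross join them with the first table. This gives the same result as cross joining the two full tables and taking the top 5 afterwards, as long as the order of the products does not depend on the table 1 row (for example, when each table 1 value is a single positive multiplier, as in the demo below). The number of rows joined is much smaller in the first case. A cross join with a small 5-row dataset is converted to a map join and runs about as fast as a full scan of table 1.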

Look at the demo below. The cross join was converted to a map join. Note the "Map Join Operator" in the plan and this warning: "Warning: Map Join MAPJOIN[19][bigTable=?] in task 'Map 1' is a cross product":

hive> set hive.cbo.enable=true;
hive> set hive.compute.query.using.stats=true;
hive> set hive.execution.engine=tez;
hive> set hive.auto.convert.join.noconditionaltask=false;
hive> set hive.auto.convert.join=true;
hive> set hive.vectorized.execution.enabled=true;
hive> set hive.vectorized.execution.reduce.enabled=true;
hive> set hive.vectorized.execution.mapjoin.native.enabled=true;
hive> set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true;
hive>
    > explain
    > with table1 as (
    > select stack(5,1,2,3,4,5) as id
    > ),
    > table2 as
    > (select t2.id
    >    from (select t2.id, dense_rank() over(order by id desc) rnk
    >            from (select stack(11,55,33,44,22,11,1,2,3,4,5,6) as id) t2
    >         )t2
    >   where t2.rnk<6
    > )
    > select t1.id, t1.id*t2.id
    >   from table1 t1
    >        cross join table2 t2;
Warning: Map Join MAPJOIN[19][bigTable=?] in task 'Map 1' is a cross product
OK
Plan not optimized by CBO.

Vertex dependency in root stage
Map 1 <- Reducer 3 (BROADCAST_EDGE)
Reducer 3 <- Map 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Map 1
         File Output Operator [FS_17]
            compressed:false
            Statistics:Num rows: 1 Data size: 26 Basic stats: COMPLETE Column stats: NONE
            table:{"serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"}
            Select Operator [SEL_16]
               outputColumnNames:["_col0","_col1"]
               Statistics:Num rows: 1 Data size: 26 Basic stats: COMPLETE Column stats: NONE
               Map Join Operator [MAPJOIN_19]
               |  condition map:[{"":"Inner Join 0 to 1"}]
               |  HybridGraceHashJoin:true
               |  keys:{}
               |  outputColumnNames:["_col0","_col1"]
               |  Statistics:Num rows: 1 Data size: 26 Basic stats: COMPLETE Column stats: NONE
               |<-Reducer 3 [BROADCAST_EDGE]
               |  Reduce Output Operator [RS_14]
               |     sort order:
               |     Statistics:Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
               |     value expressions:_col0 (type: int)
               |     Select Operator [SEL_9]
               |        outputColumnNames:["_col0"]
               |        Statistics:Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
               |        Filter Operator [FIL_18]
               |           predicate:(dense_rank_window_0 < 6) (type: boolean)
               |           Statistics:Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
               |           PTF Operator [PTF_8]
               |              Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"partition by:":"0","name:":"windowingtablefunction","order by:":"_col0(DESC)"}]
               |              Statistics:Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
               |              Select Operator [SEL_7]
               |              |  outputColumnNames:["_col0"]
               |              |  Statistics:Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
               |              |<-Map 2 [SIMPLE_EDGE]
               |                 Reduce Output Operator [RS_6]
               |                    key expressions:0 (type: int), col0 (type: int)
               |                    Map-reduce partition columns:0 (type: int)
               |                    sort order:+-
               |                    Statistics:Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
               |                    UDTF Operator [UDTF_5]
               |                       function name:stack
               |                       Statistics:Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
               |                       Select Operator [SEL_4]
               |                          outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11"]
               |                          Statistics:Num rows: 1 Data size: 48 Basic stats: COMPLETE Column stats: COMPLETE
               |                          TableScan [TS_3]
               |                             alias:_dummy_table
               |                             Statistics:Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: COMPLETE
               |<-UDTF Operator [UDTF_2]
                     function name:stack
                     Statistics:Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                     Select Operator [SEL_1]
                        outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5"]
                        Statistics:Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: COMPLETE
                        TableScan [TS_0]
                           alias:_dummy_table
                           Statistics:Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: COMPLETE

Time taken: 0.199 seconds, Fetched: 66 row(s)

Just replace the stack() calls in my demo with your tables.
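
With real tables in place of stack(), the query might look roughly like the sketch below. The table and column names (table1(id), table2(id)) are assumptions for illustration, not taken from the question, and the single-column schema mirrors the demo rather than the two-column values in the question. Note that dense_rank() keeps every row tied on one of the five highest distinct values, so the filtered set can contain more than 5 rows if table 2 has duplicates.

```sql
-- Sketch only: table1(id INT) and table2(id INT) are hypothetical schemas.
WITH top5 AS (
  -- Reduce the larger table to its five highest distinct ids
  -- before joining, so the cross join sees only a handful of rows.
  SELECT r.id
    FROM (SELECT id,
                 dense_rank() OVER (ORDER BY id DESC) AS rnk
            FROM table2
         ) r
   WHERE r.rnk < 6
)
SELECT t1.id,
       t2.id         AS t2_id,
       t1.id * t2.id AS id_product
  FROM table1 t1
       CROSS JOIN top5 t2;  -- small right side, eligible for map join
```

Running EXPLAIN on this query with hive.auto.convert.join=true should show whether the join was actually converted to a map join in your environment, as it was in the demo plan.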