Hive join optimization

2019-02-16 02:53发布

问题:

I have two sets of data both stored in an S3 bucket which I need to process in Hive and store the output back to S3. Sample rows from each datasets are as follows:

DataSet 1: {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126"}

DataSet2: {"requestId":"TADS6152JHGJH5435","userAgent":"Mozilla"}

I need to join these two data sets based on the requestId and output a combined row as:

Output:  {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126","userAgent":"Mozilla"}

The requestIds in dataset 1 is a proper subset of of the requestids in dataset 2. I am using a LEFT OUTER JOIN to get my output. Here is a simplified version of my Hive script:

CREATE EXTERNAL TABLE dataset1 (
     requestId string,
     customerId string,
     sessionId string
 )
LOCATION 's3://path_to_dataset1/';

CREATE EXTERNAL TABLE dataset2 (
     requestId string,
     userAgent string
 )
LOCATION 's3://path_to_dataset2/';

CREATE EXTERNAL TABLE output (
     requestId string,
     customerId string,
     sessionId string,
     userAgent string
 )
LOCATION 's3://path_to_output/';

INSERT OVERWRITE TABLE output
  SELECT d1.requestId, d1.customerId, d1.sessionId, d2.userAgent
  FROM dataset1 d1 LEFT OUTER JOIN dataset2 d2
  ON (d1.requestId=d2.requestId);

My question is:

Are there opportunities to optimize this join? Can I use partitioning/bucketing of the tables to run the join faster? I have set hive.auto.convert.join to true in my script. What other hive properties should I set to gain better performance for the above queries?

回答1:

1. Optimize Joins

We can improve the performance of joins by enabling Auto Convert Map Joins and enabling optimization of skew joins.

Auto Map Joins

Auto Map-Join is a very useful feature when joining a big table with a small table. if we enable this feature, the small table will be saved in the local cache on each node, and then joined with the big table in the Map phase. Enabling Auto Map Join provides two advantages. First, loading a small table into cache will save read time on each data node. Second, it avoids skew joins in the Hive query, since the join operation has been already done in the Map phase for each block of data.

Skew Joins

We can enable optimization of skew joins, i.e. imbalanced joins by setting hive.optimize.skewjoin property to true either via SET command in hive shell or hive-site.xml file.

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization. 
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we think the key as a skew join key. 
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map task used in the follow up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the number of map task at most used in the follow up map join job for a skew join by specifying 
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>

2. Enable Bucketed Map Joins

If tables are bucketed by a particular column and these tables are being used in joins then we can enable bucketed map join to improve the performance.

  <property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
    <description>Whether to try bucket mapjoin</description>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
    <description>Whether to try sorted bucket merge map join</description>
  </property>

.

3. Enable Tez Execution Engine

Instead of running Hive queries on venerable Map-reduce engine, we can improve the performance of hive queries at least by 100% to 300 % by running on Tez execution engine. We can enable the Tez engine with below property from hive shell.

hive> set hive.execution.engine=tez;

.

4. Enable Parallel Execution

Hive converts a query into one or more stages. Stages could be a MapReduce stage, sampling stage, a merge stage, a limit stage. By default, Hive executes these stages one at a time. A particular job may consist of some stages that are not dependent on each other and could be executed in

parallel, possibly allowing the overall job to complete more quickly. Parallel execution can be enabled by setting below properties.

  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>8</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>

.

5. Enable Vectorization

Vectorization feature is introduced into hive for the first time in hive-0.13.1 release only. By vectorized query execution, we can improve performance of operations like scans, aggregations, filters and joins, by performing them in batches of 1024 rows at once instead of single row each time.

We can enable vectorized query execution by setting below three properties in either hive shell or hive-site.xml file.

hive> set hive.vectorized.execution.enabled = true;
hive> set hive.vectorized.execution.reduce.enabled = true;
hive> set hive.vectorized.execution.reduce.groupby.enabled = true;

.

6. Enable Cost Based Optimization

Recent Hive releases provided the feature of cost based optimization, one can achieve further optimizations based on query cost, resulting in potentially different decisions: how to order joins, which type of join to perform, degree of parallelism and others.

cost based optimization can be enabled by setting below properties in hive-site.xml file.

  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
    <description>
      When set to true Hive will answer a few queries like count(1) purely using stats
      stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
      For more advanced stats collection need to run analyze table queries.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.partition.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires partition level basic
      statistics like number of rows, data size and file size. Partition statistics are fetched from
      metastore. Fetching partition statistics for each needed partition can be expensive when the
      number of partitions is high. This flag can be used to disable fetching of partition statistics
      from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
      and will estimate the number of rows from row schema.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires column statistics.
      Column statistics are fetched from metastore. Fetching column statistics for each needed column
      can be expensive when the number of columns is high. This flag can be used to disable fetching
      of column statistics from metastore.
    </description>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>true</value>
    <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
  </property>
  <property>
    <name>hive.stats.dbclass</name>
    <value>fs</value>
    <description>
      Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
      The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), 
      each task writes statistics it has collected in a file on the filesystem, which will be aggregated 
      after the job has finished. Supported values are fs (filesystem), jdbc:database (where database 
      can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
    </description>
  </property>