Hive连接优化

4

我有两个数据集都存储在S3存储桶中,需要在Hive中处理并将输出存储回S3。 每个数据集的示例行如下:

DataSet 1: {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126"}

DataSet2: {"requestId":"TADS6152JHGJH5435","userAgent":"Mozilla"}

我需要根据requestId将这两个数据集合并,输出一个组合行,如下:

Output:  {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126","userAgent":"Mozilla"}

数据集1中的requestIds是数据集2中requestids的一个子集。我使用了LEFT OUTER JOIN来获取我的输出结果。以下是我的Hive脚本的简化版本:

CREATE EXTERNAL TABLE dataset1 (
     requestId string,
     customerId string,
     sessionId string
 )
LOCATION 's3://path_to_dataset1/';

CREATE EXTERNAL TABLE dataset2 (
     requestId string,
     userAgent string
 )
LOCATION 's3://path_to_dataset2/';

CREATE EXTERNAL TABLE output (
     requestId string,
     customerId string,
     sessionId string,
     userAgent string
 )
LOCATION 's3://path_to_output/';

INSERT OVERWRITE TABLE output
  SELECT d1.requestId, d1.customerId, d1.sessionId, d2.userAgent
  FROM dataset1 d1 LEFT OUTER JOIN dataset2 d2
  ON (d1.requestId=d2.requestId);

我的问题是:

是否有优化此连接的机会?我可以使用表的分区/桶来加速连接吗?我已经在脚本中设置了hive.auto.convert.jointrue。还应该设置哪些其他Hive属性以获得更好的性能?

1个回答

19
1. Optimize Joins
我们可以通过启用Auto Convert Map Joins和启用优化倾斜连接来提高连接的性能。
Auto Map Joins

Auto Map-Join是一项非常有用的功能,在将大表与小表连接时使用。如果启用此功能,则每个节点上的本地缓存将保存小表,然后在Map阶段与大表连接。启用自动映射连接具有两个优点。首先,将小表加载到缓存中将节省每个数据节点的读取时间。其次,它避免了Hive查询中的倾斜连接,因为每个数据块的连接操作已经在Map阶段完成。

Skew Joins

我们可以通过在hive shell中使用SET命令或者在hive-site.xml文件中将hive.optimize.skewjoin属性设置为true,从而启用倾斜连接的优化,即不平衡连接。

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization. 
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we think the key as a skew join key. 
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map task used in the follow up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the number of map task at most used in the follow up map join job for a skew join by specifying 
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>

2. Enable Bucketed Map Joins

如果按照特定列对表进行分桶,并且这些表在联接中被使用,则可以启用桶映射联接以提高性能。

  <property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
    <description>Whether to try bucket mapjoin</description>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
    <description>Whether to try sorted bucket merge map join</description>
  </property>

.

3. Enable Tez Execution Engine

通过在Tez执行引擎上运行Hive查询,我们可以将性能提高至少100%到300%,而不是在老旧的MapReduce引擎上运行Hive查询。我们可以通过以下属性在Hive shell中启用Tez引擎。

hive> set hive.execution.engine=tez;

.

4. Enable Parallel Execution

Hive将查询转换为一个或多个阶段。阶段可以是MapReduce阶段、采样阶段、合并阶段或限制阶段。默认情况下,Hive按顺序执行这些阶段。一个特定的作业可能包含一些彼此不依赖的阶段,并且可以并行执行,从而可能允许整个作业更快地完成。可以通过设置以下属性来启用并行执行。

  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>8</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>

.

5. Enable Vectorization

在hive-0.13.1版本中,Vectorization特性首次引入Hive。通过向量化查询执行,我们可以批量处理1024行数据而不是每次仅处理单行数据,从而提高扫描、聚合、过滤和连接等操作的性能。

我们可以通过以下三个属性在Hive shell或hive-site.xml文件中进行设置,以启用向量化查询执行:


hive> set hive.vectorized.execution.enabled = true;
hive> set hive.vectorized.execution.reduce.enabled = true;
hive> set hive.vectorized.execution.reduce.groupby.enabled = true;

.

6. Enable Cost Based Optimization

最近的Hive发行版提供了基于成本的优化功能,可以根据查询成本实现进一步优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等。

可以通过在hive-site.xml文件中设置以下属性来启用基于成本的优化。

  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
    <description>
      When set to true Hive will answer a few queries like count(1) purely using stats
      stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
      For more advanced stats collection need to run analyze table queries.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.partition.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires partition level basic
      statistics like number of rows, data size and file size. Partition statistics are fetched from
      metastore. Fetching partition statistics for each needed partition can be expensive when the
      number of partitions is high. This flag can be used to disable fetching of partition statistics
      from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
      and will estimate the number of rows from row schema.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires column statistics.
      Column statistics are fetched from metastore. Fetching column statistics for each needed column
      can be expensive when the number of columns is high. This flag can be used to disable fetching
      of column statistics from metastore.
    </description>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>true</value>
    <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
  </property>
  <property>
    <name>hive.stats.dbclass</name>
    <value>fs</value>
    <description>
      Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
      The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), 
      each task writes statistics it has collected in a file on the filesystem, which will be aggregated 
      after the job has finished. Supported values are fs (filesystem), jdbc:database (where database 
      can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
    </description>
  </property>

优秀的解释 - Gaurav Jain

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接