如何高效地更新Impala表,当表的文件非常频繁地被修改时。

12
我们有一个基于Hadoop的解决方案(CDH 5.15),在某些目录下我们会得到新的HDFS文件。在这些目录的顶部,我们有4-5个Impala表(2.1)。将文件写入HDFS的过程是Spark Structured Streaming (2.3.1)。
现在,我们在文件写入HDFS时运行一些DDL查询:
- `ALTER TABLE table1 RECOVER PARTITONS` 以检测添加到表中的新分区(及其HDFS目录和文件)。 - `REFRESH table1 PARTITIONS (partition1=X, partition2=Y)`,使用每个分区的所有键。
目前,这些DDL查询花费的时间有点长,它们在系统中排队等候,影响了系统数据的可用性。
所以,我的问题是:有没有一种更有效地将数据纳入到系统中的方法? 我们已考虑过以下方法:
- 使用`ALTER TABLE..RECOVER PARTITONS`,但是根据文档,它只刷新新分区。 - 尝试使用`REFRESH ..PARTITION...`一次性刷新多个分区,但语法不支持这样做。 - 尝试分批处理查询,但Hive JDBC驱动程序不支持批量查询。
  • 考虑到系统已经很忙,我们可以尝试并行进行这些更新吗?

  • 您知道其他的方法吗?
  • 谢谢!

    Victor

    注:我们知道哪些分区需要刷新的方式是通过使用HDFS事件,因为在Spark Structured Streaming中,我们不知道文件何时被写入。

    注2:同时,HDFS中写入的文件有时很小,如果能够合并这些文件,那就太好了。


    2
    很抱歉,我无法回答您的问题,但我想提一下新版Impala增加了“无需干预”的元数据管理功能。请查看https://impala.apache.org/docs/build/html/topics/impala_metadata.html。 - mazaneicha
    谢谢 @mazaneicha!这看起来非常有前途!我们计划升级我们的技术栈,所以也许这是另一个理由去做它。 - Victor
    请检查Hive中的msck repair命令是否有用。 - yammanuruarun
    1个回答

    5

    由于没有人能够解决我的问题,我想分享我们采取的方法以使该处理更加高效,欢迎提出评论。

    我们发现(文档对此不是很清楚),存储在HDFS中Spark“检查点”中的一些信息是一些元数据文件的数量,这些文件描述了每个Parquet文件的编写时间和大小:

    $hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
    
    w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
    rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
    w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
    ...
    
    $hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
    v1
    {"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
    {"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
    ...
    

    所以,我们所做的是:
    • 构建一个Spark Streaming Job轮询`_spark_metadata`文件夹。
      • 我们使用了`fileStream`,因为它允许我们定义要使用的文件过滤器。
      • 该流中的每个条目都是这些JSON行之一,其中解析出文件路径和大小。
    • 通过父文件夹(映射到每个Impala分区)将文件分组。
    • 对于每个文件夹:
      • 读取一个数据帧,仅加载目标Parquet文件(以避免与写文件的其他作业发生竞争条件)
      • 计算要写入的块数(使用JSON中的大小字段和目标块大小)
      • 将数据框缩合成所需数量的分区并将其写回HDFS
      • 执行DDL`REFRESH TABLE myTable PARTITION([从新文件夹派生的分区键]`
    • 最后,删除源文件
    我们实现了以下目标:
    • 通过在每个分区和批次执行一个刷新来限制DDL。

    • 通过配置批处理时间和块大小,我们能够使我们的产品适应不同的部署场景,具有更大或更小的数据集。

    • 该解决方案非常灵活,因为我们可以为Spark Streaming作业分配更多或更少的资源(执行器、核心、内存等),并且还可以启动/停止它(使用其自己的检查点系统)。

    • 我们还在研究在此过程中应用一些数据重分区的可能性,以使分区尽可能接近最优大小。


    1
    你认为使用KafkaConnect而不是Spark可以实现相同的效果吗?在最后一步“最后,删除源文件”中,如何避免删除新进来的文件? - Mario Stefanutti
    元数据文件流在每个批次中都包含要处理的确切文件,因此您不必担心在此期间可能已写入的新文件。 - Victor
    根据KafkaConnect,我从未使用过,我会去了解一下。谢谢! - Victor

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接