如何在AWS Athena中自动执行MSCK REPAIR TABLE

46

我有一个Spark批处理作业,每小时执行一次。每次运行会生成并存储新数据到S3,目录命名模式为DATA/YEAR=?/MONTH=?/DATE=?/datafile

上传数据到S3后,我想使用Athena进行调查研究。此外,我还希望通过将Athena连接到QuickSight作为数据源来对其进行可视化。

问题是,每次运行我的Spark批处理后,存储在S3中的新生成数据,除非我手动运行查询MSCK REPAIR TABLE,否则Athena不会发现它们。

是否有办法使Athena自动更新数据,以便我可以创建完全自动化的数据可视化流程?


8
@samuel_liew这个问题并不宽泛,只是提供了一些额外信息来解释问题的背景。虽然从技术上讲,OP想要一个完全自动化的数据可视化管道可能并不重要,但是上下文对于让人们提供解决基本挑战的指导很重要。具体的挑战是在Athena中管理分区,因为它们是需要创建的独立元数据对象。它们没有被自动创建或发现,这是非常出乎意料的,正如这个问题上的赞数所显示的那样。 - Davos
3个回答

30

有多种方法可以安排此任务。您如何安排工作流程?您是否使用类似于AirflowLuigiAzkaban、cron或使用AWS数据管道的系统?

从这些选项中,您应该能够触发以下CLI命令。

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"

另一个选项是AWS Lambda。您可以编写一个函数,在S3上上传新文件时调用MSCK REPAIR TABLE some_database.some_table

例如,可以编写如下的Lambda函数:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}

    }

    # Query Execution Parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    context = {'Database': 'some_database'}

    client.start_query_execution(QueryString = sql, 
                                 QueryExecutionContext = context,
                                 ResultConfiguration = config)

您需要配置触发器,在您的存储桶下 DATA/ 前缀添加新数据时执行 Lambda 函数。

最终,在使用作业调度程序运行 Spark 作业后显式重建分区具有自说明性的优势。另一方面,AWS Lambda 对于像这样的作业非常方便。


2
我认为另一个可能性是在您的Glue脚本中使用boto。您应该能够使用此链接中的内容来执行相关的MSCK REPAIR TABLE命令:http://boto3.readthedocs.io/en/latest/reference/services/athena.html#Athena.Client.start_query_execution - RobinL
2
我使用了一个定时的AWS爬虫来爬取数据库以更新表格。你对这个解决方案有什么看法? - YangZhao
可以这样做。使用Lambda函数的好处是Lambda可以动态响应事件,例如在Athena中向S3添加文件。Lambda函数的缺点是从持续集成和版本控制的角度来看,它们可能有些令人困惑。 - Zerodf
2
在这种情况下,Lambda 的另一个不好的地方是其执行必须在5分钟内完成,这对于REPAIR TABLE来说可能太短了(但对于ADD PARTITION来说足够了)。 - jmng
有关这个问题 https://stackoverflow.com/questions/63149782/assistance-in-refreshing-data-in-athena-table,您有什么想法吗? - akash sharma

7

你应该使用 ADD PARTITION 命令来运行:

aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."

这将从您的S3位置添加新创建的分区。Athena利用Hive对数据进行分区。要创建带有分区的表,必须在CREATE TABLE语句中定义它。使用PARTITIONED BY来定义按哪些键分区数据。


如果您正在使用Firehose将数据放入Athena存储桶中,则不确定是否可能。即使使用“动态”分区,仍然需要指定分区,但是这可能行不通 :-( - Ralph Bolton
@RalphBolton 当使用Firehose进行日志记录时,您还可以使用分区投影。请参阅我的回答。 - supernova

7

解决问题并更新表格的方法有多种:

  1. 调用MSCK REPAIR TABLE。这将扫描所有数据。由于每个文件都要完全读取(至少由AWS完全收费),因此代价很高,速度也非常慢。简而言之:不要这样做!

  2. 通过调用ALTER TABLE ADD PARTITION abc ...自行创建分区。这在某种程度上很好,因为没有扫描任何数据,成本也很低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱而没有任何共同的模式(这似乎不是您的情况,因为它是一个精心组织的S3键模式)。但该方法也有缺点:A)很难维护 B)所有分区都必须存储在GLUE目录中。当您有大量分区时,这可能会成为问题,因为它们需要被读取并传递给Athena和EMRs Hadoop基础设施。

  3. 使用分区投影。有两种不同的风格可供评估。以下是创建Hadoop分区的变体,这意味着没有GLUE目录条目通过网络发送,因此可以更快地处理大量分区。缺点是可能会“命中”一些不存在的分区。当然,这些将被忽略,但内部将生成所有可能与您的查询匹配的分区-无论它们是否在S3上(因此始终向查询添加分区过滤器!)。如果正确执行,则此选项是一种"fire and forget"方法,因为不需要更新。

CREATE EXTERNAL TABLE `mydb`.`mytable`
(
   ...
)
  PARTITIONED BY (
    `YEAR` int,
    `MONTH` int,
    `DATE` int)
  ...
  LOCATION
    's3://DATA/'
  TBLPROPERTIES(
      "projection.enabled" = "true",
      "projection.account.type" = "integer",
      "projection.account.range" = "1,50",
      "projection.YEAR.type" = "integer",
      "projection.YEAR.range" = "2020,2025",
      "projection.MONTH.type" = "integer",
      "projection.MONTH.range" = "1,12",
      "projection.DATE.type" = "integer",
      "projection.DATE.range" = "1,31",
      "storage.location.template" = "s3://DATA/YEAR=${YEAR}/MONTH=${MONTH}/DATE=${DATE}/"
  );

https://docs.aws.amazon.com/zh_cn/athena/latest/ug/partition-projection.html

  1. 列出所有的选项:您也可以使用GLUE爬虫。但是它不像广告中宣传的那样灵活,似乎不是一个优选方案。

  2. 如果您有许多自动化脚本来准备表格,则使用Glue Data Catalog API 直接更加灵活。这可能是方法#2的替代方法。

简而言之:

  • 如果您的应用程序是SQL的并且希望使用最简洁的方法而无需编写脚本,则使用分区投影。
  • 如果有许多分区,则使用分区投影。
  • 如果您只有少量分区或分区没有通用模式,则使用方法#2。
  • 如果您的脚本较多且脚本已经完成了大部分工作,并且易于处理,则考虑采用方法#5。
  • 如果您感到困惑并且毫无头绪,请先尝试使用分区投影!它应该适用于95%的使用情况。

1
关于投影的注意事项:请阅读 AWS 文档中的“考虑因素和限制”部分。 - Kaymaz
RTFM总是一个好主意。你能详细说明一下你特别关心什么吗?(因为我自己也在使用这种方法,所以我对任何潜在问题都非常感兴趣)。顺便说一下,我的答案中已经链接了官方的AWS投影文档。谢谢! - supernova
当然!顺便说一句,非常好的答案。关于“如果有太多分区为空”,我刚刚重新阅读了一遍,可能理解有误...它的意思是如果分区内没有任何内容,而不是分区不存在。我正在使用它与Firehose到S3。 - Kaymaz
1
这两种投影机制在这里有所不同。例如,第一种创建所有可能的分区。例如,如果您仅定义“projection.YEAR.range”=“2000,3000”,并且未对查询应用过滤器,则该机制将创建1000个分区(如果有多个分区键,则会创建笛卡尔积,如果未经过滤)。分区将传递给执行(到集群)。Presto跳过空分区,但您会遇到与GLUE相同的问题:数据传输会使您的速度变慢。我通过意外的经验发现,创建成千上万个分区会非常缓慢。 - supernova
1
@Tanmay 我一开始也是这么想的。正确的做法只是创建新的分区。但它确实会读取数据并收费(相信我 - 我非常确定,因为它让我们意外地受到了影响)。此外,运行时间也会增加。你没有想过为什么2021年的文件列表需要近9秒钟才能读取吗?Presto上有关于文件需要被打开的信息。对于Presto,有一种特定的模式/驱动程序/fs层补丁/等等可以解决这个问题,但在Athena 1和2中没有。使用投影,你将永远不会回头。 - supernova
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接