替代Athena查询S3中数据的方案

3
我有约300 GB的S3数据。假设这些数据如下所示:
## S3://Bucket/Country/Month/Day/1.csv 

S3://Countries/Germany/06/01/1.csv 
S3://Countries/Germany/06/01/2.csv 
S3://Countries/Germany/06/01/3.csv 

S3://Countries/Germany/06/02/1.csv 
S3://Countries/Germany/06/02/2.csv 

我们正在对数据进行一些复杂的聚合,由于一些国家的数据很大而一些国家的数据很小,因此使用 AWS EMR 没有意义,因为一旦小国家的数据处理完毕,资源就会被浪费,而大国家则需要长时间运行。因此,我们决定使用带 Docker 容器的 AWS Batch 和 Athena。每个作业每天只处理一个国家的数据。
现在大约有 1000 个作业同时启动时,当它们查询 Athena 读取数据时,容器会因达到 Athena 查询限制而失败。
因此,我想知道解决这个问题的其他可能方法是什么?我应该使用 Redshift 集群,将所有数据加载到那里,然后所有容器都向 Redshift 集群查询,因为它们没有查询限制。但这很昂贵,而且需要很长时间才能启动。
另一个选择是在 EMR 上读取数据,并在其上使用 Hive 或 Presto 查询数据,但再次会达到查询限制。
如果有人能提供更好的解决方法,那就太好了。

1
我相信你已经想到了这一点,但如果出于某种原因你还没有想到,请确保根据国家和日期对表进行分区。这将加速查询并在此用例中大幅降低成本。 - Theo
@Theo,是的,我已经提出了这个问题,并询问了有关此事的问题。这里是链接:https://stackoverflow.com/questions/57287621/convert-folders-structure-to-partitions-on-s3-using-spark 但没有得到任何回应。问题是我们从其他来源获取此数据,而我们无法控制他们如何编写数据。因此,我需要找到另一种方法来读取此数据并将其作为分区表。 - Waqar Ahmed
@Theo 如果您也能提供一些见解,那将非常棒。 - Waqar Ahmed
我会尝试回答那个问题,感谢提供链接。我之前没看到它是因为没有标记为 amazon-athena,我也会添加这个标记。 - Theo
@谢谢。非常感激! - Waqar Ahmed
当你说到大型国家时,需要花费多长时间是什么?这是要扫描的文件数量吗?文件是如何被转储到S3中的?您是否可以以ORC格式转储它们?因为1GB的CSV文件在转换为ORC时会变成100MB。结合Athena或EMR使用:您可以利用投影推导来进一步加快查询速度。 - chendu
4个回答

3
据我理解,您只需向AWS Athena服务发送查询,等待所有聚合步骤完成后,从Athena保存结果的S3存储桶中检索结果csv文件,因此您最终会得到1000个文件(每个作业一个)。但问题在于Athena并发查询的数量,而不是总执行时间。
您考虑过使用Apache Airflow来编排和调度查询吗?我认为Airflow是Lambda和Step Functions组合的替代方案,但它完全免费。它易于在本地和远程机器上设置,具有丰富的任务监控CLI和GUI,抽象化了所有调度和重试逻辑。 Airflow甚至有hooks与AWS服务进行交互。该工具甚至还有一个专用运算符用于将查询发送到Athena,因此发送查询就像这样简单:
from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from datetime import datetime

with DAG(dag_id='simple_athena_query',
         schedule_interval=None,
         start_date=datetime(2019, 5, 21)) as dag:

    run_query = AWSAthenaOperator(
        task_id='run_query',
        query='SELECT * FROM  UNNEST(SEQUENCE(0, 100))',
        output_location='s3://my-bucket/my-path/',
        database='my_database'
    )

我将其用于类似的日常/周常任务(使用CTAS语句处理数据),这些任务超出了并发查询数量的限制。
有很多博客文章和文档可以帮助您入门,例如:
- Medium帖子:使用Airflow自动执行AWS Athena查询并在S3中移动结果。 - 安装Airflow的完整指南,链接1链接2 - 您甚至可以设置与Slack的集成,以便在查询成功或失败时发送通知。

然而,我面临的主要问题是,只有4-5个查询实际上同时执行,而其他所有查询都处于空闲状态。


2
一种解决方案是不要同时启动所有作业,而是将它们分阶段进行以保持并发限制。我不知道你使用的工具是否容易实现这一点,但如果你把所有查询都同时发送给 Athena ,那么结果肯定不会很好。编辑:看起来你应该可以在 Batch 中限制作业的数量,请参见AWS batch - how to limit number of concurrent jobs(默认情况下,Athena 允许 25 个并发查询,因此尝试使用 20 个并发作业来确保安全系数 - 但还要在启动作业的代码中添加重试逻辑)。
另一个选择是将其不作为单独的查询,而是尝试将所有内容合并成较少的查询或单个查询 - 通过按国家和日期进行分组或通过生成所有查询并用 UNION ALL 将它们粘合在一起。不过,如果不了解数据和查询的更多信息,就很难说这是否可行。无论如何,您都可能需要对结果进行后处理,如果只是按某些有意义的东西排序,那么在查询运行后将结果拆分成必要的部分不会很难。
使用 Redshift 可能不是解决方案,因为听起来你每天只做一次,而且不会经常使用集群。 Athena 是更好的选择,你只需要更好地处理限制。
在有限的了解下,我认为使用 Lambda 和 Step Functions 比 Batch 更好。使用 Step Functions,你将拥有一个函数来启动 N 个查询(其中 N 等于并发限制,如果你还没有要求提高,则为 25),然后是一个轮询循环(参见示例如何实现),它检查已完成的查询,并启动新的查询以保持运行中的查询数量最大化。当所有查询都运行完毕后,最后一个函数可以触发任何你需要运行的工作流程(或者你可以在每个查询之后运行该工作流程)。
Lambda 和 Step Functions 的好处在于你不必支付空闲资源的费用。对于 Batch,你将支付等待 Athena 完成的资源的费用。由于 Athena 与 Redshift 等不同,具有异步 API,因此你可以运行一个 Lambda 函数 100ms 来启动查询,然后每隔几秒钟(或几分钟)运行 100ms 来检查是否有任何查询已完成,然后再运行约 100ms 完成。这几乎肯定少于 Lambda 免费套餐。

谢谢您的回答。在批处理中限制并发作业是有意义的。我们不是每天都进行此处理,而是每周一次,为期7天。关于按国家分组,由于每个国家的数据大小每周都可能会改变,因此这种方法也不太可行,需要手动查找小国家并找到合并这些国家的策略。 - Waqar Ahmed
关于Lambda和Step Functions,我还没有研究过,但我也会尝试它们。谢谢你指出这个选项。 - Waqar Ahmed
使用 Step Functions 比 Batch 更费力一些,因为您必须将过程分解成多个部分,并描述工作流程等等。但是,如果您使其正常运行,您将拥有比 Batch 更强大的解决方案,并且可以免费运行(除了 Athena 查询外,但在任何解决方案中都是相同的)。 - Theo
是的,那个解决方案需要时间来构建。如果我使用Redshift集群或带有Hive的EMR集群,是否能更好地克服Athena的限制?因为限制并发作业会大大减慢进程速度。 - Waqar Ahmed
Athena的并发查询限制为20个。详见https://docs.aws.amazon.com/athena/latest/ug/service-limits.html。 - rey don sancho

2
据我所知,Redshift SpectrumAthena的费用相同。你不应该将Redshift和Athena进行比较,它们有不同的目的。但首先,我认为你需要考虑解决数据倾斜问题。由于你提到了AWS EMR,我假设你使用Spark。为了处理大型和小型分区,你需要按月份或其他均匀分布的值重新分区数据集。或者你可以使用月份和国家进行分组。你明白了。"最初的回答"

在比较Redshift Spectrum和Athena的成本时,还要考虑运行Redshift集群的成本。它们每查询字节的成本可能相同,但是Redshift需要一个集群,并且Redshift Spectrum查询获得的计算资源量与集群的大小(和成本)成比例。 - Theo
是的,但通常我会保持较小的集群,只存储重复查询所需的数据。但无论如何,主要问题是不平等的分区。 - gorros
我同意可能有更有效的方法来解决OP的问题,运行许多小查询很少是高效的。然而,鉴于手头的信息,很难提出好的建议。看起来他们在Athena查询之后进行了大部分处理,这意味着仅仅重新分区以删除分区中的国家可能意味着必须重写其他所有内容(但这可能是为了获得更好的性能所需的)。 - Theo
我需要针对每个国家进行处理,因此无法将多个国家的数据合并在一起。 - Waqar Ahmed
然后,对于小数据框,您可以使用“broadcast”,因此您无需按国家洗牌数据。 - gorros
显示剩余2条评论

0

您可以使用Redshift Spectrum来实现此目的。是的,它有点昂贵,但它是可扩展的,并且非常适合执行复杂的聚合操作。


复杂的聚合操作与查询无关。我们正在通过编程实现这一点。我们仅使用Athena读取数据。由于Athena查询的限制,我们正在寻找其他从S3读取数据的方法。 - Waqar Ahmed
如果大部分的复杂性不在 Athena 查询中,那么我在答案中提出的一些建议可能是错误的。我假设查询很复杂,并且在 Batch 中运行的作业非常基本。如果您主要使用 Athena 读取数据,那么完全不使用 Athena,而是将 CSV 下载到批处理中运行的代码中可能更有意义?此外,还可以查看 S3 Select 是否有所帮助。 - Theo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接