通过AWS API在Glue表上添加分区?

20

我有一个S3桶,里面不断地填充新数据。我正在使用Athena和Glue查询该数据,但问题是如果Glue不知道新分区被创建了,就无法搜索需要搜索的内容。如果每次需要新的分区时都要进行API调用来运行Glue爬虫,那么成本太高了,因此最好的解决方案是告诉Glue添加了一个新分区,即在其属性表中创建一个新分区。我查阅了AWS文档,但没有找到相关信息。我正在使用Java与AWS。有什么帮助吗?


从计算或金钱的角度来看,是“太贵了”? - botchniaque
从经济角度来看,使用CPU并不是一项难以操作的任务。 - Gudzo
如果您知道何时添加新分区,请尝试我的答案中的第3个选项。 - botchniaque
@Gudzo - 如果我的回答有帮助,您可以接受它吗? - conetfun
@Gudzo 你好?我来看看你是否接受了我的解决方案。 - conetfun
4个回答

37

您可以使用batch_create_partition() Glue API来注册新分区。它不需要像MSCK REPAIR TABLE或重新爬取一样的昂贵操作。

我有一个类似的用例,为此我编写了一个Python脚本,执行以下操作 -

步骤1-获取表信息并从中解析所需的信息,以便注册分区。

# Fetching table information from glue catalog
logger.info("Fetching table info for {}.{}".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for {}.{} - {}"
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']

步骤2 - 生成一个字典的列表,其中每个列表包含创建单个分区所需的信息。所有列表具有相同的结构,但它们特定于分区的值将会更改(年,月,日,小时)

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str('{:02d}'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "{}{}/{}/{}/{}/".format(table_location, year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list

第三步 - 调用batch_create_partition() API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )

单个API调用最多只能创建100个分区,因此如果您要创建的分区数量超过100个,则需要将列表分成块并逐个迭代。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition


嘿@conetfun,我尝试查找单个API中分区数量的限制。是否有官方文档提到了这一点? - spectre009

11
  1. You can configure you're glue crawler to get triggered every 5 mins

  2. You can create a lambda function which will either run on schedule, or will be triggered by an event from your bucket (eg. putObject event) and that function could call athena to discover partitions:

     import boto3
    
     athena = boto3.client('athena')
    
     def lambda_handler(event, context):
         athena.start_query_execution(
             QueryString = "MSCK REPAIR TABLE mytable",
             ResultConfiguration = {
                 'OutputLocation': "s3://some-bucket/_athena_results"
             }
    
  3. Use Athena to add partitions manualy. You can also run sql queries via API like in my lambda example.

    Example from Athena manual:

     ALTER TABLE orders ADD
       PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
       PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
    

你可以配置你的Glue目录,使其每5分钟触发一次。这是什么意思? - Mário de Sá Vera
1
哦,抱歉 - 我的意思是写“_您可以配置您的Glue爬虫程序每5分钟触发一次_”。我会更新我的答案。 - botchniaque

6
这个问题早已经存在,但我想提出一个观点:当S3中有新数据到达时,可以使用s3:ObjectCreated:Put通知来触发Lambda函数,从而注册新分区。我甚至会扩展此功能以处理基于对象删除等的废弃情况。这是AWS的一篇博客文章,详细介绍了S3事件通知:https://aws.amazon.com/blogs/aws/s3-event-notification/

3

AWS Glue最近新增了RecrawlPolicy功能,该功能只会抓取您添加到S3存储桶中的新文件夹/分区。

https://docs.aws.amazon.com/glue/latest/dg/incremental-crawls.html

这有助于最小化重复抓取所有数据。根据我的阅读,您可以在设置爬虫时定义增量抓取或编辑现有爬虫。但需要注意的一点是,增量抓取要求新数据的模式与现有模式基本相同。


这对于一些非常简单的模式可能有效...但是对于更复杂的模式,请忘记爬虫并采用上面列出的策略。 - Mário de Sá Vera

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接