AWS步函数mapState如何迭代大型负载?

7

我有一个状态机,由第一个预处理任务生成一个数组作为输出,该数组被后续的 map 状态循环使用。第一个任务的输出数组变得太大,导致状态机抛出错误States.DataLimitExceeded状态/任务 'arn:aws:lambda:XYZ' 返回了一个大小超过最大字符数服务限制的结果。

这里是状态机 yaml 的一个示例:

stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant

我想到的一个可能的解决方案是,状态preProcess将其输出保存在S3存储桶中,状态mapState直接从中读取。这个可行吗?目前preProcess的输出为:

ResultPath: "$.preProcessOutput"

mapState以以下数组作为输入:

ItemsPath: "$.preProcessOutput.data"

我需要如何调整yaml文件,让map状态直接从S3中读取?


你有想到解决方案吗?我也遇到了这个问题。 - amcdnl
6个回答

3

我目前在工作中也正在解决类似的问题。由于步函数存储其整个状态,因此随着JSON映射到所有值而增长,您可能很快就会遇到问题。

唯一真正解决这个问题的方法是使用层次结构的步函数。也就是说,步函数内嵌步函数。所以你有:

父级 -> [批处理1,批处理2,批处理...N]

然后每个批次都有许多单独的作业:

批1 -> [j1,j2,j3...jBATCHSIZE]

我有一个非常简单的步函数,我发现在达到约4k之前是可以拥有最大批处理大小的,否则会开始出现状态限制。

虽然不是很美观的解决方案,但它却有效。


问题在于当有一个普通的父级产生许多短元素时,我不知道如何从单个步骤中流出多个输出。如果我将许多元素放入S3中,Map迭代器怎么会知道呢? - yucer
你不能直接从s3进入地图迭代器。你需要先有一个lambda函数,它从s3中读取数据,然后创建你的批次/步骤函数。每个批次的json输入将是前一步数组元素的子集,或者如果状态太大,则为数组的起始和结束索引。你可能会发现ETL工具更适合,这取决于所有细节。 - Derrops
这样,您至少需要返回存储桶键列表,但如果存储桶列表太大怎么办?返回索引也有问题,因为每次执行都需要加载整个列表并进行切片。 - yucer
这是一个很好的观点,你需要将数组分区保存,即以1k元素批次保存。如果您无法更改此行为,则还可以获取s3项目的字节范围,但我认为那会很麻烦。 - Derrops

1

写下这篇文章是为了帮助其他人解决同样的问题-最近我在工作中也需要解决这个问题。我找到了一个相对简单的解决方案,不需要使用第二个步骤函数。

我正在使用Python,并将提供一些Python示例,但该解决方案应适用于任何语言。

假设预处理输出如下:

[
    {Output_1},
    {Output_2},
    .
    .
    .
    {Output_n}
]

Step Function 的简化版本定义如下:

"PreProcess": {
    "Type": "Task",
    "Resource": "Your Resource ARN",
    "Next": "Map State"
},
"Map State": {
    Do a bunch of stuff
}

为了处理 PreProcess 输出超过 Step Functions 负载的情况:

  1. Inside the PreProcess, batch the output into chunks small enough to not exceed the payload.

    This is the most complicated step. You will need to do some experimenting to find the largest size of a single batch. Once you have the number (it may be smart to make this number dynamic), I used numpy to split the original PreProcess output into the number of batches.

    import numpy as np
    batches = np.array_split(original_pre_process_output, number_of_batches)
    
  2. Again inside the PreProcess, upload each batch to Amazon S3, saving the keys in a new list. This list of S3 keys will be the new PreProcess output.

    In Python, this looks like so:

    import json
    import boto3
    
    s3 = boto3.resource('s3')
    
    batch_keys = []
    for batch in batches:
        s3_batch_key = 'Your S3 Key here'
        s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch))
        batch_keys.append({'batch_key': s3_batch_key})
    

    In the solution I implemented, I used for batch_id, batch in enumerate(batches) to easily give each S3 key its own ID.

  3. Wrap the 'Inner' Map State in an 'Outer' Map State, and create a Lambda function within the Outer Map to feed the batches to the Inner Map.

    Now that we have a small output consisting of S3 keys, we need a way to open one at a time, feeding each batch into the original (now 'Inner') Map state.

    To do this, first create a new Lambda function - this will represent the BatchJobs state. Next, wrap the initial Map state inside an Outer map, like so:

    "PreProcess": {
    "Type": "Task",
    "Resource": "Your Resource ARN",
    "Next": "Outer Map"
    },
    "Outer Map": {
        "Type": "Map",
        "MaxConcurrency": 1,
        "Next": "Original 'Next' used in the Inner map",
        "Iterator": {
            "StartAt": "BatchJobs",
            "States": {
                "BatchJobs": {
                    "Type": "Task",
                    "Resource": "Newly created Lambda Function ARN",
                    "Next": "Inner Map"   
                },
                "Inner Map": {
                     Initial Map State, left as is.
                }
            }
        }
    }
    

    Note the 'MaxConcurrency' parameter in the Outer Map - This simply ensures the batches are executed sequentially.

    With this new Step Function definition, the BatchJobs state will receive {'batch_key': s3_batch_key}, for each batch. The BatchJobs state then simply needs to get the object stored in the key, and pass it to the Inner Map.

    In Python, the BatchJobs Lambda function looks like so:

    import json
    import boto3
    
    s3 = boto3.client('s3')
    
    def batch_jobs_handler(event, context):
        return json.loads(s3.get_object(Bucket='YOUR_BUCKET_HERE',
                                        Key=event.get('batch_key'))['Body'].read().decode('utf-8'))
    
  4. Update your workflow to handle the new structure of the output.

    Before implementing this solution, your Map state outputs an array of outputs:

    [
        {Map_output_1},
        {Map_output_2},
        .
        .
        .
        {Map_output_n}
    ]
    

    With this solution, you will now get a list of lists, with each inner list containing the results of each batch:

    [
        [
            {Batch_1_output_1},
            {Batch_1_output_2},
            .
            .
            .
            {Batch_1_output_n}
        ],
        [
            {Batch_2_output_1},
            {Batch_2_output_2},
            .
            .
            .
            {Batch_2_output_n}
        ],
        .
        .
        .
        [
            {Batch_n_output_1},
            {Batch_n_output_2},
            .
            .
            .
            {Batch_n_output_n}
        ]
    ]
    

    Depending on your needs, you may need to adjust some code after the Map in order to handle the new format of the output.

就是这样!只要您正确设置最大批处理大小,如果S3密钥列表超过有效载荷限制,唯一会遇到有效载荷限制的方法就是。


如果您返回的列表超过了256KB怎么办?假设您的键只是带有json扩展名的uuid:256*1024/len(str(uuid.uuid4())+'.json') = 6393。难道您不认为在某些用例中,预处理状态可能会返回6400个项目吗? - yucer
这个新的列表列表绝对可以超过256KB的有效载荷限制。不过,在这种情况下,您可以应用我上面描述的相同模式。您需要将列表列表分批处理成列表列表列表,并使用另一个映射状态/lambda函数来处理它并传递它,但这将是完全相同的模式。只是再加一层而已。 - ratiugo

1
我认为目前不可能直接从S3读取。有几个方法可以尝试解决此限制。一个是创建自己的迭代器并不使用Map State。另一个方法如下:
让Lambda读取您的S3文件,并按索引或某个ID/键进行分块。这一步的想法是将迭代器传递给Map State,以获得更小的有效载荷。假设您的数据具有以下结构。
[ { idx: 1, ...more keys }, {idx: 2, ...more keys }, { idx: 3, ...more keys }, ... 4,997 more objects of data ]
假设您希望迭代器每次处理1,000行。然后从您的Lambda返回表示索引的以下元组:[ [ 0, 999 ], [ 1000, 1999 ], [ 2000, 2999 ], [ 3000, 3999 ], [ 4000, 4999] ]
您的Map State将接收此新数据结构,每次迭代都是元组之一。迭代#1:[ 0, 999 ],迭代#2:[ 1000, 1999 ]等等
在您的迭代器内部,调用一个lambda表达式,使用元组索引查询您的S3文件。AWS在S3存储桶上有一个查询语言,称为Amazon S3 Select:https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html 这是另一个关于如何使用S3 select并使用node将数据转换为可读状态的良好资源:https://thetrevorharmon.com/blog/how-to-use-s3-select-to-query-json-in-node-js 因此,在迭代#1中,我们正在查询我们数据结构中的前1,000个对象。现在,我可以调用我通常在迭代器内部调用的任何函数。
这种方法的关键在于inputPath从未接收到大型数据结构。

1
请注意(SelectObjectContent)在支持的集成中列出了不支持的 API 操作(https://docs.aws.amazon.com/step-functions/latest/dg/supported-services-awssdk.html#unsupported-api-actions-list)。这意味着,S3-Select 不会将所有结果记录添加到工作流中。如果您从 Lambda 函数查询它,则结果列表可能会超过有效负载限制。需要某种方式让 Lambda 函数可以从单个执行中流式传输多个输出。 - yucer

1

在访问您提供的链接后,我发现他们已经认识到了这个限制,并提出了动态并行模型以缓解该问题(https://aws.amazon.com/blogs/compute/introducing-larger-state-payloads-for-aws-step-functions/)。然而,如果“检索项目”步骤的输出是超过限制的长列表,并且下一个Map步骤需要迭代该列表,则如何克服此限制仍不清楚。 - yucer

1

0
提出的解决方法适用于特定情况,但在处理正常有效负载可能生成超过有效负载限制的大量项目的情况下,不适用。
从一般形式上讲,我认为问题可能会在1->N的场景中重复。我的意思是,当一个步骤可能在工作流中生成多个步骤执行时。
将某些任务的复杂性分解成许多其他任务是打破复杂性的明显方法之一,因此这很可能需要很多次。从可扩展性的角度来看,这样做有明显的优势,因为您将大型计算分解成小型计算的越多,就会有更多的粒度和并行性,并且可以进行更多的优化。
这就是AWS通过增加最大有效负载大小来促进的内容。他们称其为动态并行性
问题在于Map状态是其中的基石。除了服务集成(数据库查询等)之外,它是唯一可以从一个步骤动态派生多个任务的状态。但似乎没有办法指定它的有效负载在文件中。
我认为解决这个问题的一个快速方法是在每个步骤中添加一个可选的持久化规范,例如:
stateMachines:
  myStateMachine:
    name: "myStateMachine"
    definition:
      StartAt: preProcess
      States:
        preProcess:
          Type: Task
          Resource:
            Fn::GetAtt: [preProcessLambda, Arn]
          Next: mapState
          ResultPath: "$.preProcessOutput"
          OutputFormat:
             S3:
                Bucket: myBucket
             Compression:
                Format: gzip
        mapState:
          Type: Map
          ItemsPath: "$.preProcessOutput.data"
          InputFormat:
             S3:
                Bucket: myBucket
             Compression:
                Format: gzip
          MaxConcurrency: 100
          Iterator:
            StartAt: doMap
            States:
              doMap:
                Type: Task
                Resource:
                  Fn::GetAtt: [doMapLambda, Arn]
                End: true
          Next: ### next steps, not relevant

这样地图甚至可以在大负载下执行其工作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接