Inside the PreProcess, batch the output into chunks small enough to stay under the payload limit.
This is the most complicated step. You will need to experiment to find the largest size a single batch can be. Once you have that number (it may be smart to make it dynamic), split the original PreProcess output into the corresponding number of batches. I used numpy for this:
import numpy as np
batches = np.array_split(original_pre_process_output, number_of_batches)
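One way to make the number of batches dynamic is to estimate it from the serialized size of the output. The sketch below is only an illustration: the helper name choose_number_of_batches, the sample data, and the 32 KB limit are assumptions, not values from the original solution. It also assumes the items are roughly uniform in size, since an even split by count does not bound the size of any individual batch. The resulting number could then be passed to np.array_split as shown above.

```python
import json
import math


def choose_number_of_batches(items, max_batch_bytes):
    """Estimate how many batches keep each serialized batch under max_batch_bytes."""
    total_bytes = len(json.dumps(items).encode('utf-8'))
    # Use at least one batch, and never more batches than there are items.
    return max(1, min(len(items), math.ceil(total_bytes / max_batch_bytes)))


# Hypothetical PreProcess output: 1,000 small job descriptions.
original_pre_process_output = [{'job_id': i, 'payload': 'x' * 50} for i in range(1000)]

# 32 KB is an arbitrary example limit; find your own by experimenting.
number_of_batches = choose_number_of_batches(original_pre_process_output, 32 * 1024)
```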
Again inside the PreProcess, upload each batch to Amazon S3, saving the keys in a new list. This list of S3 keys will be the new PreProcess output.
In Python, this looks like so:
import json
import boto3
s3 = boto3.resource('s3')
batch_keys = []
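for batch in batches:
    s3_batch_key = 'Your S3 Key here'
    # np.array_split returns NumPy arrays, which json.dumps cannot serialize, so convert to a list first
    s3.Bucket(YOUR_BUCKET).put_object(Key=s3_batch_key, Body=json.dumps(batch.tolist()))
    batch_keys.append({'batch_key': s3_batch_key})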
In the solution I implemented, I used for batch_id, batch in enumerate(batches) to easily give each S3 key its own ID.
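As a rough sketch of that enumerate-based variant: the function name upload_batches, the key_prefix parameter, and the key naming scheme below are hypothetical choices for illustration. The bucket argument is assumed to be a boto3 Bucket resource such as s3.Bucket(YOUR_BUCKET).

```python
import json


def upload_batches(batches, bucket, key_prefix='batches'):
    """Upload each batch as JSON and return the list of keys for the Outer Map."""
    batch_keys = []
    for batch_id, batch in enumerate(batches):
        # Hypothetical naming scheme: one object per batch, numbered by batch_id.
        s3_batch_key = f'{key_prefix}/batch_{batch_id}.json'
        # NumPy arrays are not JSON serializable, so convert them to lists.
        items = batch.tolist() if hasattr(batch, 'tolist') else list(batch)
        bucket.put_object(Key=s3_batch_key, Body=json.dumps(items))
        batch_keys.append({'batch_key': s3_batch_key})
    return batch_keys
```

Including something unique to the execution in key_prefix may help keep concurrent runs from overwriting each other's batches.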
Wrap the 'Inner' Map State in an 'Outer' Map State, and create a Lambda function within the Outer Map to feed the batches to the Inner Map.
Now that we have a small output consisting of S3 keys, we need a way to open one at a time, feeding each batch into the original (now 'Inner') Map state.
To do this, first create a new Lambda function; this will represent the BatchJobs state. Next, wrap the initial Map state inside an Outer Map, like so:
"PreProcess": {
  "Type": "Task",
  "Resource": "Your Resource ARN",
  "Next": "Outer Map"
},
"Outer Map": {
  "Type": "Map",
  "MaxConcurrency": 1,
  "Next": "Original 'Next' used in the Inner map",
  "Iterator": {
    "StartAt": "BatchJobs",
    "States": {
      "BatchJobs": {
        "Type": "Task",
        "Resource": "Newly created Lambda Function ARN",
        "Next": "Inner Map"
      },
      "Inner Map": {
        Initial Map State, left as is, except that its "Next" becomes "End": true.
      }
    }
  }
}
Note the 'MaxConcurrency' parameter in the Outer Map: setting it to 1 ensures the batches are executed sequentially.
With this new Step Function definition, the BatchJobs state will receive {'batch_key': s3_batch_key} for each batch. The BatchJobs state then simply needs to get the object stored under that key and pass it to the Inner Map.
In Python, the BatchJobs Lambda function looks like so:
import json
import boto3
s3 = boto3.client('s3')
def batch_jobs_handler(event, context):
    # Fetch the batch stored under the given key and return its parsed contents
    obj = s3.get_object(Bucket='YOUR_BUCKET_HERE', Key=event.get('batch_key'))
    return json.loads(obj['Body'].read().decode('utf-8'))
Update your workflow to handle the new structure of the output.
Before implementing this solution, your Map state produced a flat array of outputs:
[
  {Map_output_1},
  {Map_output_2},
  .
  .
  .
  {Map_output_n}
]
With this solution, you will now get a list of lists, with each inner list containing the results of one batch:
[
  [
    {Batch_1_output_1},
    {Batch_1_output_2},
    .
    .
    .
    {Batch_1_output_n}
  ],
  [
    {Batch_2_output_1},
    {Batch_2_output_2},
    .
    .
    .
    {Batch_2_output_n}
  ],
  .
  .
  .
  [
    {Batch_m_output_1},
    {Batch_m_output_2},
    .
    .
    .
    {Batch_m_output_n}
  ]
]
Depending on your use case, you may need to adjust the code after the Map to handle the new output format.
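If the downstream code expects the original flat array, one option is to flatten the nested result in the state that follows the Outer Map. This is a minimal sketch; the function name flatten_batch_results and the sample data are made up for illustration.

```python
from itertools import chain


def flatten_batch_results(outer_map_output):
    """Flatten the Outer Map's list of per-batch lists into one flat list."""
    return list(chain.from_iterable(outer_map_output))


# Hypothetical output of the Outer Map: two batches of results.
outer_map_output = [
    [{'job': 1}, {'job': 2}],
    [{'job': 3}],
]
flat_results = flatten_batch_results(outer_map_output)
```

Keep in mind that the flattened list is subject to the same payload limit as any other state output, so for very large results it may be better to keep them in S3 as well.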