如何从DynamoDB流的新图像中获取纯Json字符串?

17

我有一个启用了流式处理的Dynamodb表,并为该表创建了一个触发器,调用一个AWS Lambda函数。在lambda函数内部,我试图从Dynamodb流中读取新图像(修改后的Dynamodb项),并尝试获取其中的纯json字符串。我的问题是如何获得通过流发送的DynamoDB项的纯json字符串?我正在使用下面给出的代码片段获取新图像,但我不知道如何从中获取json字符串。感谢您的帮助。

public class LambdaFunctionHandler implements RequestHandler<DynamodbEvent, Object> {

@Override
public Object handleRequest(DynamodbEvent input, Context context) {
    context.getLogger().log("Input: " + input);

    for (DynamodbStreamRecord record : input.getRecords()){

        context.getLogger().log(record.getEventID());
        context.getLogger().log(record.getEventName());
        context.getLogger().log(record.getDynamodb().toString());
        Map<String,AttributeValue> currentRecord = record.getDynamodb().getNewImage();

        //how to get the pure json string of the new image
        //..............................................
     }
     return "Successfully processed " + input.getRecords().size() + " records.";
}

}


我的真正意图是在Dynamodb中插入/更新/删除项目时将数据发送到Elasticsearch服务器。我使用AWS用户控制台上提供的Python Lambda代码模板(创建新Lambda函数部分)成功实现了这一点。因此,我无需担心从Dynamodb流事件中获取纯JSON字符串,就可以直接从Dynamodb流将数据发送到Amazon Elasticsearch服务,使用上述代码模板。希望这对某些人有所帮助。 - Asanga Dewaguru
8个回答

9
下面是将Dynamo JSON转换为标准JSON的完整代码:
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.internal.InternalUtils;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Main Lambda class to receive event stream, parse it to Survey
 * and process them.
 */
public class SurveyEventProcessor implements
        RequestHandler<DynamodbEvent, String> {

    private static final String INSERT = "INSERT";

    private static final String MODIFY = "MODIFY";

    public String handleRequest(DynamodbEvent ddbEvent, Context context) {

        List<Item> listOfItem = new ArrayList<>();
        List<Map<String, AttributeValue>> listOfMaps = null;
        for (DynamodbStreamRecord record : ddbEvent.getRecords()) {

            if (INSERT.equals(record.getEventName()) || MODIFY.equals(record.getEventName())) {
                listOfMaps = new ArrayList<Map<String, AttributeValue>>();
                listOfMaps.add(record.getDynamodb().getNewImage());
                listOfItem = InternalUtils.toItemList(listOfMaps);
            }

            System.out.println(listOfItem);
            try {
               // String json = new ObjectMapper().writeValueAsString(listOfItem.get(0));
                Gson gson = new Gson();
                Item item = listOfItem.get(0);

                String json = gson.toJson(item.asMap());
                System.out.println("JSON is ");
                System.out.println(json);
            }catch (Exception e){
                e.printStackTrace();
            }
        }


        return "Successfully processed " + ddbEvent.getRecords().size() + " records.";
    }
} 

我遇到了一个错误,DynamodbEvent返回的是com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue,而util期望的是com.amazonaws.services.dynamodbv2.model.AttributeValue。我正在使用aws-lambda-java-events:jar:3.7.0和aws-java-sdk-dynamodb:jar:1.11.978。请问@Himanshu Parmar,你知道哪些版本是兼容的吗? - anuj

4

在c#中,您可以使用DynamoDB Document类将newImage转换为纯json格式。

使用Amazon.DynamoDBv2.DocumentModel;

var streamRecord = dynamoEvent.Records.First();

var jsonResult = Document.FromAttributeMap(streamRecord.Dynamodb.NewImage).ToJson();


如果您希望进一步将json转换为对象,则可以使用Newtonsoft。

使用Newtonsoft.Json;

TModel model = JsonConvert.DeserializeObject(jsonResult);


3
这是一个Java标记的问题,所以这个C#答案在这种情况下是不相关的。 - JNYRanger

3

我找到了一种干净的方法。使用aws-java-sdk-dynamodb-1.11.15.jar中的InternalUtils。

com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject();
            // get order ready //
            OrderFinal order = Utils.mapO2Object(
                    InternalUtils.toSimpleMapValue(streamRecord.getDynamodb().getNewImage().get("document").getM()), 
                    OrderFinal.class );

你能提供更多关于这个的细节吗? - Marckaraujo
不理解这个评论。你想要什么详细信息? - shailender arya
2
Utils.mapO2Object 是从哪里来的?这是你自己实现的吗? - EagleBeak
Utils.map2Object是我们的实用函数,它是jackson convertValue函数的包装器。请在下面找到该函数。public static <T> T map2Object(Map<String, String> map, Class<T> clazz) { Object t = null; t = OBJECT_MAPPER.convertValue(map, clazz); return (T) t; } - shailender arya

3

以下是Himanshu Parmar的回答总结:

Map<String, AttributeValue> newImage = record.getDynamodb().getNewImage();
List<Map<String, AttributeValue>> listOfMaps = new ArrayList<Map<String, AttributeValue>>();
listOfMaps.add(newImage);
List<Item> itemList = ItemUtils.toItemList(listOfMaps);
for (Item item : itemList) {
    String json = item.toJSON();
}

0
以下是将DynamoDB JSON转换为普通JSON的方法:

/**
 * Converts DynamoDB JSON to normal JSON.
 *
 * @param map Input map of String to AttributeValue.
 * @return Returns an ObjectNode containing the normal JSON.
 */

public JsonObject toJsonObject(final Map<String, AttributeValue> map) {
        final JsonNode result = mapToJsonObject(map);
        final ObjectNode objectNode = (ObjectNode) result;
        final ObjectMapper objectMapper = new ObjectMapper();
        String recordObjectString;
        try {
            recordObjectString = objectMapper.writeValueAsString(objectNode);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        final JsonParser jsonParser = new JsonParser();
        final JsonObject jsonObject = jsonParser.parse(recordObjectString)
                                                .getAsJsonObject();
        return jsonObject;
    }

所以在你的情况下,只需调用以下方法:

// here record is of type DynamodbStreamRecord

toJsonObject(record.getDynamodb().getNewImage());


0

对于那些遇到了Map<String, ?>对象只是普通的Map而非属性值的人,可以尝试以下方法:

Map<String, AttributeValue> dynamoDbAttributes = 
    objectMapper.convertValue(dynamoDbMap, new TypeReference<Map<String, AttributeValue>>() {});

然后将此 DynamoDB Map 转换为普通 Map(与最初推送到 DynamoDb 的 json 等效):

asMap = InternalUtils.toSimpleMapValue(dynamoDbAttributes);

0

对于遇到AttributeValue转换问题的人,请参考以下代码: https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-events-sdk-transformer/README.md

Map<String, AttributeValue> stringAttributeValueMap = DynamodbAttributeValueTransformer.toAttributeValueMapV1(dynamodb.getNewImage());

            List stringAttributeValueMapList = new ArrayList();
            stringAttributeValueMapList.add(stringAttributeValueMap);
            List<Item> listOfItem = InternalUtils.toItemList(stringAttributeValueMapList);
            Gson gson = new GsonBuilder().setPrettyPrinting().create();
            String updatedJSON = gson.toJson(listOfItem.get(0).asMap());

-4

这个库可以完成任务:dynamoDb-marshaler

var unmarshalJson = require('dynamodb-marshaler').unmarshalJson;

console.log('jsonItem Record: %j', unmarshalJson(record.dynamodb.NewImage));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接