AWS Kinesis分析中的应用程序SQL流无法添加行,来自Kinesis数据流。

4
我正在调用dynamoDBTrigger将数据写入Kinesis流。该流被配置为作为Kinesis分析应用程序的输入流。我在Kinesis流上配置了一个Lambda预处理器,记录写入流中的数据。但是,在源选项卡上,分析应用窗口中出现消息源流中没有行。在应用程序 SQL 流中未创建行。
我使用Node并使用serverless.yml文件部署服务。以下是配置 -

RecordKinesisAnalyticsApp: Type: AWS::KinesisAnalytics::Application Properties: ApplicationName: 记录Kinesis Analytics应用 ApplicationDescription: 记录Kinesis Analytics应用 ApplicationCode: ${file(./serverless/metadataQueries.yml):AnalyticsQuery_1} Inputs: - NamePrefix: "记录前缀" InputSchema: RecordColumns: - Name: "USER_ID" SqlType: "VARCHAR(20)" Mapping: "$._userId" - Name: "ANXIETY" SqlType: "INTEGER" Mapping: "$.anxiety" RecordEncoding: "UTF-8" RecordFormat: RecordFormatType: "JSON" KinesisStreamsInput: ResourceARN: Fn::GetAtt: - RecordKinesisInputStream - Arn RoleARN: arn:aws:iam::xxxxxxxxxxx:role/service-role/kinesis-analytics-KinesisDemo-us-east-1 这是分析查询内容 -

创建或替换流 "DESTINATION_SQL_STREAM" (USER_ID VARCHAR(20), ANXIETY INTEGER); 创建或替换泵 "STREAM_PUMP",将数据插入到 "DESTINATION_SQL_STREAM" 中 从 "RecordPrefix_001" 中选择 USER_ID 和 ANXIETY 到流中 其中 ANXIETY >= 0; 在此输入图像描述

enter image description here


2
我猜你已经找到了问题,今天我也遇到了同样的情况,原始记录看起来没问题,在格式化选项卡中也没有信息,在错误流中也没有信息。我也在使用无服务器技术,问题是它保存SQL查询而不运行它,如果你去SQL控制台尝试运行脚本,你会发现错误。希望这能帮助其他人。 - ferflores
1个回答

0

我认为这是因为你的引号。AWS控制台(默认情况下,即使您看不到它)和您在代码中使用了"USER_ID""ANXIETY"等等。因此,您的SQL代码也必须使用引号:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM ("USER_ID" VARCHAR(20), "ANXIETY" INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "USER_ID", "ANXIETY"
    FROM "RecordPrefix_001" 
    WHERE "ANXIETY" >= 0;

来自文档

Kinesis Data Analytics在创建应用程序输入流时,会在标识符(流名称和列名称)周围添加引号。当查询此流和列时,您必须使用相同的大小写并将它们用引号括起来(完全匹配小写和大写字母)。有关标识符的更多信息,请参阅Amazon Kinesis Data Analytics SQL参考中的标识符。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接