加入 Kinesis 流

4

我有两个 Kinesis 流,我想创建一个第三个流,它是这两个流的交集。我的目标是让流处理器响应第三个流上的事件,而无需编写执行此交集的消费者。

a 上的记录将是:

{
    "customer_id": 3,
    "first_name":"Marcy",
    "last_name":"Shurtleff"
}

流 b 上的记录将会是:

{
    "payment_id": 10001,
    "customer_id": 1,
    "amount":234.56,
    "date":"2018-09-07T10:25:43.511Z"

我希望执行一个类似于Kafka中KSQL的连接操作,将流 a.customer_id 与流 b.customer_id 连接起来,得到以下结果:

{
    "customer_id": 3,
    "first_name":"Marcy",
    "last_name":"Shurtleff",
    "payment_id": 10001,
    "amount":234.56,
    "date":"2018-09-07T10:25:43.511Z"
}

(或者我选择的任何类似SQL的投影)。
我知道使用Kafka和KSQL是可能的,但在Kinesis中可行吗?
由于Kinesis数据分析产品不能在一个以上的流中作为数据源使用,并且您只能在“应用程序内”流上执行连接,因此Kinesis不起作用。

这在Spark和Drools中也是可能的,但不幸的是,在Kinesys Analytics中不行。 - Konstantin Triger
1个回答

1
我最近使用Kinesis Data Analytics成功实现了你所要求的解决方案。KDA应用仅接受一个数据流作为输入源,因此当你处理多个数据流时,需要对流入KDA的数据进行模式标准化。为了解决这些问题,可以在Lambda函数内部使用Python代码片段来展开和标准化任何事件,将其整个有效载荷转换为JSON编码字符串。下图展示了我的整个解决方案的部署情况:enter image description here 以下是详细说明标准化和展平流程的过程: enter image description here 请注意,在此阶段后,两个JSON事件具有相同的模式且没有嵌套字段。但是,所有信息都被保留下来。此外,ssn字段被放置在标题中,以作为在KDA应用程序内部使用的连接键。
如需更多关于该解决方案的信息,请查看我撰写的文章:https://medium.com/@guilhermeepassos/joining-and-enriching-multiple-sets-of-streaming-data-with-kinesis-data-analytics-24b4088b5846

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接