我希望能够将一个输入的 AWS Kinesis 流分发/链接/复制到 N 个新的 Kinesis 流中,以便每个写入输入 Kinesis 的记录都会出现在这 N 个流中。
是否有 AWS 服务或开源解决方案可用?
如果有现成的解决方案,我更倾向于不编写代码来实现。AWS Kinesis firehose 不是一个解决方案,因为它无法输出到 Kinesis。也许可以使用 AWS Lambda 解决方案,如果运行成本不会太高的话?
我希望能够将一个输入的 AWS Kinesis 流分发/链接/复制到 N 个新的 Kinesis 流中,以便每个写入输入 Kinesis 的记录都会出现在这 N 个流中。
是否有 AWS 服务或开源解决方案可用?
如果有现成的解决方案,我更倾向于不编写代码来实现。AWS Kinesis firehose 不是一个解决方案,因为它无法输出到 Kinesis。也许可以使用 AWS Lambda 解决方案,如果运行成本不会太高的话?
有两种方法可以实现 Amazon Kinesis 流的扩散(fan-out):
选项 1:使用 Amazon Kinesis Analytics 进行扩散
您可以使用Amazon Kinesis Analytics从现有流生成新流。
来自Amazon Kinesis Analytics 文档:
Amazon Kinesis Analytics 应用程序不断地实时读取和处理流数据。您使用 SQL 编写应用程序代码来处理传入的流数据并生成输出。然后,Amazon Kinesis Analytics 将输出写入配置的目标。
在应用程序代码部分中提到了扩散(fan-out):
您还可以编写彼此独立的 SQL 查询。例如,您可以编写两个 SQL 语句来查询同一应用程序流,但将输出发送到不同的应用程序流。
我成功实现了以下操作:
Amazon Kinesis Analytics SQL 应用程序如下:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
这段代码创建了一个泵(将其视为连续的选择语句),从input
流中进行选择并输出到output1
流。我创建了另一个完全相同的应用程序,它输出到output2
流。
为了测试,我向input
流发送了数据:
#!/usr/bin/env python
import json, time
from boto import kinesis
kinesis = kinesis.connect_to_region("us-west-2")
i = 0
while True:
data={}
data['log'] = 'Record ' + str(i)
i += 1
print data
kinesis.put_record("input", json.dumps(data), "key")
time.sleep(2)
我让它运行一段时间,然后使用这段代码显示输出:
from boto import kinesis
kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]
输出结果为:
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']
我再次对output2
运行它,结果显示完全相同。
选项二:使用 AWS Lambda
如果你需要向多个流传输数据,一种更有效的方法可能是创建一个 AWS Lambda 函数:
你甚至可以让 Lambda 函数基于命名约定(例如任何以 app-output-*
命名的流)自动发现输出流。
有两个AWS本地解决方案可以扇出Kinesis流,不需要AWS Firehose或AWS Lambda。