如何扇出 AWS Kinesis 流?

20

我希望能够将一个输入的 AWS Kinesis 流分发/链接/复制到 N 个新的 Kinesis 流中,以便每个写入输入 Kinesis 的记录都会出现在这 N 个流中。

是否有 AWS 服务或开源解决方案可用?

如果有现成的解决方案,我更倾向于不编写代码来实现。AWS Kinesis firehose 不是一个解决方案,因为它无法输出到 Kinesis。也许可以使用 AWS Lambda 解决方案,如果运行成本不会太高的话?


2
你为什么觉得需要进行扇出操作呢?Kinesis 流已经支持多个消费者从流的不同部分读取数据了。 - E.J. Brennan
正如 @E.J.Brennan 提到的那样,你为什么需要 fan out? - ketan vijayvargiya
3
@E.J.Brennan,Kinesis确实支持多个消费者,但全局读取限制为每秒5次。虽然每次读取可以拉取许多记录,但一旦你有超过20个消费者,你的延迟时间将会变成> 4秒,这对于我的应用程序来说是不可接受的。详情请见:https://brandur.org/kinesis-in-production#five-reads - Gili Nachum
3个回答

19

有两种方法可以实现 Amazon Kinesis 流的扩散(fan-out):

  • 使用 Amazon Kinesis Analytics 将记录复制到其他流中。
  • 触发 AWS Lambda 函数将记录复制到另一个流中。

选项 1:使用 Amazon Kinesis Analytics 进行扩散

您可以使用Amazon Kinesis Analytics从现有流生成新流。

来自Amazon Kinesis Analytics 文档

Amazon Kinesis Analytics 应用程序不断地实时读取和处理流数据。您使用 SQL 编写应用程序代码来处理传入的流数据并生成输出。然后,Amazon Kinesis Analytics 将输出写入配置的目标

Amazon Kinesis Analytics flow diagram

应用程序代码部分中提到了扩散(fan-out):

您还可以编写彼此独立的 SQL 查询。例如,您可以编写两个 SQL 语句来查询同一应用程序流,但将输出发送到不同的应用程序流

我成功实现了以下操作:

  • 创建了三个流:输入、输出1、输出2
  • 创建了两个 Amazon Kinesis Analytics 应用程序:copy1、copy2

Amazon Kinesis Analytics SQL 应用程序如下:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

这段代码创建了一个(将其视为连续的选择语句),从input流中进行选择并输出到output1流。我创建了另一个完全相同的应用程序,它输出到output2流。

为了测试,我向input流发送了数据:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

我让它运行一段时间,然后使用这段代码显示输出:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

输出结果为:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

我再次对output2运行它,结果显示完全相同。

选项二:使用 AWS Lambda

如果你需要向多个流传输数据,一种更有效的方法可能是创建一个 AWS Lambda 函数:

  • 由 Amazon Kinesis 流记录触发
  • 将记录写入多个 Amazon Kinesis“输出”流的函数

你甚至可以让 Lambda 函数基于命名约定(例如任何以 app-output-* 命名的流)自动发现输出流。


2
您也可以使用相同的 Kinesis Analytics 应用程序并向其添加两个输出流 :) - alexcasalboni

2

1

有两个AWS本地解决方案可以扇出Kinesis流,不需要AWS Firehose或AWS Lambda。

  1. 与Kafka消费者组类似,Kinesis有应用程序名称。每个流的消费者可以提供唯一的应用程序名称。如果两个消费者具有相同的应用程序名称,则消息在它们之间分配。要扇出流,请为那些您希望从流中接收相同消息的消费者提供不同的应用程序名称。 Kinesis将在幕后创建新的DynamoDB表,以跟踪每个新应用程序的每个消费者,以便他们可以以不同的速率消耗消息等。
  2. 使用Kinesis Enhanced Fan-Out以实现更高的吞吐量(每秒最多2MiB),并且这不计入全局读取限制。撰写本文时,每个流有20个“增强型扇出”消费者的限制。
据我所知,这两个选项的一个注意事项是您需要使用Kinesis客户端库(KCL)(而不是原始的AWS SDK)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接