8得票2回答
如何验证Amazon Kinesis Python客户端是否正常工作

我正在尝试使用KCL库为Python (https://github.com/awslabs/amazon-kinesis-client-python)构建Amazon Kinesis Python消费者。我开始通过检查示例代码来了解。我能够运行示例代码的生产者和消费者脚本部分,但是我无法验证...

7得票1回答
Spark Streaming保证特定的开始窗口时间

我正在使用Spark Streaming和Structured Streaming框架从Kinesis读取数据,我的连接如下: val kinesis = spark .readStream .format("kinesis") .option("streams", stream...

7得票2回答
如何加入一个Spark直播流,并使用另一个流在其整个生命周期中收集的所有数据?

我有两个Spark流,第一个流中包含与产品相关的数据:供应商价格、货币、描述和供应商ID。这些数据通过分析描述和美元价格猜测类别进行增强。然后将它们保存在一个parquet数据集中。 第二个流包含有关这些产品拍卖的数据,然后是它们被售出的成本和日期。 考虑到一个产品可能今天出现在第一个流中...

8得票2回答
使用Kinesis实现跨账户Lambda触发器

我正在尝试通过来自A账户的Kinesis流触发B账户中的Lambda。这类似于此处所述内容,只是示例中使用的是S3而不是Kinesis。 为了实现这一点,我正在尝试设置正确的权限,但遇到了困难。 首先,我添加了以下权限: aws lambda add-permission \ --f...

7得票2回答
Amazon Kinesis模拟器

我们正在评估实时事件处理引擎(如Twitter风暴),其中一个选项是最近发布的Amazon Kinesis。 我想知道是否有任何仿真器/沙盒环境可用,可以在不需要设置AWS账户和支付服务使用费的情况下玩弄kinesis一段时间。 提前感谢您。

8得票2回答
Apache Flink - 无法使用本地 Kinesis 进行 FlinkKinesisConsumer

目前为止,我已经按照Flink kinesis连接器的文档说明使用本地Kinesis。 使用非AWS Kinesis端点进行测试 Properties producerConfig = new Properties(); producerConfig.put(AWSConfigConsta...

7得票1回答
如何处理AWS Kinesis中的重新处理场景?

我正在探索AWS Kinesis,以替换旧的批处理ETL处理方式,采用基于流的方法进行数据处理。 这个项目的一个关键要求是在以下情况下能够重新处理数据: - 发现并修复了错误,并重新部署应用程序。需要从头开始重新处理数据。 - 添加新功能并需要完全或部分重新处理历史记录。 这些方案在Ka...

10得票1回答
控制生成的future数量以创建反压力

我正在使用由futures-rs提供支持的Rusoto AWS Kinesis库。为了实现高吞吐量,我需要生成一系列AWS Kinesis请求以启动深层管道,因为Kinesis每个HTTP请求的记录限制为500条。加上发送请求的50ms延迟,我需要开始生成许多并发请求。我希望创建大约100个正...

18得票4回答
Kinesis Firehose将JSON对象放入S3中,且没有分隔符逗号

在发送数据之前,我使用JSON.stringify将数据转换成以下格式{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]} 但是一旦它通过AWS API Gateway并且Kinesis F...

7得票1回答
使用Java SDK将记录放入Kinesis流时,sequenceNumberForOrdering的作用是什么?

我对AWS文档中关于将记录放入Kinesis流的部分感到有些困惑,链接如下: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax 它说设置...