AWS Kinesis .NET Consumer

6

我正在使用AWS Kinesis进行生产者和消费者实验,但遇到的问题是尽管我们多次更改发送的数据对象,消费者仍然接收第一条消息(或记录)。此外,我们尝试了多种ShardIteratorType,但都没有起作用。最新的类型不产生任何结果,而其他所有类型仍会产生相同的原始记录。

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Internal;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using BenchmarkRuleSetModel.Models;
using MongoDB.Driver;
using Newtonsoft.Json;

namespace ConsoleApp7
{
    internal class Program
    {
        private static AmazonKinesisClient _client;
        private static string _streamName;

        static async Task ReadFromStream()
        {
            var kinesisStreamName = _streamName;

            var describeRequest = new DescribeStreamRequest
            {
                StreamName = kinesisStreamName,
            };

            var describeResponse = await _client.DescribeStreamAsync(describeRequest);
            var shards = describeResponse.StreamDescription.Shards;

            foreach (var shard in shards)
            {
                var iteratorRequest = new GetShardIteratorRequest
                {
                    StreamName = kinesisStreamName,
                    ShardId = shard.ShardId,
                    ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
                    Timestamp = DateTime.MinValue
                };

                var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
                var iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    var getRequest = new GetRecordsRequest
                    {
                        ShardIterator = iteratorId, Limit = 10000
                    };

                    var getResponse = await _client.GetRecordsAsync(getRequest);
                    var nextIterator = getResponse.NextShardIterator;
                    var records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (var record in records)
                        {
                            var json = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("Json string: " + json);
                        }
                    }

                    iteratorId = nextIterator;
                }
            }
        }

        private static async Task<string> Produce()
        {
            var data = new
            {
                Message = "Hello world!",
                Author = "Amir"
            };

            //convert to byte array in prep for adding to stream
            var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

            using (var ms = new MemoryStream(oByte))
            {
                //create put request
                var requestRecord = new PutRecordRequest
                {
                    StreamName = _streamName,
                    PartitionKey = Guid.NewGuid().ToString(),
                    Data = ms
                };
                //list name of Kinesis stream
                //give partition key that is used to place record in particular shard
                //add record as memorystream

                //PUT the record to Kinesis
                var response = await _client.PutRecordAsync(requestRecord);

                return response.SequenceNumber;
            }
        }

        static void Main(string[] args)
        {
            _client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);

            _streamName = "SomeStream";

            Produce().Wait();

            ReadFromStream().Wait();
        }
    }
}

QQ。是否有其他应用程序将记录写入流中?从您的上述代码来看,似乎您只是放置了一次记录并完成了该操作。 - Imran
@Imran 我只是想生产和消费它,以确保我理解它。所以回答你的问题,不。 - Node.JS
1个回答

4
首先,我已经调试了你的代码,发现它在内部循环(while (!string.IsNullOrEmpty(iteratorId)))中无限循环,并且从未遍历流中所有的分片(假设你有 >1 的分片)。原因在于https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty中有解释——由于生产者从未调用MergeShardsSplitShards,它们仍然开放,因此NextShardIterator将永远不会是NULL
这就是为什么你只能看到记录放在第一个分片上的原因(或者至少我在运行你的代码时是这样的)——你必须并行地从分片中读取。
至于你的使用模式,你正在使用:
ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue

通过这种方式,你实际上是在告诉Kinesis“从时间的开始给我流中的所有记录”(或者至少是保留期所及的范围)。这就是为什么除了新记录之外,你仍然看到相同的旧记录(当我运行你的代码时,我也看到了这个情况)。 GetRecords[Async]调用实际上并不从流中删除记录(见https://dev59.com/Xl8e5IYBdhLWcg3wssLG#25741304)。使用Kinesis的正确方式是从检查点到检查点移动。如果消费者持久化了上次读取的SequenceNumber,然后像这样重新启动:
ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber

那么你将只看到更新的记录。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接