Kinesis从多个分片获取数据。

4
我正在尝试构建一个简单的应用程序,从AWS Kinesis读取数据。我已经成功使用单个分片读取数据,但是我想从4个不同的分片获取数据。
问题在于,我有一个while循环,只要分片处于活动状态,就会迭代,这会阻止我从不同的分片读取数据。到目前为止,我无法找到替代算法,也无法实现基于KCL的解决方案。非常感谢您的帮助。
public static void DoSomething() {
        AmazonKinesisClient client = new AmazonKinesisClient();
        //noinspection deprecation
        client.setEndpoint(endpoint, serviceName, regionId);  
        /** get shards from the stream using describe stream method*/

        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(streamName);
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
            shards.addAll(describeStreamResult.getStreamDescription().getShards());
            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
            } else {
                exclusiveStartShardId = null;
            }
        }while (exclusiveStartShardId != null);

        /** shards obtained */
        String shardIterator;

        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shards.get(0).getShardId());
        getShardIteratorRequest.setShardIteratorType("LATEST"); 

        GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

        while (!shardIterator.equals(null)) {
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(250);
            GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
            List<Record> records = getRecordsResult.getRecords();

            shardIterator = getRecordsResult.getNextShardIterator();
            if(records.size()!=0) {
                for(Record r : records) {
                    System.out.println(r.getPartitionKey());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
2个回答

1
建议您不要从多个分片读取单个进程/工作程序。首先,这会增加代码的复杂性,但更重要的是,您将在扩展时遇到问题。
可扩展性的“秘密”是拥有小型和独立的工作程序或其他类似单位的设计。在Hadoop、DynamoDB或AWS中的Kinesis中,您可以看到这样的设计。它允许您构建小型系统(微服务),根据需要轻松地进行扩展和缩小。随着服务变得更加成功或其使用情况的波动,您可以轻松地添加更多的工作/数据单元。
正如您可以在这些AWS服务中看到的那样,有时您可以自动获得这种可扩展性,例如DynamoDB,有时您需要向kinesis流添加分片。但是对于您的应用程序,您需要以某种方式控制其可扩展性。
在Kinesis的情况下,您可以使用AWS Lambda或Kinesis Client Library(KCL)来进行上下扩展。它们都在侦听流的状态(分片数量和事件)并使用它来添加或删除工作程序并将事件传递给它们进行处理。在这两个解决方案中,您应该构建一个针对单个分片工作的工作程序。
如果您需要对来自多个分片的事件进行对齐,则可以使用一些状态服务,例如Redis或DynamoDB。

问题是我无法读取任何数据,即使我使用工作程序也无法检索碎片。想使用“consumeShard”方法,但不够用。我尝试自定义适用于Kinesis应用程序的AWS示例,但它过于依赖DynamoDB。我不想将我的对象或计数保存到Dynamo表中。 - emrahozkan

0

对于一个简单且更整洁的解决方案,您只需要考虑提供自己的消息处理代码,我建议使用KCL库。

引用自文档

KCL充当您的记录处理逻辑与Kinesis数据流之间的中介。 KCL执行以下任务:

  • 连接到数据流
  • 枚举数据流中的分片
  • 使用租约协调其工作人员与分片的关联
  • 为其管理的每个分片实例化记录处理器
  • 从数据流中拉取数据记录
  • 将记录推送到相应的记录处理器
  • 检查点处理的记录
  • 在工作实例计数更改或数据流重新分区(分片分裂或合并)时平衡分片 -工作程序关联(租赁)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接