使用AWS Java DynamoDB流Kinesis适配器处理DynamoDB流数据

12

我正在尝试使用DynamoDB流和AWS提供的Java DynamoDB流Kinesis适配器捕获DynamoDB表更改。我正在使用Scala应用程序中的AWS Java SDK。

我开始按照AWS指南 和通过AWS发布的代码示例进行操作。但是,我在我的环境中无法让Amazon自己发布的代码正常工作。我的问题出现在KinesisClientLibConfiguration对象上。

在示例代码中,KinesisClientLibConfiguration使用DynamoDB提供的流ARN进行配置。

new KinesisClientLibConfiguration("streams-adapter-demo",
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker")

在我的Scala应用程序中,我按照类似的模式首先从我的Dynamo表中定位当前的ARN:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn

然后使用提供的 ARN 创建 KinesisClientLibConfiguration

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
  "testProcess",
  streamArn,
  defaultProviderChain,
  "testWorker"
).withMaxRecords(1000)
   .withRegionName("eu-west-1")
   .withMetricsLevel(MetricsLevel.NONE)
  .withIdleTimeBetweenReadsInMillis(500)
  .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

我已验证所提供的流ARN,一切都与我在AWS控制台中看到的相匹配。

但运行时出现异常,指出所提供的ARN不是有效的流名称:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation     
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName'    failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID: )

通过查看提供的KinesisClientLibConfiguration文档,第二个参数被列为streamName而没有提到ARN,这确实是有道理的。

我似乎找不到与ARN相关的KinesisClientLibConfiguration内容。由于我正在使用DynamoDB流而不是Kinesis流,我也不确定如何找到我的流名称。

目前,我不确定我是否漏掉了发布的AWS示例中的任何内容,似乎他们可能在使用较旧版本的KCL。我正在使用amazon-kinesis-client的1.7.0版本。

4个回答

4
问题实际上不在我的KinesisClientLibConfiguration上。
我通过使用相同的配置以及提供包含在DynamoDB流适配器库中的流适配器和DynamoDB和CloudWatch的客户端来解决了这个问题。
现在我的工作方案看起来像这样。
定义Kinesis客户端配置。
//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
    new KinesisClientLibConfiguration(
        getClass.getName, //DynamoDB shard lease table name
        streamArn, //pulled from the dynamo table at runtime
        dynamoCredentials, //DefaultAWSCredentialsProviderChain 
        KeywordTrackingActor.NAME //Lease owner name
    ).withMaxRecords(1000) //using AWS recommended value
     .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

定义一个流适配器和一个CloudWatch客户端。
val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)

创建一个RecordProcessorFactory的实例,你需要定义一个实现KCL提供的IRecordProcessorFactory和返回的IRecordProcessor接口的类。
val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)

而我所缺少的部分是,所有这些都需要提供给您的工人。

val worker :Worker =
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kinesisConfig)
    .kinesisClient(streamAdapterClient)
    .dynamoDBClient(dynamoClient)
    .cloudWatchClient(cloudWatchClient)
    .build()

//this will start record processing
streamExecutorService.submit(worker)

0
只是为了回答问题 - 当它只需要流名称时,您提供了ARN。

0

最近我为这个项目gfc-aws-kinesis提交了一个PR,现在你可以通过传递适配器并编写KinesisRecordAdapter实现来使用它。

在示例中,我使用Scanamo解析哈希表。

创建客户端。

val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
    new AmazonDynamoDBStreamsAdapterClient()

在配置中传递它:

val streamConfig = KinesisStreamConsumerConfig[Option[A]](
  applicationName,
  config.stream, //the full dynamodb stream arn
  regionName = Some(config.region),
  checkPointInterval = config.checkpointInterval,
  initialPositionInStream = config.streamPosition,
  dynamoDBKinesisAdapterClient = Some(streamAdapterClient)
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)

创建一个适用于读取DynamoDB事件的隐式记录读取器:
implicit val kinesisRecordReader
  : KinesisRecordReader[Option[A]] =
  new KinesisRecordReader[Option[A]] {
    override def apply(record: Record): Option[A] = {
      record match {
        case recordAdapter: RecordAdapter =>
          val dynamoRecord: DynamoRecord =
            recordAdapter.getInternalObject
          dynamoRecord.getEventName match {
            case "INSERT" =>
              ScanamoFree
                .read[A](
                  dynamoRecord.getDynamodb.getNewImage)
                .toOption
            case _ => None
          }
        case _ => None
      }
    }
  }

你应该在这里添加一个例子和简短的解释来改进你的回答。也许阅读这篇文章可以帮助你改善你的回答:https://stackoverflow.com/help/how-to-answer。 - Markus

0

或者,您可以使用com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker代替com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker,它在内部使用AmazonDynamoDBStreamsAdapterClient

即:

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
    getClass.getName, //DynamoDB shard lease table name
    streamArn, //pulled from the dynamo table at runtime
    dynamoCredentials, //DefaultAWSCredentialsProviderChain 
    KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
 .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接