Kafka Streams KTable外键连接未按预期工作。

3
我试图在Kafka Streams中进行简单的外键连接,类似于许多文章(例如这里:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。当我尝试将用户表的主键 idaccount_balance 表中的外键 user_id 进行连接以生成 AccountRecord 对象时,我收到以下错误消息: [-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key. 最终目标是在任何表字段更新时将 AccountRecord 交付给特定的主题。问题是,当我分别打印用户表和账户表时,外键和所有字段都被完全填充。我不知道出了什么问题或者为什么会出现此错误。这是我的代码片段:
    public void start_test(){
        StreamsBuilder builder = new StreamsBuilder();

        KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
        KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));

        final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
                userTable,
                AccountBalance::getUserId,
                (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
        );

        // print the table
        accountRecordTable
                .toStream()
                .print(Printed.toSysOut());

        KafkaStreams stream = new KafkaStreams(builder.build(), properties);
        stream.start();
    }

任何指导都将有所帮助。我没有包含自定义serde代码或对象形状,但它们非常简单。如果您需要其他澄清,请告诉我。
谢谢。
1个回答

2

您的消息中是否包含关键记录?KTable是一个更改日志流的抽象,其中每个数据记录表示更新。要了解该更新的方式是使用关键字,因此在处理KTables时,记录的关键字非常重要。

例如:
AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
User<Key=777, Value={firstName="Panchito"}>

另一个观察点是您的 Serde 键,如果您将 Long 定义为键,为什么要使用自定义 serde?
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));

KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()))

也许您的密钥反序列化程序将密钥发送为null。检查自定义Serde日志的输出。 此外,您需要改进join方法,添加一个materialized,因为您正在创建一个新对象,而Kafka不知道如何处理新对象。
      final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
                    userTable,
                    AccountBalance::getUserId,
                    (account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountBalanceSerde() )
            );

尝试使用JsonSerde或Avro来创建自定义Serdes,以替代当前方法。


感谢您的回复,我已将您的答案标记为正确。您说得对,问题出在serde上。实际上,账户记录的Json序列化器自定义serde是有问题的。我必须将其配置为处理snake_case,与反序列化器相同。 - user28938

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接