我试图在Kafka Streams中进行简单的外键连接,类似于许多文章(例如这里:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。当我尝试将用户表的主键
任何指导都将有所帮助。我没有包含自定义serde代码或对象形状,但它们非常简单。如果您需要其他澄清,请告诉我。
谢谢。
id
与 account_balance
表中的外键 user_id
进行连接以生成 AccountRecord
对象时,我收到以下错误消息:
[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.
最终目标是在任何表字段更新时将 AccountRecord
交付给特定的主题。问题是,当我分别打印用户表和账户表时,外键和所有字段都被完全填充。我不知道出了什么问题或者为什么会出现此错误。这是我的代码片段: public void start_test(){
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
);
// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());
KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
任何指导都将有所帮助。我没有包含自定义serde代码或对象形状,但它们非常简单。如果您需要其他澄清,请告诉我。
谢谢。
snake_case
,与反序列化器相同。 - user28938