Mac m1上的Kafka Streams groupByKey无法工作,原因是RocksDB。

4
当我尝试在函数中使用kafka streams的"groupByKey"功能时,出现了以下rocksdb错误,而简单的消费者函数却可以正常工作。
环境: confluent v1.30.0 (运行在单节点开发环境中的confluent)
操作系统: Apple m1 mac - Big Sur v11.5.1 with rosetta
Java版本: openjdk 11.0.12 2021-07-20 LTS
错误跟踪:
Exception in thread "streamApp-c9d33475-adca-4567-8ec1-8db1fe4bc4a9-StreamThread-1" java.lang.UnsatisfiedLinkError: /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: dlopen(/private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib, 1): no suitable image found.  Did find:
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    /private/var/folders/2g/x44n4hwx19v_m7l09fq_r6k80000gn/T/librocksdbjni2764289432609839437.jnilib: mach-o, but wrong architecture
    at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
    at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2442)
    at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2498)
    at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2694)
    at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2627)
    at java.base/java.lang.Runtime.load0(Runtime.java:768)
    at java.base/java.lang.System.load(System.java:1837)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.DBOptions.<clinit>(DBOptions.java:21)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:253)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:74)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:120)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:210)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:473)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523)

有问题的代码

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            .groupByKey()
            .reduce((s, v1) -> s + ", " + v1)
            .toStream()
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}

正常工作的代码:

@Bean
public Function<KStream<String, String>, KStream<String, String>> streamApp() {
    return kstream -> kstream
            .peek((key, value) -> logger.info("streamApp triggered with key {}, value {}", key, value))
            .flatMap((ignoredWindowedKey, value) -> getChanges(value));
}
1个回答

4

目前,rocksDB还不支持Apple Silicon。但是正在开发针对Apple Silicon的版本,截至2021年8月7日尚未完成:https://github.com/facebook/rocksdb/issues/7720。在这个GitHub问题中,您可以找到一个链接,也包括在此处,指向一个可用于在Apple Silicon上构建rocksDB的工作版本的代码:https://github.com/adamretter/rocksdb/tree/macos-multi-arch

您还可以使用x86 JDK和Rosetta仿真来运行您的代码,在那里rocksDB应该能够正常运行。这个StackOverflow回答可能会帮助您:https://dev59.com/mFEG5IYBdhLWcg3wY8T1#66779308


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接