Kafka Node.js 客户端使用 Snappy 压缩时出现问题

3
我正在使用kafka-node (https://github.com/SOHU-Co/kafka-node) 消费者获取数据。我认为我得到的数据是使用SNAPPY压缩的。我如何在获取数据后对其进行解压缩。我尝试使用node-snappy (https://github.com/kesla/node-snappy) 来解压缩数据,但它没有起作用。
这个库中是否有设置压缩为无的选项?
有人使用过kafka-node库从kafka获取数据吗?
谢谢, chandu
3个回答

2

我也碰到了这些问题。最终我找到了解决方案!你可以使用kafkacat(类似于kafka的netcat)在此下载,它需要librdkafka。这使得你可以使用librdkafka C/C++库从命令行与Kafka交互。(不需要JVM=D)

现在我们已经处理了这些依赖项,我们可以使用这个有趣的node.js仓库:node-kafkacat

你会发现,在这三个库之间可能已经有足够的文档让你开始了。与Github上的一些其他kafka-node模块不同,它们似乎最近更新过。

到目前为止,我只在Linux和Mac上成功安装,但是我们的Apache/Java环境运作良好。顺便说一下,我不是这些包的作者 - 只是一个希望在过去几周里你的问题能够被回答的人。


1
默认情况下,通过配置属性 'compression.type' 生产者确定压缩方式。目前支持 gzip、snappy 和 lz4。kafka-node 应该会自动解压缩 gzip 和 snappy。我刚试了一下 snappy,这个功能可以直接使用。lz4 看起来在 kafka-node 中还没有实现。
可以使用配置属性 'compression.type' 来配置压缩方式:
- 每个 broker:https://kafka.apache.org/documentation/#brokerconfigs。默认值:生产者确定压缩方式。 - 每个主题:https://kafka.apache.org/documentation/#brokerconfigs(向下滚动)。默认值:生产者确定压缩方式。 - 每个生产者:https://kafka.apache.org/documentation/#producerconfigs。默认值:无。
如果您无法控制生产者,则可以考虑覆盖主题配置。
此外,注意kafka-node可能不会像您期望的那样直接返回数据。例如,我的(key,value)对是(integer,integer)类型的,我必须调用message.key.readIntBE()和message.value.readIntBE()来提取整数值。
希望这可以帮助您!

0

我在LZ4压缩方面遇到了类似的问题,似乎kafkajs和kafka-node只支持gzip,而对于snappy和LZ4,您必须安装另一个软件包。对于LZ4软件包,kafkajs-lz4对我来说不起作用,因此我不得不实现自定义解码器。简单地下载lz4而不是kafkajs-lz4对我有用。

所以最终我决定使用kafkajs而不是kafka-node。

const LZ4 = require("lz4");
const LZ4Codec = {
    async compress(encoder) {
        // TODO:
        return LZ4.encode(encoder);
    },

    async decompress(buffer) {
        return LZ4.decode(buffer);
    }
}

CompressionCodecs[CompressionTypes.LZ4] = () => LZ4Codec;

await this.consumer.run({
  eachMessage: async (res) => {
     if(res && res.topic && res.message) {
        var message = res.message;
        var key = message.key;
        var value = message.value;
     }              
  }
})

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接