Kafka消息编解码 - 压缩和解压缩

10
使用 kafka 时,我可以通过设置 kafka 生产者的 kafka.compression.codec 属性来设置编解码器。
假设我在生产者中使用 snappy 压缩,在使用某个 kafka-consumer 消费 kafka 消息时,我是否需要做一些事情来解码 snappy 数据,或者这是 kafka consumer 的内置功能?
相关文档 中,我找不到任何与 kafka consumer 编码有关的属性(它只与 producer 相关)。
能有人澄清一下吗?
2个回答

17
据我所知,消费者本身负责解压缩。正如官方维基页面中提到的那样:消费迭代器会透明地解压缩压缩数据,并只返回未压缩的消息
根据这篇文章所述,消费者的工作方式如下:
消费者具有后台的“抓取器”线程,连续从经纪人批量获取1MB的数据,并将其添加到内部阻塞队列中。消费者线程从此阻塞队列中出队数据,进行解压缩并遍历消息。
此外,在端到端批处理压缩文档页面中也提到:
一批消息可以被聚合在一起压缩并以这种形式发送到服务器。这批消息将以压缩形式写入并保持在日志中,并且只由消费者解压缩。
因此,似乎解压缩部分是由消费者本身处理的,您所需做的就是在创建生产者时使用compression.codec ProducerConfig属性提供有效/支持的压缩类型。我找不到任何示例或说明,说明在消费者端有任何解压缩方法。如果我理解有误,请指正。

0

我也遇到了v0.8.1版本的同样问题,Kafka中的压缩解压文档非常不好,除了说消费者应该“透明地”解压缩压缩数据之外,它从来没有做到过。

Kafka网站上使用ConsumerIterator的高级消费者客户端示例仅适用于未压缩的数据。一旦我在生产者客户端启用压缩,消息就永远无法进入以下的“while”循环。希望他们能尽快解决这个问题,或者他们不应该声称这个功能,因为有些用户可能会使用Kafka来传输需要批处理和压缩功能的大型消息。

ConsumerIterator <byte[], byte[]> it = stream.iterator();
while(it.hasNext())
{
   String message = new String(it.next().message());
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接