如何在kafka中同步发送消息?

9
一种实现它的方法可能是通过设置属性参数 max.in.flight.requests.per.connection = 1
但我想知道是否有更直接或替代的方法在kafka中同步发送消息,类似于 producer.syncSend(...)

你能解释一下为什么你想要那个吗?如果这与消息排序保证有关,那可能会更加复杂(同步发送并不真正改变那里的事情)。 - Thilo
同一生产者在同一主题的同一分区发送的消息将保留该顺序。在多个分区、主题或生产者之间没有排序保证。如果需要排序(例如通过查看消息时间戳),则必须在应用程序代码中在消费者端排列消息。 - Thilo
@Thilo,这让我想到另一个问题。批次是按分区、主题还是生产者来划分的?或者是这些的某种组合? - user8339674
6个回答

14
生产者API从send返回一个Future。您可以调用Future#get来阻塞,直到发送完成。

参见Javadocs的示例:

如果您想模拟简单的阻塞调用,可以立即调用get()方法:

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = 
     new ProducerRecord<byte[],byte[]>("my-topic", key, value)
 producer.send(record).get();

3
按照这个答案建议调用get()方法只会告诉你消息已完全发送,无法提供模拟同步调用的解决方案。涉及kafka的真正解决方案将涉及消费某种完成消息的完成消费者,从而释放被阻塞的请求线程。 - Rodney P. Barbati
3
如果你所说的“同步调用”是指你想等待所有感兴趣的消费者完成你希望他们根据该消息执行的任何操作,那么是的,答案确实没有提供“同步RPC机制”。但它确实解决了原帖作者对producer.sendSync的请求。 - Thilo
@Thilo 我认为我们需要在 get() 方法之后立即刷新消息,以确保其传递到代理。如果我错了,请纠正我。 - Ankit Singodia
1
@AnkitSingodia 当等待生产者返回的Future完成时,阻塞将(默认情况下)包括来自代理的交付确认。可以通过acks生产者参数进行配置。您可以将其设置为0以禁用确认,或将其设置为大于1的值,以确认成功复制到配额。 - Thilo

6
如Thilo所建议的那样,您可以调用Future#get来阻塞,直到发送完成。但是,您可能会遇到一些性能问题,因为当生产者队列有batch.size个元素时,生产者开始发送,在大小为buffer.memory的缓冲区满或经过max.block.ms毫秒后也会发送。
如果您只有有限数量的线程推送到kafka,则每次等待消息发送需要等待max.block.ms。因此,在某些情况下,您可能更喜欢使用:
// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);

1
Thilo提出的答案是可行的。一般来说,您关于使用max.in.flight.requests.per.connection = 1的建议用于仍启用重试但不丢失消息排序。它并不适用于具有同步生产者。

1

当 max.in.flight.requests.per.connection = 1 时,它仅意味着消息的排序在分区内得到保证,与同步无关。

以下是 Python 代码。 对于同步发送,请确保在未来的超时时间内阻塞。

from kafka import KafkaProducer
from kafka.errors import KafkaError

#by default ack = 1, if ack = 'all' --> waits for acks from replicas 
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all')


key = b'key'
value = b'value'

future = producer.send("my-topic", key=key, value=value)

# block on this future for sync sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    log.exception()
    pass

print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

producer.flush()
producer.close()

0

从我和Kafka的冒险中得出的经验:只有当您有一个生产者线程并设置max.in.flight.requests.per.connection = 1(或关闭retries,即retries=0或两者都关闭)时,才能保证消息生产的顺序。

如果您想要扩展到多个生产者,则必须“确保”将存储到同一分区的消息由同一生产者实例生成。


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接