一种实现它的方法可能是通过设置属性参数
但我想知道是否有更直接或替代的方法在kafka中同步发送消息,类似于
max.in.flight.requests.per.connection = 1
。但我想知道是否有更直接或替代的方法在kafka中同步发送消息,类似于
producer.syncSend(...)
。max.in.flight.requests.per.connection = 1
。producer.syncSend(...)
。send
返回一个Future
。您可以调用Future#get
来阻塞,直到发送完成。
参见Javadocs的示例:
如果您想模拟简单的阻塞调用,可以立即调用get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
producer.sendSync
的请求。 - Thiloacks
生产者参数进行配置。您可以将其设置为0以禁用确认,或将其设置为大于1的值,以确认成功复制到配额。 - ThiloFuture#get
来阻塞,直到发送完成。但是,您可能会遇到一些性能问题,因为当生产者队列有batch.size
个元素时,生产者开始发送,在大小为buffer.memory
的缓冲区满或经过max.block.ms
毫秒后也会发送。max.block.ms
。因此,在某些情况下,您可能更喜欢使用:// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
当 max.in.flight.requests.per.connection = 1 时,它仅意味着消息的排序在分区内得到保证,与同步无关。
以下是 Python 代码。 对于同步发送,请确保在未来的超时时间内阻塞。
from kafka import KafkaProducer
from kafka.errors import KafkaError
#by default ack = 1, if ack = 'all' --> waits for acks from replicas
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], ack= 'all')
key = b'key'
value = b'value'
future = producer.send("my-topic", key=key, value=value)
# block on this future for sync sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
log.exception()
pass
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
producer.flush()
producer.close()
从我和Kafka的冒险中得出的经验:只有当您有一个生产者线程并设置max.in.flight.requests.per.connection
= 1(或关闭retries
,即retries
=0或两者都关闭)时,才能保证消息生产的顺序。
如果您想要扩展到多个生产者,则必须“确保”将存储到同一分区的消息由同一生产者实例生成。
如果您不需要企业级解决方案,请查看此链接: https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1