如果您在应用程序中配置了多个消费者,则必须使用唯一的密钥来区分它们。
以下示例考虑了多个消费者配置。
消费者配置示例:
consumers:
"fun-consumer-key":
topic: fun.topic-1
consumerProcessOnStart: org.kafka.ProcessConsumedMessage
consumerProperties:
"[bootstrap.servers]": localhost:9092,localhost:9093
// Other consumer configs
消费者工厂监听器:
@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements Listener<K, V> {
@Override
public void consumerAdded(final String id, final Consumer<K, V> consumer) {
}
@Override
public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
}
}
AppPropertiesConfig用于保存Consumer对象:
@UtilityClass
public class AppPropertiesConfig {
private static Map<String, Object> configConsumers = new ConcurrentHashMap<>();
public static Map<String, Object> getConfigConsumers() {
return consumerMap;
}
}
消息监听器:Ack和自动Ack(我正在添加自动确认和确认消息监听器,但您需要分别使用不同的类来实现它们。)
public class AutoAckMessageListener<K, V> extends
implements MessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord) {
onMessageConsume(consumerRecord, consumeMessage);
}
}
public class AckMessageListener<K, V> extends
implements AcknowledgingMessageListener<K, V> {
private final ProcessConsumedMessage<K, V> consumeMessage;
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
onMessageConsume(consumerRecord, consumeMessage);
acknowledgment.acknowledge();
}
}
public void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
final ProcessConsumedMessage<K, V> consumeMessage) throws InterruptedException {
consumeMessage.process(consumerRecord.key(), consumerRecord.value());
}
初始化消息监听器:
public MessageListener getListener(String className) {
final ProcessConsumedMessage consumeMessage = (ProcessConsumedMessage) getClassFromName();
MessageListener listener;
if (isAutoCommitEnabled) {
listener = new AutoAckMessageListener(consumeMessage);
} else {
listener = new AckMessageListener(consumeMessage);
}
return listener;
}
开始一个消费者:
public void startConsumer(final String key, final String topic,
final Object messageListener, final Map<String, Object> consumerProperties) {
ConcurrentMessageListenerContainer<K, V> container =
(ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key);
if (container != null) {
if (!container.isRunning()) {
container.start();
}
return;
}
final DefaultKafkaConsumerFactory<K, V> factory = new DefaultKafkaConsumerFactory<>(
consumerProperties);
factory.addListener(consumerFactoryListener);
final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(factory);
final ContainerProperties containerProperties = containerFactory.getContainerProperties();
containerProperties.setPollTimeout(pollTimeout);
if (!isAutoCommitEnabled) {
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
}
containerFactory.setErrorHandler(getErrorHandler(<some retry configurations object>));
container = containerFactory.createContainer(topic);
container.setupMessageListener(messageListener);
container.start();
AppPropertiesConfig.getConfigConsumers().put(key, container);
}
private SeekToCurrentErrorHandler getErrorHandler() {
}
停止一个消费者:
public void stopConsumer(final String key) {
if (StringUtils.isBlank(key)) {
return;
}
final ConcurrentMessageListenerContainer<K, V> container
= (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
.getConfigConsumers().get(key);
if (container == null || !container.isRunning()) {
throw new Exception();
}
try {
container.stop();
} catch (Exception e) {
throw e;
} finally {
AppPropertiesConfig.getConfigConsumers().remove(key);
}
}