有没有一种方法可以在Spring应用程序中手动启动/停止Kafka消费者?

3
目前,我有一个使用Spring Boot CLI的应用程序,它会在应用程序启动时自动启动Kafka消费者。现在我需要更新这个应用程序,提供一个API以便于根据特定条件启动或停止Kafka消费者。因此,我将使用Spring Boot Starter WEB来创建该API。但是我无法找到手动管理消费进程的方法。我需要:
  • 在没有工作消费者的情况下启动API
  • 通过特定的API调用运行消费
  • 通过特定的API调用停止消费
您有任何建议如何手动管理消费进程吗?
技术细节:
  • 使用@KafkaListener创建监听器
  • 使用ConcurrentKafkaListenerContainerFactory作为KafkaListenerContainerFactory

你是如何创建一个Kafka消费者的?使用@KafkaListener吗? - undefined
看一下这个,这是你在找的吗? https://dev59.com/9Lbna4cB1Zd3GeqPhtez - undefined
@GovindaSakhare 是的,我使用它。 - undefined
这句话的意思是“启动API,但没有工作的消费者”。 - undefined
1个回答

1
如果您在应用程序中配置了多个消费者,则必须使用唯一的密钥来区分它们。
以下示例考虑了多个消费者配置。
消费者配置示例:
consumers:
      "fun-consumer-key":
        topic:  fun.topic-1
        consumerProcessOnStart: org.kafka.ProcessConsumedMessage
        consumerProperties:
          "[bootstrap.servers]": localhost:9092,localhost:9093
          // Other consumer configs

消费者工厂监听器:

@Component
@Slf4j
public class ConsumerFactoryListener<K, V> implements Listener<K, V> {

  @Override
  public void consumerAdded(final String id, final Consumer<K, V> consumer) {
    //
  }

  @Override
  public void consumerRemoved(final String id, final Consumer<K, V> consumer) {
    //
  }

}

AppPropertiesConfig用于保存Consumer对象:

@UtilityClass
public class AppPropertiesConfig {

  private static Map<String, Object> configConsumers = new ConcurrentHashMap<>();

  public static Map<String, Object> getConfigConsumers() {
    return consumerMap;
  }
}

消息监听器:Ack和自动Ack(我正在添加自动确认和确认消息监听器,但您需要分别使用不同的类来实现它们。)

    public class AutoAckMessageListener<K, V> extends
        implements MessageListener<K, V> {
    
      private final ProcessConsumedMessage<K, V> consumeMessage;
    
      
      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord) {
        onMessageConsume(consumerRecord, consumeMessage);
      }
    }

    public class AckMessageListener<K, V> extends
        implements AcknowledgingMessageListener<K, V> {
    
      private final ProcessConsumedMessage<K, V> consumeMessage;
    
      
      @Override
      public void onMessage(ConsumerRecord<K, V> consumerRecord, final Acknowledgment acknowledgment) {
        onMessageConsume(consumerRecord, consumeMessage);
        acknowledgment.acknowledge();
      }
    }

  // You can put this method in an abstract class and both listener classes can extend this Abstract class with onMessageConsume method
  public void onMessageConsume(ConsumerRecord<K, V> consumerRecord,
      final ProcessConsumedMessage<K, V> consumeMessage) throws InterruptedException {
    
    // Your custom processing implementation
    consumeMessage.process(consumerRecord.key(), consumerRecord.value());
  }

初始化消息监听器:

  public MessageListener getListener(String className) {
    final ProcessConsumedMessage consumeMessage = (ProcessConsumedMessage) getClassFromName();
    MessageListener listener;
    if (isAutoCommitEnabled) {
      listener = new AutoAckMessageListener(consumeMessage);
    } else {
      listener = new AckMessageListener(consumeMessage);
    }
    return listener;
  }

开始一个消费者:

public void startConsumer(final String key, final String topic,
          final Object messageListener, final Map<String, Object> consumerProperties) {
        
        
        // Check already created, start and return
        ConcurrentMessageListenerContainer<K, V> container =
            (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
            .getConfigConsumers().get(key);  // key - "fun-consumer-key"
        if (container != null) {
          if (!container.isRunning()) {
           
            container.start();
          }
          return;
        }
        
        final DefaultKafkaConsumerFactory<K, V> factory = new DefaultKafkaConsumerFactory<>(
            consumerProperties);
        factory.addListener(consumerFactoryListener);
    
        final ConcurrentKafkaListenerContainerFactory<K, V> containerFactory
            = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(factory);
    
        final ContainerProperties containerProperties = containerFactory.getContainerProperties();
        containerProperties.setPollTimeout(pollTimeout);
    
        // auto-commit??
        if (!isAutoCommitEnabled) {
          containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
        }

        containerFactory.setErrorHandler(getErrorHandler(<some retry configurations object>));
        // create the container
        container = containerFactory.createContainer(topic);
        container.setupMessageListener(messageListener);
        // start
        container.start();
        AppPropertiesConfig.getConfigConsumers().put(key, container);
      }
    
      private SeekToCurrentErrorHandler getErrorHandler() {
          // Provide your error handler. Ex: SeekToCurrentErrorHandler
      }

停止一个消费者:

  public void stopConsumer(final String key) {
    if (StringUtils.isBlank(key)) {
      return;
    }
    final ConcurrentMessageListenerContainer<K, V> container
        = (ConcurrentMessageListenerContainer<K, V>) AppPropertiesConfig
        .getConfigConsumers().get(key);
    if (container == null || !container.isRunning()) {
      throw new Exception();
    }
    try {
      container.stop();
    } catch (Exception e) {
      // log
      throw e;
    } finally {
      AppPropertiesConfig.getConfigConsumers().remove(key);
    }
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接