Spring Kafka集成:动态创建消费者

3

我是使用Spring-Integration-Kafka的,以下是动态创建消费者以接收和在控制台打印消息的示例。 消费者类:

public class Consumer1 {
private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";
static ClassPathXmlApplicationContext ctx;

public static void main(final String args[]) {
    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default8");

    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default10");

}

public static void addConsumer(String topicId, String groupId) {

    MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);

    ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
    ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

    KafkaConsumerContext<String, String> kafkaConsumerContext = ctx.getBean("consumerContext", KafkaConsumerContext.class);
    try {
        TopicFilterConfiguration topicFilterConfiguration = new TopicFilterConfiguration(topicId, 1, false);

        ConsumerMetadata<String,String> consumerMetadata = new ConsumerMetadata<String, String>();
        consumerMetadata.setGroupId(groupId);
        consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
        consumerMetadata.setConsumerTimeout("1000");
        consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));
        consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));


        ZookeeperConnect zkConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);

        ConsumerConfigFactoryBean<String, String> consumer = new ConsumerConfigFactoryBean<String, String>(consumerMetadata,
                zkConnect);

        ConsumerConnectionProvider consumerConnectionProvider = new ConsumerConnectionProvider(consumer.getObject());
        MessageLeftOverTracker<String,String> messageLeftOverTracker = new MessageLeftOverTracker<String, String>();
        ConsumerConfiguration<String, String> consumerConfiguration = new ConsumerConfiguration<String, String>(consumerMetadata, consumerConnectionProvider, messageLeftOverTracker);

        kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}

}

入站配置文件:

<int:channel id="inputFromKafka"/>

<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
        zk-connection-timeout="6000"
        zk-session-timeout="6000"
        zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>

<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>

<int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default1"
                value-decoder="kafkaReflectionDecoder"
                key-decoder="kafkaReflectionDecoder"
                max-messages="5000">
            <int-kafka:topic id="mdc1" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

当我向主题“test19”发送任何消息时,配置的ServiceActivator“processMessage”方法会显示两个消息,因为配置了两个客户端,但问题在于,在将其添加到消费者上下文之前,我需要为每个客户端加载入站配置文件。否则,我只能在控制台中获取一条消息。这样做正确吗?还是我需要在这里进行任何更改?
谢谢。
1个回答

0

你尝试做什么并不清楚,但已有的内容存在问题。

在订阅消费者之前启动上下文可能会出现问题(在启动和订阅之间的短时间内, inputFromKafka 上的调度程序没有订阅者)。

为什么要以编程方式创建服务激活器而不是在上下文中声明它?

最好在上下文中配置所有内容(可以通过环境中的属性传递诸如 groupId 之类的属性到上下文中,并使用属性占位符配置程序。)


你好Gary,根据我的需求,我正在尝试减少上下文中的配置数量,我只想知道如何动态创建消费者并将服务激活器消息分发给所有消费者。 - user1606656

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接