使用Pulsar消息监听器进行多线程处理。

3

我对Java消息监听器Apache Pulsar都比较新手。假设我已经像这样维护了一个监听器:

private MessageListener<byte[]> generateListener() {
        MessageListener<byte[]> listener = (consumer, respMsg) -> {
            String respM = new String(respMsg.getValue(), StandardCharsets.UTF_8);
            logger.info(respM);
            consumer.acknowledgeAsync(respMsg);
        };
        return listener;
    }

一个类似这样的消费者实例:

Consumer<byte[]> c = consumerBuilder.messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync().get();

我想知道的是,这个监听器如何处理多个输入消息?每个消息都会像JMS监听器一样在单独的线程中处理吗?如果是这样,那么如何配置要使用的线程数 - 是通过使用ClientBuilder.listenerThreads()属性吗?
在维护多个消费者时,是否需要针对每个消费者维护多个监听器对象,例如: consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync()

我之前在 Stack 上问了一个类似的问题 - 在这里查看:https://stackoverflow.com/questions/67171146/apache-pulsar-async-consumer-setup-completable-future - user521990
1个回答

1

ClientBuilder#listenerThreads方法允许配置一个内部线程池的大小,该线程池将在从该客户端创建的所有ConsumersReaders之间共享并使用。

Pulsar Client将为单个消费者的MessageListener提供保证,即由同一线程调用提供的MessageListener,因此不需要编写线程安全代码。因此,最好为每个Consumer使用一个专用的MessageListener对象。

请注意,这也确保了顺序。

因此,如果您只使用单个Consumer,则可以将listenerThreads保留为默认值1

以下是一个完整的示例,可用于观察行为:

public class PulsarConsumerListenerExample {

    public static void main(String[] args) throws PulsarClientException {

        int numListenerThread = 2;

        PulsarClient client = PulsarClient
                .builder()
                .serviceUrl("pulsar://localhost:6650")
                .listenerThreads(numListenerThread)
                .build();

        final List<Consumer<?>> consumers = new ArrayList<>();
        for (int i = 0; i < numListenerThread; i++) {
            consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (Consumer<?> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        }));
    }

    private static Consumer<String> createConsumerWithLister(final PulsarClient client,
                                                             final String topic,
                                                             final String subscription,
                                                             final String consumerName) throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
            .topic(topic)
            .consumerName(consumerName)
            .subscriptionName(subscription)
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .messageListener((MessageListener<String>) (consumer, msg) -> {
                System.out.printf(
                    "[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
                    consumerName,
                    Thread.currentThread().getName(),
                    msg.getKey(),
                    msg.getValue(),
                    msg.getTopicName(),
                    msg.getMessageId().toString());
                consumer.acknowledgeAsync(msg);
            })
            .subscribe();
    }
}

感谢您的回复。在这种情况下,为了以良好的吞吐量消耗大量消息,是否最好在同一主题上维护多个具有共享订阅的消费者?但是,创建多个消费者会产生不必要的开销吗?因此,没有异步使用多个线程消耗消息的选项吗? - Ivak
关于拥有多个消费者的事实,也许这个问题可以帮助您:https://stackoverflow.com/questions/64603078/what-is-the-cost-of-having-multiple-subscriptions-in-pulsar。订阅类型取决于您的需求。如果您不需要强制顺序,则可以使用具有多个消费者的Shared或KeyShared订阅。 - fhussonnois

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接