AMQPNETLITE - ActiveMQ Artemis(Red Hat AMQ)- 自动创建多消费者组播队列

5
这个问题涉及使用AMQP在 .Net 环境下消费消息。官方文档推荐使用 amqpnetlite:https://access.redhat.com/documentation/en-us/red_hat_amq/7.0/html-single/using_the_amq_.net_client/index。 在使用 AMQPNetLite 订阅地址时,地址和队列将被自动创建。但是,自动创建的队列始终是“单播”的。我无法自动创建以下内容:
  1. 可允许任意数量消费者的多播队列。
代码:
private async Task RenewSession()
{
    Connect = await Connection.Factory.CreateAsync(new Address("amqp://admin:admin@localhost:5672"), new Open() {ContainerId = "client-1"});
    MqSession = new Session(Connect);
    var receiver = new ReceiverLink(MqSession, DEFAULT_SUBSCRIPTION_NAME, GetSource("test-topic"), null);
    receiver.Start(100, OnMessage);
}

private Source GetSource(string address)
{
    var source = new Source
    {
        Address = address,
        ExpiryPolicy = new Symbol("never"),
        Durable = 2,
        DefaultOutcome = new Modified
        {
            DeliveryFailed = true,
            UndeliverableHere = false
        }
    };
    return source;
}

也许我漏了一些标志?

已经找到了第一个问题的解决方案 - 创建多播队列是使用源能力:Capabilities = new[] { new Symbol("topic") },//或者使用“queue”来请求队列。 - Praveen Nayak
1个回答

5
在AMQP中,您可以通过设置一个能力来选择自动创建队列(单播路由)或主题(多播路由)。
该能力应为new Symbol("queue")new Symbol("topic")
public class SimpleAmqpTest
{
    [Fact]
    public async Task TestHelloWorld()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = await Connection.Factory.CreateAsync(address);
        Session session = new Session(connection);

        Message message = new Message("Hello AMQP");

        Target target = new Target
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        SenderLink sender = new SenderLink(session, "sender-link", target, null);
        await sender.SendAsync(message);

        Source source = new Source
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
        message = await receiver.ReceiveAsync();
        receiver.Accept(message);

        await sender.CloseAsync();
        await receiver.CloseAsync();
        await session.CloseAsync();
        await connection.CloseAsync();
    }
}

请查看 https://github.com/Azure/amqpnetlite/issues/286,这里是代码来源。
您可以通过在broker.xml中设置default-address-routing-type来选择默认路由是多播还是任播,所有内容均记录在https://activemq.apache.org/artemis/docs/2.6.0/address-model.html
AMQP不支持broker的multicastPrefixanycastPrefix功能。 https://issues.jboss.org/browse/ENTMQBR-795

我刚在不久前弄明白了这个细节,并将其作为评论添加到了问题中。你知道问题的第二部分吗?如何创建多播队列以允许最大消费者为-1(任意数量)。注意:地址允许多播队列,但一个多播队列上的最大消费者数被设置为1。 - Praveen Nayak
@PraveenNayak 不是很确定。据我所知,你无法自动创建它,虽然可以通过broker.xml或管理设置来完成。我的问题是:为什么你想要这样设置呢? - user7610
需要使用代理程序进行发布-订阅。发布者向多播地址发布“OrderPlaced”。两个订阅者“InventoryMgmt”和“Billing”在每个队列中订阅它。我们希望通过添加3个“Billing”消费者实例来分配“Billing”工作负载,而“InventoryMgmt”只有1个消费者。在3个队列中添加3个“Billing”消费者将导致所有3个消费者都获得每个消息的副本。所有3个消费者从“Billing”队列中工作将确保工作负载在三个之间分配,可能是轮流的。我希望这有意义。 - Praveen Nayak
1
@PraveenNayak 我想我明白了。根据 JMS 的术语,这听起来很像“共享订阅”功能,http://jmesnil.net/weblog/2013/06/27/jms-20-shared-subscription/。您可以通过设置“shared”(可能还要设置“global”)能力来实现此目的。此外,设置接收器选项中的订阅名称,例如`opts.name("sub-1"); // A stable link name` (根据 https://issues.jboss.org/browse/ENTMQCL-580 中第一个评论中链接的示例,这将在 qpid-proton-cpp 中实现)。消息将分发给所有设置相同名称的接收器。 - user7610
设置“共享”功能对我有用,感谢帮助。 - Praveen Nayak
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接