等待一个RabbitMQ消息并设置超时时间

12
我想要向RabbitMQ服务器发送一条消息,并等待回复消息(在“reply-to”队列上)。当然,我不想永远等待,以防应用程序处理这些消息的进程关闭 - 需要设置超时。这听起来像是一个非常基本的任务,但我找不到实现的方法。我现在使用py-amqplibRabbitMQ .NET client都遇到了这个问题。
到目前为止,我得到的最佳解决方案是使用basic_get进行轮询,并在两次轮询之间使用sleep,但这很丑陋:
def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

肯定有更好的方式吧?

5个回答

10

这是我在 .NET 客户端最终采取的做法:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

很遗憾,我无法使用py-amqplib来实现相同的功能,因为它的basic_consume方法不会调用回调,除非您调用了channel.wait(),而channel.wait()不支持超时!这个愚蠢的限制(我一直在遇到)意味着,如果您永远不再收到另一条消息,您的线程将永远被冻结。


9

1
现在这才是我所谓的“好答案”:“问题已经解决了”!希望它能够被合并到amqplib中,因此我接受了。 - EMP
@EMP 哈哈 :) 有趣 :) - Alexander.Iljushkin

2
这句话的意思是:“这里有一个例子here使用qpid和一个msg = q.get(timeout=1),应该能满足您的需求。抱歉,我不知道其他AMQP客户端库是否实现了超时(特别是我不知道您提到的两个具体库)。”

从 qpid 的源代码来看,它似乎使用与 .NET 客户端完全相同的方法:使用队列的 basic_consume 并等待队列超时。看起来这就是我要做的事情。 - EMP

1
现在Rabbit支持添加超时事件。只需将代码放入try catch中,然后在TimeOut和Disconnect处理程序中抛出异常即可:
try{
    using (IModel channel = rabbitConnection.connection.CreateModel())
    {
        client = new SimpleRpcClient(channel, "", "", queue);
        client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
        client.TimedOut += RpcTimedOutHandler;
        client.Disconnected += RpcDisconnectedHandler;
        byte[] replyMessageBytes = client.Call(message);
        return replyMessageBytes;
    }
}
catch (Exception){
    //Handle timeout and disconnect here
}
private void RpcDisconnectedHandler(object sender, EventArgs e)
{
     throw new Exception("RPC disconnect exception occured.");
}

private void RpcTimedOutHandler(object sender, EventArgs e)
{
     throw new Exception("RPC timeout exception occured.");
}

1

这似乎破坏了异步处理的整个概念,但如果必须这样做,我认为正确的方法是使用RpcClient


虽然 RpcClient 本身对我来说没有用处,但是查看它的实现揭示了使用的方法:创建一个 QueueingBasicConsumer 并等待其队列,该队列支持超时。在 .NET 中,这并不像我担心的那样复杂。 - EMP

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接