C# RabbitMQ等待特定超时时间内的一个消息?

12

由于官方C#库中没有下一个传递方法,而QueueingBasicConsumer已过时且会在任何地方都抛出NotSupportedException,因此在RabbitMQ等待带有超时的消息等待具有超时的单个RabbitMQ消息中找到的解决方法似乎不起作用。

那么我如何等待队列中单个消息达到指定的超时时间呢?

PS

可以通过Basic.Get()实现,但是它拉取指定间隔内的消息不是很好的解决方案(会产生过多的流量和CPU占用)。

更新

EventingBasicConsumer通过实现不支持立即取消。 即使在某些点上调用BasicCancel,即使您通过BasicQos指定了预取,在Frames中仍将获取它们,这些Frames可能包含多个消息。 因此,它对于单个任务执行不好。别费力 - 它不能与单个消息一起使用。

1个回答

10

有许多方法可以做到这一点。例如,您可以使用EventingBasicConsumerManualResetEvent一起使用,像这样(这仅用于演示目的 - 最好使用以下方法之一):

var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
    using (var channel = connection.CreateModel()) {
        // setup signal
        using (var signal = new ManualResetEvent(false)) {
            var consumer = new EventingBasicConsumer(channel);
            byte[] messageBody = null;                        
            consumer.Received += (sender, args) => {
                messageBody = args.Body;
                // process your message or store for later
                // set signal
                signal.Set();
            };               
            // start consuming
            channel.BasicConsume("your.queue", false, consumer);
            // wait until message is received or timeout reached
            bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
            // cancel subscription
            channel.BasicCancel(consumer.ConsumerTag);
            if (timeout) {
                // timeout reached - do what you need in this case
                throw new Exception("timeout");
            }

            // at this point messageBody is received
        }
    }
}

正如您在评论中所述 - 如果您希望在同一队列上接收多个消息,那么这并不是最佳方式。事实上,在任何情况下都不是最佳方式,我只是为了演示在库本身不提供超时支持的情况下如何使用ManualResetEvent

如果您正在进行RPC(远程过程调用,请求-响应) - 您可以在服务器端使用SimpleRpcServerSimpleRpcClient。客户端将如下所示:

var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
    // do something on timeout
};                    
var reply = client.Call(myMessage); // will return reply or null if timeout reached

更加简单的方法:使用基本的Subscription类(它内部使用相同的EventingBasicConsumer,但支持超时,因此您无需自己实现),像这样:

var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
     // timeout
}

1
第一个解决方案无效。BasicConsume不能保证在BasicCancel时停止消费,这可能是由于Rabbit的即时实现稍后才能完成(只需尝试每个请求消耗一条消息,您将看到在某些情况下会分配几次messageBody)。您仍需要重新排队多余的消息。对于第二和第三个解决方案,我现在会尝试一下 =) - eocron
1
尽管你的前半部分与此无关,但Subscription类正是我想要的!谢谢,它运行得非常好!你能编辑一下答案,让其他人知道最后一个解决方案可行吗? - eocron
然而,它通过实现存储了一堆消息,而我只需要一个=/ - eocron
你为什么需要设置超时时间来获取数据?是因为发送请求后需要等待回复,还是出于其他原因? - Evk
不,它不起作用。他在单个帧内预取所有消息,并使它们等待确认... - eocron
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接