明显的BufferBlock.Post/Receive/ReceiveAsync竞争/错误

17

此帖已发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道...我没有充分利用TplDataflow的潜力。目前,我只是使用BufferBlock作为安全队列进行消息传递,其中生产者和消费者以不同的速率运行。我看到了一些奇怪的行为,让我不知道该如何继续。

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}
在上面的代码中(它是一个2000行分布式解决方案的一部分),每隔约100毫秒就会定期调用Send。这意味着大约每秒发送10个项目到messageQueue。这已经得到验证。然而,偶尔会出现ReceiveAsync在超时内无法完成的情况(即Post未导致ReceiveAsync完成),并且30秒后会引发TimeoutException。此时,messageQueue.Count达到了数百个,这是不期望的。这个问题也在更慢的发布速率下(每秒1个发布)观察到,并且通常发生在BufferBlock中通过的1000个项目之前。
因此,为了解决这个问题,我正在使用以下代码,它可以工作,但是偶尔会在接收时导致1s的延迟(由于上述错误的发生)。
    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }
这对我来说看起来像是TDF中的竞态条件,但我无法弄清楚为什么在其他类似情况下使用BufferBlock时这种情况不会发生。 将ReceiveAsync从实验性更改为Receive并没有帮助。 我没有检查过,但我想象上面的代码在隔离情况下运行得很完美。 这是我在“TPL数据流入门”中看到的一种模式tpldataflow.docx
有什么方法可以彻底解决这个问题? 有哪些指标可以帮助推断出正在发生的事情? 如果我无法创建可靠的测试用例,我可以提供更多信息吗?
帮帮我!

1
我认为你所做的和你在这里的期望都没有任何问题。我肯定认为你需要在 MSDN 论坛上保持活跃,而不是在这里。你已经引起了 @StephenToub 的注意,他绝对是你想要研究这个问题的人。 - Drew Marsh
没错。我从未完全解决过这个问题。我无法在一个小的、自包含的示例中重现这个问题。因为我只是使用了 BufferBlock,所以我自己实现了一个异步队列。我不必改变任何其他代码... 我只是重新实现了我正在使用的 BufferBlock 接口的部分。现在效果很好,这让我想到可能有什么地方出了问题,但我无法证明。唉。 - spender
@spendor 非常有趣,奇怪的是我在发现BufferBlock之后放弃了自己的异步并发队列实现...现在我将不得不重新考虑。谢谢。 - Andrew Hanlon
有人知道这个问题是否仍然存在吗? - Eyal Perry
@EyalPerry 我在许多其他项目中使用(并宣传)数据流,从未遇到过这个问题。考虑到产品现在相对于6年前的成熟度,如果这仍然是一个问题,我会非常惊讶。 - spender
我目前遇到了这个问题。有人解决过吗? - Matt Ellen
1个回答

1

Stephen似乎认为以下是解决方案:

var m = await messageQueue.ReceiveAsync();

而不是:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

您能否确认或否认这一点?


那个方法没有起作用。无论我选择哪个ReceiveAsync重载,结果都是一样的。请参见我的上面的评论以了解我的解决方案。 - spender

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接