我知道...我没有充分利用TplDataflow的潜力。目前,我只是使用BufferBlock
作为安全队列进行消息传递,其中生产者和消费者以不同的速率运行。我看到了一些奇怪的行为,让我不知道该如何继续。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代码中(它是一个2000行分布式解决方案的一部分),每隔约100毫秒就会定期调用Send
。这意味着大约每秒发送10个项目到messageQueue
。这已经得到验证。然而,偶尔会出现ReceiveAsync
在超时内无法完成的情况(即Post
未导致ReceiveAsync
完成),并且30秒后会引发TimeoutException
。此时,messageQueue.Count
达到了数百个,这是不期望的。这个问题也在更慢的发布速率下(每秒1个发布)观察到,并且通常发生在BufferBlock
中通过的1000个项目之前。因此,为了解决这个问题,我正在使用以下代码,它可以工作,但是偶尔会在接收时导致1s的延迟(由于上述错误的发生)。
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
这对我来说看起来像是TDF中的竞态条件,但我无法弄清楚为什么在其他类似情况下使用BufferBlock
时这种情况不会发生。 将ReceiveAsync
从实验性更改为Receive
并没有帮助。 我没有检查过,但我想象上面的代码在隔离情况下运行得很完美。 这是我在“TPL数据流入门”中看到的一种模式tpldataflow.docx。有什么方法可以彻底解决这个问题? 有哪些指标可以帮助推断出正在发生的事情? 如果我无法创建可靠的测试用例,我可以提供更多信息吗?
帮帮我!