我正在使用TDF技术开发我的应用程序,效果非常好,但不幸的是我遇到了一个特殊问题,似乎现有的Dataflow机制无法直接处理:
我有N个生产者(在这种情况下是BufferBlocks),它们全部连接到同一个ActionBlock。该块总是一次只处理一个项目,并且也只能容纳一个项目。
除了要将生产者与ActionBlock相连之外,我还想添加一个过滤器,但特殊情况是过滤条件可以独立于已处理的项目而改变,而且不能丢弃项目!因此,基本上我想处理所有项目,但顺序/时间可能会改变。
不幸的是,我了解到,如果某个项目被“拒绝”一次 - > 过滤条件计算结果为false,如果此项目未传递到另一块(例如NullTarget),则目标块不会重试同一项(也不会重新评估过滤器)。
该代码建立了一个测试管道,并用奇数和偶数填充了两个缓冲区。两个BufferBlock都连接到一个仅打印“processed”数字并等待2秒的单个ActionBlock。
m_bufferBlock1和m_actionBlock之间的过滤条件(测试目的)检查自我们开始整个过程以来是否已经过去了1分钟。
如果运行此代码,将生成以下输出:
正如我们所看到的,ActionBlock从BufferBlock中获取第一个元素而不进行过滤,然后尝试通过过滤器获取来自BufferBlock的元素。但是,该过滤器评估为false,它继续从块中获取所有没有过滤器的元素。
我的期望是,在处理了来自未经过滤的BufferBlock的元素之后,它会再次尝试从具有过滤器的另一个BufferBlock中获取元素,并再次对其进行评估。
这将是我预期(或期望)的结果:
我的问题是,是否有一种方式可以“重置”已经“拒绝”的消息,以便重新评估它,或者是否有另一种方法通过不同的建模方式来解决这个问题?简单概述一下,严格交替从两个缓冲区中取出并不重要!(因为我知道这取决于调度,并且如果偶尔从同一块中出列两个项目,则完全没有问题)但是,“拒绝”消息不能被丢弃或重新排队,因为一个缓冲区内的顺序很重要。
提前感谢。
我有N个生产者(在这种情况下是BufferBlocks),它们全部连接到同一个ActionBlock。该块总是一次只处理一个项目,并且也只能容纳一个项目。
除了要将生产者与ActionBlock相连之外,我还想添加一个过滤器,但特殊情况是过滤条件可以独立于已处理的项目而改变,而且不能丢弃项目!因此,基本上我想处理所有项目,但顺序/时间可能会改变。
不幸的是,我了解到,如果某个项目被“拒绝”一次 - > 过滤条件计算结果为false,如果此项目未传递到另一块(例如NullTarget),则目标块不会重试同一项(也不会重新评估过滤器)。
public class ConsumeTest
{
private readonly BufferBlock<int> m_bufferBlock1;
private readonly BufferBlock<int> m_bufferBlock2;
private readonly ActionBlock<int> m_actionBlock;
public ConsumeTest()
{
m_bufferBlock1 = new BufferBlock<int>();
m_bufferBlock2 = new BufferBlock<int>();
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 };
m_actionBlock = new ActionBlock<int>((item) => BlockAction(item), options);
var start = DateTime.Now;
var elapsed = TimeSpan.FromMinutes(1);
m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed));
m_bufferBlock2.LinkTo(m_actionBlock);
FillBuffers();
}
private void BlockAction(int item)
{
Console.WriteLine(item);
Thread.Sleep(2000);
}
private void FillBuffers()
{
for (int i = 0; i < 1000; i++)
{
if (i % 2 == 0)
{
m_bufferBlock1.Post(i);
}
else
{
m_bufferBlock2.Post(i);
}
}
}
private bool IsTimeElapsed(DateTime start, TimeSpan elapsed)
{
Console.WriteLine("checking time elapsed");
return DateTime.Now > (start + elapsed);
}
public async Task Start()
{
await m_actionBlock.Completion;
}
}
该代码建立了一个测试管道,并用奇数和偶数填充了两个缓冲区。两个BufferBlock都连接到一个仅打印“processed”数字并等待2秒的单个ActionBlock。
m_bufferBlock1和m_actionBlock之间的过滤条件(测试目的)检查自我们开始整个过程以来是否已经过去了1分钟。
如果运行此代码,将生成以下输出:
1
checking time elapsed
3
5
7
9
11
13
15
17
19
正如我们所看到的,ActionBlock从BufferBlock中获取第一个元素而不进行过滤,然后尝试通过过滤器获取来自BufferBlock的元素。但是,该过滤器评估为false,它继续从块中获取所有没有过滤器的元素。
我的期望是,在处理了来自未经过滤的BufferBlock的元素之后,它会再次尝试从具有过滤器的另一个BufferBlock中获取元素,并再次对其进行评估。
这将是我预期(或期望)的结果:
1
checking time elapsed
3
checking time elapsed
5
checking time elapsed
7
checking time elapsed
9
checking time elapsed
11
checking time elapsed
13
checking time elapsed
15
// after timer has elapsed take elements also from other buffer
2
17
4
19
我的问题是,是否有一种方式可以“重置”已经“拒绝”的消息,以便重新评估它,或者是否有另一种方法通过不同的建模方式来解决这个问题?简单概述一下,严格交替从两个缓冲区中取出并不重要!(因为我知道这取决于调度,并且如果偶尔从同一块中出列两个项目,则完全没有问题)但是,“拒绝”消息不能被丢弃或重新排队,因为一个缓冲区内的顺序很重要。
提前感谢。