如何迭代一个BufferBlock<T>中的项?

3
我最近开始使用来自.NET 4.5的TPL Dataflow库,整个块的概念对我来说都是新的。我正在我的应用程序中实现生产者-消费者队列,并且需要防止重复消息被放入队列,因此需要检查消息是否已经排队。我正在使用BufferBlock<Message>类型(Message是自定义类型)。BufferBlock有Count属性,但这并不能解决问题,因为消息需要唯一标识。
有没有办法检查一个BufferBlock是否包含一个项目或浏览所有项目并检查它们?是否可以将BufferBlock转换为允许迭代项目的东西?我正在遵循我在MSDN上看到的示例,它没有检查项目是否在队列中,但我认为检查队列内容是一个相当需要的操作。感谢任何帮助。

你很幸运...就在几天前,我也遇到并解决了这个问题... - spender
@spender 很高兴我不是唯一一个。实际上,当我在这里寻找答案时,我读了你发布的一个问题:https://dev59.com/-2kw5IYBdhLWcg3wMHqm - Anshul
糟糕...我从来没有解决过那个错误...我无法重现一个适当的测试用例。最近我重新采用了Dataflow,并且没有遇到任何问题。 - spender
1个回答

6

不要试图破解BufferBlock,相反,为什么不在链中插入一个TransformManyBlock,让它替你完成这个任务呢?你可以使用HashSet,其中Add方法只会在该项未被添加时返回true。这很简单,但存储需求随着时间显然会增加...

void Main()
{
    var bb = new BufferBlock<string>();
    var db = DataflowEx.CreateDistinctBlock<string>();
    var ab = new ActionBlock<string>(x => Console.WriteLine(x));
    bb.LinkTo(db);
    db.LinkTo(ab);
    bb.Post("this");
    bb.Post("this");
    bb.Post("this");
    bb.Post("is");
    bb.Post("is");
    bb.Post("a");
    bb.Post("test");
}

public class DataflowEx
{
    public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
    {
        var hs = new HashSet<T>();
        //hs will be captured in the closure of the delegate
        //supplied to the TransformManyBlock below and therefore
        //will have the same lifespan as the returned block.
        //Look up the term "c# closure" for more info
        return new TransformManyBlock<T, T>(
                         x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
    }
}

这个方法奏效的原因是,TransformManyBlock 就像 Linq 的 SelectMany 一样,可以有效地展开列表。因此,TransformManyBlock 接收一个返回 IEnumerable<T> 的委托,并逐个提供返回的 IEnumerable<T> 中的项。通过返回一个包含 0 或 1 个项目的 IEnumerable<T>,我们可以有效地创建类似于 Where 行为的效果,根据某些条件是否得到满足来允许或阻止一个项目通过。在这种情况下,谓词是我们是否能将项目添加到已捕获的 HashSet 中。

我正在尝试按照您提供的代码进行操作。据我理解,消息会先通过TransformManyBlock,再传递到ActionBlock中?TransformBlock如何跟踪所有经过它的项?变量"hs"是该调用的局部变量,因此在调用之后将被销毁? - Anshul
1
@Anshul 通常是这样的..但在 lambda 的上下文中不是。 - Simon Whitehead
@Anshul:好的。我会在我的答案中加注以回答你的其他问题。 - spender
@SimonWhitehead 这很有趣,那么变量在 lambda 的上下文中存在多长时间? - Anshul
@Anshul,我之前提到了linq的SelectMany,因为它实际上执行的操作与TransformManyBlock相同。你可能可以只使用SelectMany方法重写所有Linq(虽然效率不高),所以弄清楚它为什么如此重要是值得的。Bart de Smet在这里有一个很好的介绍:http://dotnet.dzone.com/news/selectmany-probably-the-most-p - spender
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接