异步生产者/消费者

3

我有一个类的实例,该实例从多个线程访问。此类接受这些调用并将元组添加到数据库中。由于某些数据库约束条件,我需要以串行方式执行此操作,因为并行线程可能导致数据库不一致。

由于我对C#中的并行和并发是新手,因此我执行了以下操作:

private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

public void AddDData(string info)
{
    Task t = new Task(() => { InsertDataIntoBase(info); });
    _tasks.Add(t);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Task t;
            if (_tasks.TryTake(out t))
            {
                t.Start();
                t.Wait();
            }
        }
    });
}
AddDData被多个线程调用,InsertDataIntoBase是一个非常简单的插入操作,应该只需几毫秒时间。问题在于,由于我的知识不足,我无法确定原因,有时会调用两次任务!它总是这样进行的:
T1 T2 T3 T1 <- 主键错误。 T4
我是否完全误解了.Take(),或者我漏掉了什么,或者我的生产者/消费者实现真的很糟糕?最好的问候,拉斐尔。
更新:如建议所示,我使用此架构制作了一个快速沙盒测试实现,并且正如我怀疑的那样,它不能保证在上一个任务完成之前不会触发下一个任务。所以问题仍然存在:如何正确排队任务并按顺序执行它们?
更新2:我简化了代码。
private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();

public void AddDData(Data info)
{
    _tasks.Add(info);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Data info;
            if (_tasks.TryTake(out info))
            {
                InsertIntoDB(info);
            }
        }
    });
}

请注意,我已经将任务删除,因为我依赖于同步的InsertIntoDB调用(因为它在循环内部),但仍然没有成功......生成是正确的,我绝对确定只有唯一的实例进入队列。但无论我尝试什么,有时都会使用相同的对象两次。

1
你是如何生成主键的? - Austin Salonen
只是为了明确一下:你发送的数据不可能出现问题,对吧? - Austin Salonen
@RafaBorges 先进先出队列或锁,你做的都是一样的,序列化数据库调用。我看不出有什么区别。 - I4V
1
为了确认您确实会执行相同的任务两次,请将PK和TaskID(Task.CurrentId)写入命令行并查看输出。当启动100万个以上的任务时,我无法复现此问题... - Austin Salonen
1
msdn:删除项的顺序取决于用于创建 BlockingCollection<T> 实例的集合类型。当您创建 BlockingCollection<T> 对象时,可以指定要使用的集合类型。例如,您可以为先进先出(FIFO)行为指定 ConcurrentQueue 对象。BlockingCollection<T> 的默认集合类型是 ConcurrentQueue<T>。 - sga101
显示剩余12条评论
3个回答

1
我认为这应该可以工作:

    private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        InsertWorker();
        GenerateItems(10, 1000);
        _itemsToProcess.CompleteAdding();
    }

    private static void InsertWorker()
    {
        Task.Factory.StartNew(() =>
        {
            while (!_itemsToProcess.IsCompleted)
            {
                string t;
                if (_itemsToProcess.TryTake(out t))
                {
                    // Do whatever needs doing here
                    // Order should be guaranteed since BlockingCollection 
                    // uses a ConcurrentQueue as a backing store by default.
                    // http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle
                    Console.WriteLine(t);
                }
            }
        });
    }

    private static void GenerateItems(int count, int maxDelayInMs)
    {
        Random r = new Random();
        string[] items = new string[count];

        for (int i = 0; i < count; i++)
        {
            items[i] = i.ToString();
        }

        // Simulate many threads adding items to the collection
        items
            .AsParallel()
            .WithDegreeOfParallelism(4)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select((x) =>
            {
                Thread.Sleep(r.Next(maxDelayInMs));
                _itemsToProcess.Add(x);
                return x;
            }).ToList();
    }

这意味着消费者是单线程的,但允许多个生产者线程。

我尝试了一百万次迭代,但没有出现重复。我会再考虑一下的。 - sga101
3
请注意,你可以通过将while(!IsCompleted)TryTake替换为单个foreach (string t in _itemsToProcess.GetConsumingEnumerable())来简化你的工作程序。请参阅GetConsumingEnumerable - Jim Mischel
@JimMischel 我考虑过这个,但是这里的顺序很重要,而且这个 MSDN 文档说使用这种方法时不能保证顺序。 - sga101
@sga101 经过数小时的调试和记录,我发现错误并不在代码的这一部分,而是在数据库插入中。似乎有一个恶意 bug 是由于我创建的代码异味导致的竞争条件。由于你的代码帮助我找到了这个问题,所以给你点赞和千万个感谢! - Rafa Borges
@sga101:我猜你说的是这一行:“不能保证生产者线程添加的项以相同的顺序枚举。”老实说,我不知道那一行应该是什么意思。我向你保证,GetConsumingEnumerable确实按照FIFO顺序删除元素。查看.NET Framework源代码可以证实这一点:GetConsumingEnumerable只是一个循环中的一堆TryTake调用。项目肯定是按照插入顺序被删除的。 - Jim Mischel
@RafaBorges:请查看我的先前评论,我误将其发布到sga101。 - Jim Mischel

0

从你的评论中

"我简化了这里显示的代码,因为数据不是一个字符串"

我假设传递给AddData的info参数是一个可变引用类型。请确保调用者在多次调用时不要使用相同的info实例,因为该引用被捕获在Task lambda中。


是的,它是一个可变的引用类型,我会仔细检查一下(尽管我认为这不是问题所在)。竞争条件可能会导致此行为,但是使用BlockingCollection作为缓冲区不会减少这种风险吗? - Rafa Borges
Rafa Borges:我误读了代码 - 由于它确保项目按顺序执行,因此不存在竞争条件。但这并不排除同一项被不同任务捕获的可能性。 - alexm

0
根据您提供的跟踪信息,唯一的逻辑可能性是您已经调用了InsertWorker两次(或更多)。因此,有两个后台线程正在等待项目出现在集合中,并且偶尔它们都能够抓住一个项目并开始执行它。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接