DataTable的线程安全性

17
我已经阅读了这篇回答 ADO.NET DataTable/DataRow Thread Safety,但有些地方我不是很理解。 特别是我不理解[2]文章的意思。我需要使用什么类型的包装器? 有人能给一个例子吗?
此外,我不明白作者在谈论级联锁和完全锁时的意思。请也给出例子。

2
坦白地说,如果线程安全是一个问题,更好的方法是“停止使用DataTable”。你想做什么,为什么觉得DataTable是一个合适的解决方案?(提示:它很少是) - Marc Gravell
@MarcGravell DataTable对象是使用SqlBulkCopy填充数据库的简单而舒适的方式。我使用它将解析后的日志加载到数据库中。 - Jonik
a: 日志加载不需要并发处理(日志应在传递给加载程序之前准备好)。 b: 所有的 SqlBulkCopy 需要的只是类似于 IDataReader 的东西 - 你可以在任何对象模型上实现它(例如,FastMember 提供了 ObjectReader,甚至可以与迭代器块(即 yield return)一起使用)。 - Marc Gravell
个人而言,我认为这仍然是不可取的,但我会在我的回答中编辑并评论您具体的情况。 - Marc Gravell
3个回答

30

DataTable并不是为并发使用而设计或预期的(特别是涉及任何形式的变异时)。在我看来,建议的“包装器”应该是:

  • 消除需要同时在DataTable上工作的需求(涉及变异时),或者:
  • 删除DataTable,改用直接支持您所需内容的数据结构(例如并发集合),或者更简单且可以轻松同步(排他或读写器)的数据结构。

基本上:解决问题。


从评论中得知:

The code looks like:

Parallel.ForEach(strings, str=>
{
    DataRow row;
    lock(table){
        row= table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock(table){
        table.Rows.Add(row)
    }
});
我只能希望这里的 out row 是一个笔误,因为这样不会导致通过 NewRow() 创建的行被填充,但是:如果你一定要使用这种方法,你不能使用 NewRow,因为挂起的行有点共享。你最好的选择是:
Parallel.ForEach(strings, str=> {
    object[] values = MyParser.Parse(str);
    lock(table) {
        table.Rows.Add(values);
    }
});

上述中的重要变化是lock涵盖了整个新行处理过程。请注意,使用Parallel.ForEach时无法保证顺序,因此重要的是最终顺序不需要完全匹配(如果数据包括时间组件,则不应该有问题)。
然而!我仍然认为你的方法是错误的:对于并行处理来说,它必须是非平凡的数据。如果您有非平凡的数据,您真的不想将其全部缓冲到内存中。我强烈建议执行以下操作,这在单个线程上可以正常工作:
using(var bcp = new SqlBulkCopy())
using(var reader = ObjectReader.Create(ParseFile(path)))
{
    bcp.DestinationTable = "MyLog";
    bcp.WriteToServer(reader);    
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}

优点:

  • 无需缓冲 - 这是一个完全流式操作(yield return不会将东西放入列表或类似的容器中)
  • 因此,行可以立即开始流式传输而无需等待整个文件进行预处理
  • 没有内存饱和问题
  • 没有线程复杂性/开销
  • 您可以保留原始顺序(通常不关键,但很好)
  • 您只受到读取原始文件的速度约束,这通常比从多个线程读取原始文件更快(单个IO设备上的争用只是开销)
  • 避免了DataTable的所有开销,在这里使用它是过度设计 - 因为它非常灵活,所以有显着的开销
  • 读取(来自日志文件)和写入(到数据库)现在是并发而不是顺序的

我在自己的工作中经常做像上面那样的事情,并且根据经验通常比首先在内存中填充DataTable至少快两倍。


最后 - 这是一个IEnumerable<T>实现的示例,它接受并发读取器和写入器,而无需在内存中缓冲所有内容 - 这将允许多个线程使用单个线程的SqlBulkCopy通过IEnumerable<T> API解析数据(调用Add和最终Close):

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");

            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }
    private bool closed;

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming

                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        ThreadPool.QueueUserWorkItem(delegate
        {
            int count = 0, sum = 0;
            foreach(var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        Thread.Sleep(100);
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();


    }

}

我有一个大的日志文件,并在解析时使用并行处理。但是我需要一些容器,在将解析后的数据加载到数据库之前收集它们。通常的代码如下:Parallel.ForEach(strings, str=> { DataRow row; lock(table){ row= table.NewRow(); } MyParser.Parse(str, out row); lock(table){ table.Rows.Add(row) } });你能给出一些正确的解决方案吗?或者一些阅读链接,以正确的方式解决它。 请原谅我的糟糕英语。 - Jonik
@Jonik 注意,上面的 ObjectReader 是来自 FastMember - Marc Gravell
@Jonik 如果是的话,中间的代码块(“你最好的选择可能是:”)可能是你的极限 - 尽管个人而言,我必须说我可能会想要创建一个自定义的线程安全半集合,允许并发/并行队列和阻塞出队。如果你真的想要,我可以很快为你举例说明。 - Marc Gravell
3
它们是独立的设备;如果您先执行所有读取操作,然后再执行所有写入操作,那么就不会有任何重叠的情况——但是:如果你读取了一些数据,然后开始写入......当你在等待磁盘IO时,网络IO正在悄悄地缓冲,这意味着当你回去获取更多数据时,它很可能已经在那里了;同样,在磁盘写缓存中,当您耗尽DB / NIC输入缓冲区并实际上等待更多数据时,磁盘将刷新自己。这一切都可以很好地运作——即使在我们开始添加'异步'之类的好处之前。 - Marc Gravell
3
@basketballfan22 "OS预取" - Marc Gravell
显示剩余5条评论

2
面对相同的问题,我决定实现嵌套的ConcurrentDictionaries。这是通用的,但可以改为使用定义的类型。包括将其转换为DataTable的示例方法。
/// <summary>
/// A thread safe data table
/// </summary>
/// <typeparam name="TX">The X axis type</typeparam>
/// <typeparam name="TY">The Y axis type</typeparam>
/// <typeparam name="TZ">The value type</typeparam>
public class HeatMap<TX,TY,TZ>
{
    public ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>> Table { get; set; } = new ConcurrentDictionary<TX, ConcurrentDictionary<TY, TZ>>();

    public void SetValue(TX x, TY y, TZ val)
    {
        var row = Table.GetOrAdd(x, u => new ConcurrentDictionary<TY, TZ>());

        row.AddOrUpdate(y, v => val,
            (ty, v) => val);
    }

    public TZ GetValue(TX x, TY y)
    {
        var row = Table.GetOrAdd(x, u => new ConcurrentDictionary<TY, TZ>());

        if (!row.TryGetValue(y, out TZ val))
            return default;

        return val;

    }

    public DataTable GetDataTable()
    {
        var dataTable = new DataTable();

        dataTable.Columns.Add("");

        var columnList = new List<string>();
        foreach (var row in Table)
        {
            foreach (var valueKey in row.Value.Keys)
            {
                var columnName = valueKey.ToString();
                if (!columnList.Contains(columnName))
                    columnList.Add(columnName);
            }
        }

        foreach (var s in columnList)
            dataTable.Columns.Add(s);

        foreach (var row in Table)
        {
            var dataRow = dataTable.NewRow();
            dataRow[0] = row.Key.ToString();
            foreach (var column in row.Value)
            {
                dataRow[column.Key.ToString()] = column.Value;
            }

            dataTable.Rows.Add(dataRow);
        }

        return dataTable;
    }
}

1

介绍

如果并发或并行是DataTable对象程序的要求,那么可以实现这一点。让我们看两个例子(基本上,我们将看到在所有示例中通常使用AsEnumerable()方法):

1- DataTable上的并行迭代:

.NET提供了原生资源来在DataTable上进行并行迭代,如下面的代码所示:

DataTable dt = new DataTable();
dt.Columns.Add("ID");
dt.Columns.Add("NAME");

dt.Rows.Add(1, "One");
dt.Rows.Add(2, "Two");
dt.Rows.Add(3, "Three");
dt.PrimaryKey = new DataColumn[] { dt1.Columns["ID"] };

Parallel.ForEach(dt.AsEnumerable(), row =>
{
    int rowId = int.Parse(row["ID"]);
    string rowName = row["NAME"].ToString();
    //TO DO the routine that useful for each DataRow object...
});

2-向DataTable添加多个项目:

我认为这是一种非平凡的方法,因为DataTable的核心不是线程安全的集合/矩阵;因此,您需要ConcurrentBag的支持来保证不会在代码中出现异常。

在 "ConcurrentBag - Add Multiple Items?" 中,我写了一个详细的示例,将DataTable对象中的项目添加到派生类ConcurrentBag中,考虑到编程需要在DataTables上使用并发性。然后,在ConcurrentBag上进行程序业务规则添加后,可以将ConcurrentBag集合转换为DataTable并享受并行资源。


感谢这个答案。它非常有帮助,因为我需要在多线程模式下处理 DataRow 对象。 - vibs2006

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接