Azure TableQuery与Parallel.ForEach的线程安全性

8

我有一些基本的Azure表格,我一直在串行查询:

var query = new TableQuery<DynamicTableEntity>()
  .Where(TableQuery.GenerateFilterCondition("PartitionKey",
    QueryComparisons.Equal, myPartitionKey));

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
  // Process entity here.
}

为了加快速度,我这样并行化处理:
Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
  // Process entity here in a thread-safe manner.

  // Edited to add: Details of the loop body below:

  // This is the essence of the fixed loop body:
  lock (myLock) {
    DataRow myRow = myDataTable.NewRow();
    // [Add entity data to myRow.]
    myDataTable.Rows.Add(myRow);
  }

  // Old code (apparently not thread-safe, though NewRow() is supposed to create
  // a DataRow based on the table's schema without changing the table state):
  /*
    DataRow myRow = myDataTable.NewRow();
    lock (myLock) {
      // [Add entity data to myRow.]
      myDataTable.Rows.Add(myRow);
    }
  */
});

这会显著提高速度,但运行结果在某些方面略有不同(即,有时实体会偶尔不同,尽管返回的实体数量完全相同)。
通过这一点和一些网络搜索,我得出结论:上述枚举器并不总是线程安全的。文档似乎表明只有当表对象为公共静态时才保证线程安全,但这对我没有任何影响。
请问有人能建议如何解决这个问题吗?是否有标准的 Azure 表查询并行模式?

1
枚举器不必是线程安全的,Parallel.ForEach() 可以处理。问题可能出现在实体共享某些状态的情况下。 - svick
1
你能澄清一下“略有不同的结果”是什么意思吗?如果你在Parallel.ForEach中记录所有实体,你是否会得到相同的一组实体但顺序不同? - Sergey Volegov
我已经对实体进行了排序,以确定确切的差异,而且这些集合几乎是相同的。然而,偶尔会有一个特定的实体缺失,而另一个则被复制(与我串行获取的结果相比,后者始终是相同的,并且可能包含表格内容的基本真相)。这似乎是一般模式--有些实体缺失,但其他实体被复制以保持实体计数相同。这几乎就像某个索引没有以线程安全的方式递增,导致在从内存读取实体时出现了竞争条件。 - Paul Lambert
你尝试过通过先执行ExecuteQuery().ToList(),然后将该列表作为源传递给Parallel.ForEach来排除ExecuteQuery调用的影响吗?这样能解决问题吗?如果不能,请分享更多关于循环体的细节。 - Serdar Ozler
我还没有尝试过那个,但是我的循环体稍微改了一下似乎解决了问题。基本上,我把一个 DataTable.NewRow() 调用移到了我的关键部分里面。我不明白为什么这是必要的,因为这个调用只是根据表的模式创建一个新行,而不影响任何表状态(.NET DataTable,而不是 Azure 表)。因此,我不确定问题是否真正解决了,但是到目前为止代码一直都能工作。 - Paul Lambert
1个回答

4

你的评论是正确的:DataTable不适用于涉及修改的并发操作,并且是重复条目的来源。为行修改操作锁定DataTable对象将解决此问题:

 lock (myTable)
 {
    DataRow myRow = myTable.NewRow();
    myRow.SetField<int>("c1", (int)value);
    myTable.Rows.Add(myRow);
 }

将NewRow()放在锁的外面有时会导致表中出现重复行条目或者在NewRow()一行上出现"System.ArgumentException"异常。如果需要了解并发使用DataTable的额外细节和替代方案,请参见Thread safety for DataTable
要重现错误条件,请使用以下代码。有些运行将是干净的,有些将包含重复条目,有些将遇到异常。
   class Program
   {
      static DataTable myTable = GetTable();
      static ManualResetEvent waitHandle = new ManualResetEvent(false);

      static void Main(string[] args)
      {
         const int threadCount = 10;
         List<Thread> threads = new List<System.Threading.Thread>();
         for (int i = 0; i < threadCount; ++i) 
         {
            threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread)));
            threads[i].Start(i);
         }
         waitHandle.Set(); // Release all the threads at once
         for (int i = 0; i < threadCount; ++i) 
         {
            threads[i].Join();
         }

         // Print results once threads return
         for (int i = 0; i < myTable.Rows.Count; ++i)
         {
            Console.WriteLine(myTable.Rows[i].Field<int>(0));
         }
         Console.WriteLine("---Processing Complete---");
         Console.ReadKey();
      }

      static void AddRowThread(object value)
      {
         waitHandle.WaitOne();
         DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS
         lock (myTable)
         {
            //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE
            myRow.SetField<int>("c1", (int)value);
            myTable.Rows.Add(myRow);
         }
      }

      static DataTable GetTable()
      {
         // Here we create a DataTable with four columns.
         DataTable table = new DataTable();
         table.Columns.Add("c1", typeof(int));       
         return table;
      }
   }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接