在Parallel.For循环中使用Guid.NewGuid()返回了重复的值

4

我有一个应用程序正在调用API。因此,它会查询对象中的所有ID,然后必须按ID逐个查询每个项目。我使用Parallel.For循环完成这个过程,并将每个项目的数据添加到datatable的一行中。然后,我使用sqlbulkcopy将datatable发送到SQL服务器表。

如果我不使用Parallel.For,则可以正常工作。但是,使用Parallel.For时,这一行代码:

workrow["id"] = Guid.NewGuid();

正在生成重复的 GUID。 它经常这样做,导致数据无法加载到 SQL 服务器表中,因为 SQL 中的 ID 行是主键,不允许重复。 我尝试了锁定:

                    lock (lockobject)
                    {
                        workrow["id"] = Guid.NewGuid();
                    }

这并没有起到帮助的作用。
我尝试不给该字段分配ID,以便SQL生成它(该字段确实有newid())。但是失败了,并表示无法插入空值。 我似乎无法仅从数据表中删除ID字段,因为那样做时,当我执行sqlbulkcopy操作时,列就不会对齐。

有人能帮帮我吗?我需要弄清楚如何让Guid.NewGuid()停止产生重复,或者需要找到一种不传递ID的方法(通常是数据表中的第一个字段),以便SQL生成ID。

以下是我用来生成其中一个表的代码:

        public static DataTable MakeWorkflowTable()
        {
            DataTable Workflow = new DataTable("Workflow");
            DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
            Workflow.Columns.Add(id);
            DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
            Workflow.Columns.Add(OrgInfoID);
            DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Name);
            DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Active);
            DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Description);
            DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Object);
            DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Formula);
            DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
            Workflow.Columns.Add(ManageableState);
            DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
            Workflow.Columns.Add(NameSpacePrefix);
            DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
            Workflow.Columns.Add(TDACount);
            DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
            Workflow.Columns.Add(TriggerType);
            DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
            Workflow.Columns.Add(CreatedDate);
            DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
            Workflow.Columns.Add(CreatedBy);
            DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
            Workflow.Columns.Add(LastModifiedDate);
            DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
            Workflow.Columns.Add(LastModifiedBy);
            return Workflow;
        }

这是我用来将其发送到SQL服务器的代码:

        public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
        {
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
            {
                bulkCopy.DestinationTableName =
                    TableName;
                try
                {
                    bulkCopy.WriteToServer(dt);
                    dt.Clear();
                }
                catch (Exception e)
                {
                    logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
                    if (e.Message.ToString().Contains("PRIMARY KEY"))
                    {
                        foreach(DataRow row in dt.Rows)
                        {
                            logger.Warn("ID: {id}", row["id"]);
                        }
                    }
                }
            }

        }

正如您在catch语句中所看到的,我将其设置为将ID写入日志,以便我自己查看,结果确实有一个重复项。真是让人沮丧!如果不必要,我真的不想去掉Parallel.For并将其单线程化。

根据请求,这是使用Parallel.For的代码

              if (qr.totalSize > 0)
                {
                    object lockobject = new object();
                    Parallel.For(0, qr.records.Length, i =>
                    {
                        ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();

                        vr = (ToolingService.CustomTab1)qr.records[i];
                        string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
                            + "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
                        ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
                        ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
                        mdqr = ts.query(mdSOQL);
                        vrmd = (ToolingService.CustomTab1)mdqr.records[0];

                        DataRow workrow = CustomTabs.NewRow();
                        lock (lockobject)
                        {
                            workrow["id"] = Guid.NewGuid();
                        }
                        workrow["OrgInfoID"] = _orgDBID;
                        workrow["FullName"] = vrmd.FullName;
                        workrow["Description"] = vrmd.Description ?? Convert.DBNull;
                        workrow["ManageableState"] = vrmd.ManageableState;
                        workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
                        workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
                        workrow["Type"] = vrmd.Type ?? Convert.DBNull;
                        workrow["URL"] = vrmd.Url ?? Convert.DBNull;
                        workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
                        if (vrmd.CreatedBy == null)
                        {
                            workrow["CreatedBy"] = Convert.DBNull;
                        }
                        else
                        {
                            workrow["CreatedBy"] = vrmd.CreatedBy.Name;
                        }
                        workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
                        if (vrmd.LastModifiedBy == null)
                        {
                            workrow["LastModifiedBy"] = Convert.DBNull;
                        }
                        else
                        {
                            workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
                        }
                        lock (CustomTabs)
                        {
                            CustomTabs.Rows.Add(workrow);
                        }

                    });
                    OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");

1
你能展示一下parallel.for吗?Guid.NewGuid不应该产生重复的值;很可能是你的代码有问题,但我们无法检查。 - user47589
3
如果您在数据库中使用Parallel.For,那么十有八九是从一开始就做错了什么。首先,IO负载不适合使用TPL方法。其次,数据库访问本质上是不线程安全的。第三,最好将其作为单个批次查询完成。 - TheGeneral
3
我注意到你在lock之外创建了 DataRow (DataRow workrow = CustomTabs.NewRow();),然而根据Parallel.ForEach and DataTable - Isn't DataTable.NewRow() a thread safe “read” operation?NewRow() 不是线程安全的。因此,你可能并没有实际上创建两个相同的 Guid,而是单个 Guid 值被分配给了多行。 - Lance U. Matthews
2
如果你想加速批量插入,你需要放弃使用 DataTable。这是远远不是将数据传递给 SqlBulkCopy 最有效的方式。SqlBulkCopy 本身就是一个流接口,因此并行收集数据除非生成数据本身真正成为瓶颈,否则不会带来任何好处。请查看通过采用 IDataReader 的重载进行流式传输。不要并行获取源行,而应使用 IN 或表值参数。 - Jeroen Mostert
2
请注意,如果GUID在数据库中生成,当使用多个线程和锁的不当使用时,仍然可能会在您的DataTable中获得损坏的数据 ;) - Julian
显示剩余3条评论
3个回答

12
我之前见过这个问题。 Guid.NewGuid() 没有问题,但是 DataTable 不是线程安全的!

DataTable 简单地说不适合并发使用(特别是涉及任何形式的更改)。

参见DataTable的线程安全性

另请参阅:C# DataGridView DataTable在并行循环中出现内部索引损坏

之所以使用循环,是因为我必须一遍又一遍地查询API来获取每个对象中的项目。 我正在进行 Parallel.For 循环,以加快进程速度。 如果需要一次一个地获取450个或更多个项目,我希望多线程处理以提高速度。 数据库访问不是在循环中完成的,只是构建 DataTable,因为一旦从 API 获取数据,我就需要存储它。

你可以创建一个类型,并将它们添加到 ConcurrentBag<T>ConcurrentQueue<T>(或其他并发集合,请参见 MS Docs),这些都是线程安全的 :)

然后在这之后,您可以使用单个线程构建 DataTable。或者,如果可能的话,可以跳过整个 DataTable。


据我所读,只要在向数据表添加行时使用锁定,就应该没问题。这不正确吗? - Mike Jones
我猜你需要对整个表格进行锁定。但是当有线程安全的替代方案时,我从不建议使用锁定。 - Julian
更新了答案。 - Julian
非常感谢您的时间和知识!我现在要去Youtube和Google学习ConcurrentBag。:) 我从不放弃,所以最终我会弄明白它们的! - Mike Jones

3
事实上,在 DataTable 中使用 Parallel.ForEach 并需要在其中使用锁,有点违背使用 Parallel.ForEach 的初衷。但是,我很惊讶你在调用 DataRow workrow = CustomTabs.NewRow(); 时没有出现异常,因为在我的测试中,我遇到了索引错误的异常。我不得不将对 NewRow 的调用包装在 lock 里面。代码如下:
Parallel.ForEach(data, x =>
            {
                DataRow row = null;
                lock (lockRow)
                {
                    row = dt.NewRow();
                    row["Guid"] = Guid.NewGuid();
                }
...
                lock(lockObj)
                   dt.Rows.Add(row);

lockObjlockRow 是两个分别实例化的静态对象。


这段话涉及到 IT 技术,讲解了两个静态对象的定义。
static  object lockObj = new  object();
static  object lockRow = new object();

这对我有效,将1百万行数据添加到DataTable并确保所有Guid都是唯一的。

综上所述,强烈建议按照Julian的建议编写代码或创建一个实现IDataReader接口的类(可以与SQLBulkCopy一起使用)并使用该类上传数据。


1
很高兴你已经让它工作了!我同意“在DataTable中锁定Parallel.ForEach有点违背使用Parallel.ForEach的初衷”。因此,我提出了一种线程安全的方法,而不需要锁定。(当然,并发集合并不能保证无锁,但这些集合最小化了锁定) - Julian
这正是发生的事情!在发布了这篇文章之后,我找到了这篇文章[链接](https://programmersunlimited.wordpress.com/2012/01/26/datatables-thread-safe/),并解决了这个问题。我已经进行了更改,似乎已经解决了问题。虽然最终Julian的答案可能是最好的,但我还不那么熟练,看文档时,它对我来说就像希腊语一样难懂。我永远不会停止挖掘和学习,所以我会弄清楚的,但现在这让我重新运行起来了。谢谢! - Mike Jones

-1

我使用了一个GUID生成器,每次访问它时都会进行锁定。


lock (guidGenerator)
{
  entity.Id = guidGenerator.NewGuid();
}

对我来说很好用。不过这是一种不同的方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接