我有一个应用程序正在调用API。因此,它会查询对象中的所有ID,然后必须按ID逐个查询每个项目。我使用Parallel.For循环完成这个过程,并将每个项目的数据添加到datatable的一行中。然后,我使用sqlbulkcopy将datatable发送到SQL服务器表。
如果我不使用Parallel.For,则可以正常工作。但是,使用Parallel.For时,这一行代码:
workrow["id"] = Guid.NewGuid();
正在生成重复的 GUID。 它经常这样做,导致数据无法加载到 SQL 服务器表中,因为 SQL 中的 ID 行是主键,不允许重复。 我尝试了锁定:
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
这并没有起到帮助的作用。
我尝试不给该字段分配ID,以便SQL生成它(该字段确实有newid())。但是失败了,并表示无法插入空值。
我似乎无法仅从数据表中删除ID字段,因为那样做时,当我执行sqlbulkcopy操作时,列就不会对齐。
有人能帮帮我吗?我需要弄清楚如何让Guid.NewGuid()停止产生重复,或者需要找到一种不传递ID的方法(通常是数据表中的第一个字段),以便SQL生成ID。
以下是我用来生成其中一个表的代码:
public static DataTable MakeWorkflowTable()
{
DataTable Workflow = new DataTable("Workflow");
DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(id);
DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(OrgInfoID);
DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
Workflow.Columns.Add(Name);
DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
Workflow.Columns.Add(Active);
DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
Workflow.Columns.Add(Description);
DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
Workflow.Columns.Add(Object);
DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
Workflow.Columns.Add(Formula);
DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
Workflow.Columns.Add(ManageableState);
DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
Workflow.Columns.Add(NameSpacePrefix);
DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
Workflow.Columns.Add(TDACount);
DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
Workflow.Columns.Add(TriggerType);
DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(CreatedDate);
DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(CreatedBy);
DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(LastModifiedDate);
DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(LastModifiedBy);
return Workflow;
}
这是我用来将其发送到SQL服务器的代码:
public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
{
bulkCopy.DestinationTableName =
TableName;
try
{
bulkCopy.WriteToServer(dt);
dt.Clear();
}
catch (Exception e)
{
logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
if (e.Message.ToString().Contains("PRIMARY KEY"))
{
foreach(DataRow row in dt.Rows)
{
logger.Warn("ID: {id}", row["id"]);
}
}
}
}
}
正如您在catch语句中所看到的,我将其设置为将ID写入日志,以便我自己查看,结果确实有一个重复项。真是让人沮丧!如果不必要,我真的不想去掉Parallel.For并将其单线程化。
根据请求,这是使用Parallel.For的代码
if (qr.totalSize > 0)
{
object lockobject = new object();
Parallel.For(0, qr.records.Length, i =>
{
ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();
vr = (ToolingService.CustomTab1)qr.records[i];
string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
+ "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
mdqr = ts.query(mdSOQL);
vrmd = (ToolingService.CustomTab1)mdqr.records[0];
DataRow workrow = CustomTabs.NewRow();
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
workrow["OrgInfoID"] = _orgDBID;
workrow["FullName"] = vrmd.FullName;
workrow["Description"] = vrmd.Description ?? Convert.DBNull;
workrow["ManageableState"] = vrmd.ManageableState;
workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
workrow["Type"] = vrmd.Type ?? Convert.DBNull;
workrow["URL"] = vrmd.Url ?? Convert.DBNull;
workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
if (vrmd.CreatedBy == null)
{
workrow["CreatedBy"] = Convert.DBNull;
}
else
{
workrow["CreatedBy"] = vrmd.CreatedBy.Name;
}
workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
if (vrmd.LastModifiedBy == null)
{
workrow["LastModifiedBy"] = Convert.DBNull;
}
else
{
workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
}
lock (CustomTabs)
{
CustomTabs.Rows.Add(workrow);
}
});
OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");
Parallel.For
,那么十有八九是从一开始就做错了什么。首先,IO负载不适合使用TPL方法。其次,数据库访问本质上是不线程安全的。第三,最好将其作为单个批次查询完成。 - TheGenerallock
之外创建了DataRow
(DataRow workrow = CustomTabs.NewRow();
),然而根据Parallel.ForEach and DataTable - Isn't DataTable.NewRow() a thread safe “read” operation? ,NewRow()
不是线程安全的。因此,你可能并没有实际上创建两个相同的Guid
,而是单个Guid
值被分配给了多行。 - Lance U. MatthewsDataTable
。这是远远不是将数据传递给SqlBulkCopy
最有效的方式。SqlBulkCopy
本身就是一个流接口,因此并行收集数据除非生成数据本身真正成为瓶颈,否则不会带来任何好处。请查看通过采用IDataReader
的重载进行流式传输。不要并行获取源行,而应使用IN
或表值参数。 - Jeroen Mostert