C#数据库中的多个并行插入

6

我有一个包含大约3000行的数据表。每一行都需要插入到数据库表中。目前,我正在运行以下foreach循环:

obj_AseCommand.CommandText = sql_proc;
obj_AseCommand.CommandType = CommandType.StoredProcedure;
obj_AseCommand.Connection = db_Conn;
obj_AseCommand.Connection.Open();

foreach (DataRow dr in dt.Rows)                
{
    obj_AseCommand.Parameters.AddWithValue("@a", dr["a"]);
    obj_AseCommand.Parameters.AddWithValue("@b", dr["b"]);
    obj_AseCommand.Parameters.AddWithValue("@c", dr["c"]);

    obj_AseCommand.ExecuteNonQuery();
    obj_AseCommand.Parameters.Clear();
}

obj_AseCommand.Connection.Close();

请问如何在数据库中并行执行存储过程呢?因为上述方法需要大约10分钟才能插入3000行数据。


2
只是出于好奇 - 你为什么要使用存储过程向数据库添加3000行?如果这是来自某个输入文件,为什么不使用一些管理工具将其直接导入数据库呢? - ShayD
数据表从其他数据库 - 主数据库中填充。我的目标是从主数据库获取数据并将其插入到我的数据库中。存储过程用于在我的数据库中插入数据。 - Harsh
4
为什么不利用批量插入或链接服务器,然后让SQL提取并插入数据。 - 3dd
1
Harsh - 我认为最好使用批量插入/导入/数据库管理系统所称的任何方法来完成此操作。这将节省您很多麻烦和时间。 - ShayD
4个回答

13

编辑

事后看来,使用 Parallel.ForEach 并行插入数据库略显浪费,因为它还会为每个连接消耗一个线程。可以说,更好的并行解决方案是使用 System.Data 数据库操作的异步版本,例如 ExecuteNonQueryAsync,启动执行(并发),然后使用 await Task.WhenAll() 等待完成 - 这将避免对调用方的线程开销,尽管整体数据库性能可能不会更快。更多信息

原始答案,将多个并行插入到数据库中

你可以使用TPL并行完成此操作,例如使用Parallel.ForEachlocalInit重载。你几乎肯定需要调整MaxDegreeOfParalelism以限制并行度,以避免数据库被淹没。
Parallel.ForEach(dt.Rows,
    // Adjust this for optimum throughput vs minimal impact to your other DB users
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    () =>
    {
        var con = new SqlConnection();
        var cmd = con.CreateCommand();
        cmd.CommandText = sql_proc;
        cmd.CommandType = CommandType.StoredProcedure;
        con.Open();

        cmd.Parameters.Add(new SqlParameter("@a", SqlDbType.Int));
        // NB : Size sensitive parameters must have size
        cmd.Parameters.Add(new SqlParameter("@b", SqlDbType.VarChar, 100));
        cmd.Parameters.Add(new SqlParameter("@c", SqlDbType.Bit));
        // Prepare won't help with SPROCs but can improve plan caching for adhoc sql
        // cmd.Prepare();
        return new {Conn = con, Cmd = cmd};
    },
    (dr, pls, localInit) =>
    {
        localInit.Cmd.Parameters["@a"] = dr["a"];
        localInit.Cmd.Parameters["@b"] = dr["b"];
        localInit.Cmd.Parameters["@c"] = dr["c"];
        localInit.Cmd.ExecuteNonQuery();
        return localInit;
    },
    (localInit) =>
    {
        localInit.Cmd.Dispose();
        localInit.Conn.Dispose();
    });

注:

  • 除非您真的知道自己在做什么,一般情况下我们应该让TPL来决定并行度。但是,根据资源争用的程度(即针对数据库工作的锁定),可能需要限制并发任务的上限(试验和错误可能是有用的,例如尝试使用4、8、16个并发任务等来确定哪个能够实现最高吞吐量,并监视Sql Server上的锁定和CPU负载)。
  • 同样,保留TPL的默认分区器通常足以将DataRows分配到各个任务中。
  • 每个任务都需要自己独立的Sql连接。
  • 与其在每次调用时创建和处理命令,不如为每个任务创建一次并重复使用相同的Command,只需每次更新参数即可。
  • 使用LocalInit/LocalFinally lambda来进行每个任务的设置和清理工作,例如释放命令和连接。
  • 如果您使用的是AdHoc Sql或2005年之前的Sql版本,也可以考虑使用.Prepare()
  • 我假设枚举DataTable的行是线程安全的。当然,您需要进行双重检查。

旁注:

即使有广泛的表格和单个线程,10分钟处理3000行也是过度的。您的存储过程是做什么的?我假设处理不是微不足道的,因此需要SPROC,但如果您只是进行简单的插入操作,如@3dd的评论所述,SqlBulkCopy将在相对较窄的表格上产生每分钟约1M行的插入。


嗨,Stuart,感谢您的建议。我尝试使用以下代码,并使用AseBulkCopy,因为我们有Sybase数据库: AseBulkCopy obj_AseBulkCopy = new AseBulkCopy(db_Conn); obj_AseBulkCopy.DestinationTableName = "db_table"; obj_AseBulkCopy.BatchSize = 1000; db_Conn.Open(); obj_AseBulkCopy.WriteToServer(dt); db_Conn.Close(); 然而,执行时间仍然相同。我在这段代码中漏掉了什么吗? - Harsh
如果即使使用BulkCopy导入3000行数据也需要10分钟,那么您的RDBMS出现了非常奇怪的问题。我猜测您在插入表上有触发器执行了大量逻辑,或者存在大量锁争用,或者可能有很多约束、规则、索引,而且表格非常宽。您需要DBA认真查看表格——插入技术和缺乏并行性不是瓶颈所在 :( - StuartLC

5

最好将整个数据表传入数据库。

obj_AseCommand.CommandText = sql_proc;
obj_AseCommand.CommandType = CommandType.StoredProcedure;
obj_AseCommand.Connection = db_Conn;
obj_AseCommand.Connection.Open();
obj_AseCommand.Parameters.AddWithValue("@Parametername",DataTable);
obj_AseCommand.ExecuteNonQuery();

在数据库中,您需要创建与数据表完全匹配的表类型。

CREATE TYPE EmpType AS TABLE 
(
    ID INT, Name VARCHAR(3000), Address VARCHAR(8000), Operation SMALLINT //your columns
)

在存储过程中,您可以像这样进行操作...
create PROCEDURE demo

@Details EmpType READONLY // it must be read only
AS
BEGIN
    insert into yourtable   //insert data
    select * from @Details 
    END

从 SQL Server 2008 开始,这是可以实现的。 - Nikhil Vartak

3
你可以使用 SqlBulkCopy。参见下面的示例代码。在提供相同映射的情况下,WriteToServer 方法将datatable 写入数据库中。
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(ConSQL)) {
if (ConSQL.State == ConnectionState.Closed) {
    ConSQL.Open();
}

bulkCopy.ColumnMappings.Add(0, 0);
bulkCopy.ColumnMappings.Add(1, 1);
bulkCopy.ColumnMappings.Add(2, 2);

bulkCopy.DestinationTableName = "dbo.TableName";

bulkCopy.WriteToServer(dataTable);

bulkCopy.Close(); //redundant - since using will dispose the object

}

0

您可以使用 SqlBulkCopy

指南在 这里


2
这更像是一条评论,而不是答案。 - 3dd
好的,抱歉,我是一个新用户 :) - Mehmet Otkun
3
这是一个合理的答案(虽然可能不是很好),通过补充一些为什么和如何的解释可以改进它。 - Eren Ersönmez

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接