快速向SQL Server插入200万行数据

85

我必须从文本文件中插入大约200万行数据。

并且在插入数据时,我需要创建一些主表。

如何以最佳和最快的方式将这样一大量的数据插入到SQL Server中?

8个回答

76
  1. 我认为最好将文本文件中的数据读取到 DataSet 中

  2. 尝试使用SqlBulkCopy - C# 应用程序中的批量插入到 SQL

// connect to SQL
using (SqlConnection connection = new SqlConnection(connString))
{
    // make sure to enable triggers
    // more on triggers in next post
    SqlBulkCopy bulkCopy = new SqlBulkCopy(
        connection, 
        SqlBulkCopyOptions.TableLock | 
        SqlBulkCopyOptions.FireTriggers | 
        SqlBulkCopyOptions.UseInternalTransaction,
        null
        );

    // set the destination table name
    bulkCopy.DestinationTableName = this.tableName;
    connection.Open();

    // write the data in the "dataTable"
    bulkCopy.WriteToServer(dataTable);
    connection.Close();
}
// reset
this.dataTable.Clear();

或者

在顶部执行步骤1后

  1. 从DataSet创建XML
  2. 将XML传递给数据库并执行批量插入

您可以查看此文章获取详细信息:使用C# DataTable和SQL Server OpenXML函数进行数据批量插入

但它尚未测试2百万条记录,虽然可以执行,但会占用机器内存,因为您需要加载2百万条记录并将其插入。


1
我知道现在可能有点晚了,但如果有足够的列(25个或更多),对于约200万行(或更多),当填充数据集/数据表时,代码几乎不可避免地会在某个时刻生成“OutOfMemoryException”异常。 - Razort4x
19
你可以设置缓冲区来避免内存异常。对于文本文件,我使用了File.ReadLines(file).Skip(X).Take(100000).ToList()。在每100k后,我会重置并继续移动到下一个100k。效果很好且速度非常快。 - Jason Foglia
如果源数据来自 Sql Server 表怎么办?假设该表有 3000 万行,我们仍然可以使用 bulkcopy 吗?一个简单的 Insert into table1 Select * from table2 不会更快吗? - Madhav Shenoy

63
您可以尝试使用 SqlBulkCopy 类,它可以帮助您高效地将来自其他源的数据批量加载到 SQL Server 表中。
有一篇很棒的博客文章介绍了如何使用它。

1
请注意,正如SQLBulCopy链接中所提到的那样,如果源表和目标表在同一个SQL Server实例中,则使用Transact-SQL INSERT ... SELECT语句复制数据更容易且更快。 - nam

33

关于SqlBulkCopy的解决方案:

我使用StreamReader将文本文件转换并处理,结果是我的对象列表。

我创建了一个类,它接受Datatable或List<T>和缓冲区大小(CommitBatchSize)。它将使用扩展(在第二个类中)将列表转换为数据表。

它非常快速。在我的电脑上,我能够在不到10秒的时间内插入超过1000万条复杂的记录。

这是该类:

using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DAL
{

public class BulkUploadToSql<T>
{
    public IList<T> InternalStore { get; set; }
    public string TableName { get; set; }
    public int CommitBatchSize { get; set; }=1000;
    public string ConnectionString { get; set; }

    public void Commit()
    {
        if (InternalStore.Count>0)
        {
            DataTable dt;
            int numberOfPages = (InternalStore.Count / CommitBatchSize)  + (InternalStore.Count % CommitBatchSize == 0 ? 0 : 1);
            for (int pageIndex = 0; pageIndex < numberOfPages; pageIndex++)
                {
                    dt= InternalStore.Skip(pageIndex * CommitBatchSize).Take(CommitBatchSize).ToDataTable();
                BulkInsert(dt);
                }
        } 
    }

    public void BulkInsert(DataTable dt)
    {
        using (SqlConnection connection = new SqlConnection(ConnectionString))
        {
            // make sure to enable triggers
            // more on triggers in next post
            SqlBulkCopy bulkCopy =
                new SqlBulkCopy
                (
                connection,
                SqlBulkCopyOptions.TableLock |
                SqlBulkCopyOptions.FireTriggers |
                SqlBulkCopyOptions.UseInternalTransaction,
                null
                );

            // set the destination table name
            bulkCopy.DestinationTableName = TableName;
            connection.Open();

            // write the data in the "dataTable"
            bulkCopy.WriteToServer(dt);
            connection.Close();
        }
        // reset
        //this.dataTable.Clear();
    }

}

public static class BulkUploadToSqlHelper
{
    public static DataTable ToDataTable<T>(this IEnumerable<T> data)
    {
        PropertyDescriptorCollection properties =
            TypeDescriptor.GetProperties(typeof(T));
        DataTable table = new DataTable();
        foreach (PropertyDescriptor prop in properties)
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        foreach (T item in data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}

以下是一个示例,当我想要插入我的自定义对象列表List<PuckDetection> (ListDetections) 时:

var objBulk = new BulkUploadToSql<PuckDetection>()
{
        InternalStore = ListDetections,
        TableName= "PuckDetections",
        CommitBatchSize=1000,
        ConnectionString="ENTER YOU CONNECTION STRING"
};
objBulk.Commit();
< p > BulkInsert 类可以根据需要修改以添加列映射。例如,如果您的第一列是标识键(假设数据表中的列名与数据库相同),则可以进行如下操作。

//ADD COLUMN MAPPING
foreach (DataColumn col in dt.Columns)
{
        bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
}

当我尝试使用你的代码时,出现错误:IEnumerable<T>不包含ToDataTable的定义。 - eug100

6
我使用bcp工具(批量复制程序)每个月加载大约1.5百万个文本记录。每个文本记录有800个字符宽度。在我的服务器上,将这1.5百万个文本记录添加到SQL Server表中需要约30秒钟。
bcp的说明在http://msdn.microsoft.com/en-us/library/ms162802.aspx。请参考。

我也会推荐使用bcp实用程序。我不知道有什么更快的了。 - Morzel

4

我尝试了这种方法,明显缩短了我的数据库插入执行时间。

List<string> toinsert = new List<string>();
StringBuilder insertCmd = new StringBuilder("INSERT INTO tabblename (col1, col2, col3) VALUES ");

foreach (var row in rows)
{
      // the point here is to keep values quoted and avoid SQL injection
      var first = row.First.Replace("'", "''")
      var second = row.Second.Replace("'", "''")
      var third = row.Third.Replace("'", "''")

      toinsert.Add(string.Format("( '{0}', '{1}', '{2}' )", first, second, third));
}
if (toinsert.Count != 0)
{
      insertCmd.Append(string.Join(",", toinsert));
      insertCmd.Append(";");
}
using (MySqlCommand myCmd = new MySqlCommand(insertCmd.ToString(), SQLconnectionObject))
{
      myCmd.CommandType = CommandType.Text;
      myCmd.ExecuteNonQuery();
}

创建SQL连接对象并将其替换为我编写的SQLconnectionObject。


4
小心!这可以被利用进行SQL注入攻击。 - Andre Soares
因为没有避免 SQL 注入攻击而被踩了一脚(请参见 https://dev59.com/M2Up5IYBdhLWcg3wEEbd )。 - Andrew Hill

3
我最近遇到这种情况(超过700万行),我使用了Powershell中的sqlcmd(将原始数据解析为SQL插入语句)每次处理5000条记录(SQL无法处理700万行或甚至500,000行的数据,除非将其分解成小的5K块。您可以依次运行每个5K脚本。)因为我需要利用 SQL Server 2012 Enterprise 中的新序列命令。我找不到一种程序化的方法来快速高效地插入700万行数据并使用所述序列命令。
其次,在一次性插入100万行或更多数据时要注意CPU和内存消耗(主要是内存)在插入过程中会被SQL吃掉,而且不会释放所需处理器资源和内存。不用说,如果您的服务器没有足够的处理能力或内存,那么它很容易在短时间内崩溃(我通过艰难的方式发现了这一点)。如果您的内存消耗超过70-75%,请重新启动服务器,进程将恢复正常。
在我能够制定最终执行计划之前,我不得不进行大量的试错测试以查看我的服务器的限制(给定有限的处理器/内存资源)。我建议您在测试环境中做同样的事情,然后再将其推广到生产环境中。

1
7百万行需要多长时间?我还要插入约3千万行。现在我正在通过一个存储过程和一个DataTable来推送它们。 - Alexandre Brisebois
1
同时运行小批处理大约花了五到六个小时。请记住,我只是使用了 SQL 2012 中的新 SEQUENCE 命令,并且找不到如何在 T-SQL 之外自动化此过程的信息,因此只使用了直接的 T-SQL 插入命令。 - Techie Joe

2

我遇到了一个与ADO、Entity和Dapper兼容的解决方案问题,所以我制作了这个库;它会生成以下形式的批处理:

    IEnumerable<(string SqlQuery, IEnumerable<SqlParameter> SqlParameters)>  
    IEnumerable<(string SqlQuery, DynamicParameters DapperDynamicParameters)> 

这个链接包含说明。它使用参数而非连接符号,因此可以安全地防止SQL注入攻击;如果需要的话,您还可以通过可选参数将identity insert设置为ON。

在ADO.NET中的使用方法:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person()
    {
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    {
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    
    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
    {
        using (SqlCommand sqlCommand = new SqlCommand(SqlQuery, sqlConnection))
        {
            sqlCommand.Parameters.AddRange(SqlParameters.ToArray());
            sqlCommand.ExecuteNonQuery();
        }
    }
}

Dapper的使用:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person()
    {
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    { 
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndDapperParameters = new MsSqlQueryGenerator()
    .GenerateDapperParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, DapperDynamicParameters) in sqlQueriesAndDapperParameters)
    {
        sqlConnection.Execute(SqlQuery, DapperDynamicParameters);
    }
}

与Entity Framework一起使用:

using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();
var people = new List<Person>()
{ 
    new Person() 
    { 
        FirstName = "John", 
        LastName = "Lennon", 
        DateOfBirth = new DateTime(1940, 10, 9) 
    },
    new Person()
    { 
        FirstName = "Paul", 
        LastName = "McCartney", 
        DateOfBirth = new DateTime(1942, 6, 18) 
    },
};
var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

// Default batch size: 1000 rows or (2100-1) parameters per insert.
foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
{
    _context.Database.ExecuteSqlRaw(SqlQuery, SqlParameters);
    // Depracated but still works: _context.Database.ExecuteSqlCommand(SqlQuery, SqlParameters);
}

0

另一种方法是,如果文本数据以Json格式存在,则可以在SQL Server中使用OPENJSON方法进行批量插入。

我在我的本地机器上进行了测试,并能够在51秒内插入100万条记录。

这是存储过程:

CREATE PROCEDURE sp_upsert_play_user_details1  
(  
@array VARCHAR(MAX)  
)  
AS  
BEGIN  
BEGIN TRY  
BEGIN TRANSACTION  
  
 INSERT INTO tbl_play_user_details    
 (vc_first_name, vc_last_name, vc_full_name, vc_gender, vc_phone_number, vc_email, vc_pet, vc_vehicle_model, vc_vehicle_no, int_created_on, int_created_by)    

 SELECT firstName, lastName, fullName, gender, phoneNumber, email, pet, vehicle, vehicleNumber, GETDATE(), createdBy  FROM OPENJSON(@array)  
        WITH (  firstName VARCHAR(100),  
    lastName VARCHAR(100),  
    fullName VARCHAR(100),  
    gender VARCHAR(100),  
    phoneNumber VARCHAR(100),  
    email VARCHAR(100),  
    pet VARCHAR(100),  
    vehicle VARCHAR(100),  
    vehicleNumber VARCHAR(100),  
                createdBy int);  
  
COMMIT TRANSACTION  
END TRY  
BEGIN CATCH  
ROLLBACK TRANSACTION                            
DECLARE @ErrorMessage NVARCHAR(4000)=ERROR_MESSAGE()+' Please verify "'+ERROR_PROCEDURE()+'" stored procedure at the line number '+CONVERT(NVARCHAR(20),ERROR_LINE() )+ '.';                                      
DECLARE @ErrorSeverity INT = ERROR_SEVERITY();                                      
DECLARE @ErrorState INT=ERROR_STATE();                                      
RAISERROR (@ErrorMessage,@ErrorSeverity,@ErrorState)   
END CATCH  
END  
GO

用于测试的示例JSON:

DECLARE @array VARCHAR(MAX);  
SET @array = '[{  
        "firstName": "Winston",  
        "lastName": "Lemke",  
        "fullName": "Winston Lemke",  
        "gender": "Male",  
        "phoneNumber": "466.780.4652 x268",  
        "email": "Josefa89@yahoo.com",  
        "pet": "Villanuco de Las Encartaciones",  
        "vehicle": "Mazda Escalade",  
        "vehicleNumber": "8CP7UC1N83MY25770",  
        "createdBy": 1  
    },  
 {  
        "firstName": "Finn",  
        "lastName": "Bartoletti",  
        "fullName": "Finn Bartoletti",  
        "gender": "Female",  
        "phoneNumber": "1-931-498-0214 x454",  
        "email": "Clair.Rodriguez@hotmail.com",  
        "pet": "Bouvier des Flandres",  
        "vehicle": "Tesla Ranchero",  
        "vehicleNumber": "MG1XVY29D0M798471",  
        "createdBy": 1  
    }]';  

EXEC sp_upsert_play_user_details1 @array;

在C#中,我解析了本地文件中的数据,并将字符串传递给存储过程:
string text = System.IO.File.ReadAllText(@"C:\Users\krish\OneDrive\Desktop\object.json");

_playdb.CommandTimeout = 3000;
_playdb.sp_upsert_play_user_details1(text);


如上所述,这只需要51秒来插入一百万条记录,在更快的服务器/工作机器上可能会更快。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接