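I have to insert about 2 million rows from a text file.
While inserting them, I also have to create some master tables.
What is the best and fastest way to insert such a large set of data into SQL Server?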
1. I think it is better to read the data from the text file into a DataSet.
2. Try SqlBulkCopy - Bulk Insert into SQL from C# App
// connect to SQL
using (SqlConnection connection = new SqlConnection(connString))
{
    // FireTriggers makes the insert triggers on the destination table fire;
    // TableLock takes a bulk update lock for the duration of the copy
    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
        connection,
        SqlBulkCopyOptions.TableLock |
        SqlBulkCopyOptions.FireTriggers |
        SqlBulkCopyOptions.UseInternalTransaction,
        null))
    {
        // set the destination table name
        bulkCopy.DestinationTableName = this.tableName;
        connection.Open();

        // write the data in the "dataTable"
        bulkCopy.WriteToServer(dataTable);
    }
} // disposing the connection also closes it

// reset
this.dataTable.Clear();
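Or,
after doing step 1 at the top (reading the file into a DataSet),
you can check this article for details: Bulk Insertion of Data Using C# DataTable and SQL Server OpenXML Function.
It has not been tested with 2 million records, though. It will work, but it consumes memory on the machine, because all 2 million records have to be loaded before they are inserted.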
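One way to keep memory bounded, sketched here under assumptions that are not part of the answer above, is to read the file lazily and pass rows on in fixed-size batches rather than loading everything first. `File.ReadLines` enumerates a file line by line; the `LineBatcher` class, its method names, and the batch size are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LineBatcher
{
    // Groups a lazily enumerated sequence into lists of at most batchSize items.
    // Each batch is a fresh list, so a consumer can drop it once it has been inserted.
    public static IEnumerable<List<string>> Batches(IEnumerable<string> lines, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<string>(batchSize);
        foreach (var line in lines)
        {
            batch.Add(line);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<string>(batchSize);
            }
        }

        // Emit the final, possibly shorter, batch.
        if (batch.Count > 0) yield return batch;
    }

    // File.ReadLines streams the file instead of reading it into memory as a whole.
    public static IEnumerable<List<string>> BatchesFromFile(string path, int batchSize)
    {
        return Batches(File.ReadLines(path), batchSize);
    }
}
```

Each batch could then be parsed, converted to a DataTable, and handed to the insert step before the next batch is read.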
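Regarding the SqlBulkCopy solution:
I used a StreamReader to convert and process the text file. The result was a list of my objects.
I created a class that takes a DataTable or a List<T> and a buffer size (CommitBatchSize). It converts the list to a data table using an extension method (in the second class).
It works very fast. On my PC, I am able to insert more than 10 million complicated records in less than 10 seconds.
Here is the class: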
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DAL
{
    public class BulkUploadToSql<T>
    {
        public IList<T> InternalStore { get; set; }
        public string TableName { get; set; }
        public int CommitBatchSize { get; set; } = 1000;
        public string ConnectionString { get; set; }

        // Splits InternalStore into pages of CommitBatchSize rows and bulk-inserts each page.
        public void Commit()
        {
            if (InternalStore.Count > 0)
            {
                DataTable dt;
                // number of pages, rounded up so a partial last page is included
                int numberOfPages = (InternalStore.Count / CommitBatchSize) + (InternalStore.Count % CommitBatchSize == 0 ? 0 : 1);
                for (int pageIndex = 0; pageIndex < numberOfPages; pageIndex++)
                {
                    dt = InternalStore.Skip(pageIndex * CommitBatchSize).Take(CommitBatchSize).ToDataTable();
                    BulkInsert(dt);
                }
            }
        }

        public void BulkInsert(DataTable dt)
        {
            using (SqlConnection connection = new SqlConnection(ConnectionString))
            {
                // FireTriggers makes the insert triggers on the destination table fire
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
                    connection,
                    SqlBulkCopyOptions.TableLock |
                    SqlBulkCopyOptions.FireTriggers |
                    SqlBulkCopyOptions.UseInternalTransaction,
                    null))
                {
                    // set the destination table name
                    bulkCopy.DestinationTableName = TableName;
                    connection.Open();

                    // write the data in the data table
                    bulkCopy.WriteToServer(dt);
                }
            } // disposing the connection also closes it
        }
    }

    public static class BulkUploadToSqlHelper
    {
        // Builds a DataTable with one column per public property of T.
        public static DataTable ToDataTable<T>(this IEnumerable<T> data)
        {
            PropertyDescriptorCollection properties =
                TypeDescriptor.GetProperties(typeof(T));
            DataTable table = new DataTable();
            foreach (PropertyDescriptor prop in properties)
                table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
            foreach (T item in data)
            {
                DataRow row = table.NewRow();
                foreach (PropertyDescriptor prop in properties)
                    row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
                table.Rows.Add(row);
            }
            return table;
        }
    }
}
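The two `??` expressions in ToDataTable are easy to skim past, so here is a small standalone sketch of what they do for a nullable property. The `Reading` type and the `NullableColumnDemo` class are hypothetical and exist only for this illustration; the mapping logic mirrors the helper above.

```csharp
using System;
using System.ComponentModel;
using System.Data;

// Hypothetical record type, used only for this illustration.
public class Reading
{
    public string Sensor { get; set; }
    public int? Value { get; set; }
}

public static class NullableColumnDemo
{
    public static DataTable Build(params Reading[] readings)
    {
        PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(Reading));
        var table = new DataTable();

        foreach (PropertyDescriptor prop in props)
        {
            // int? is unwrapped to int, because a DataColumn cannot be typed as Nullable<T>.
            Type columnType = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;
            table.Columns.Add(prop.Name, columnType);
        }

        foreach (Reading reading in readings)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in props)
            {
                // A null property value is stored as DBNull.Value, the DataRow representation of NULL.
                row[prop.Name] = prop.GetValue(reading) ?? DBNull.Value;
            }
            table.Rows.Add(row);
        }
        return table;
    }
}
```

With one reading whose Value is 7 and one whose Value is null, the Value column is typed as int and the second row holds a database NULL, which SqlBulkCopy can write to a nullable column.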
Here is an example of inserting a list of my custom objects, a List<PuckDetection> named ListDetections:
var objBulk = new BulkUploadToSql<PuckDetection>()
{
    InternalStore = ListDetections,
    TableName = "PuckDetections",
    CommitBatchSize = 1000,
    ConnectionString = "ENTER YOUR CONNECTION STRING"
};
objBulk.Commit();
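The BulkInsert method can be modified to add column mappings if required. For example, if the first column is an identity key (this assumes the column names in the data table are the same as in the database), you can do the following.
//ADD COLUMN MAPPING
foreach (DataColumn col in dt.Columns)
{
    bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
}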
I tried this method and it significantly reduced my database insert execution time.
List<string> toinsert = new List<string>();
StringBuilder insertCmd = new StringBuilder("INSERT INTO tablename (col1, col2, col3) VALUES ");

foreach (var row in rows)
{
    // double any single quotes so each value stays inside its quoted literal;
    // parameterized commands are the more robust defense against SQL injection
    var first = row.First.Replace("'", "''");
    var second = row.Second.Replace("'", "''");
    var third = row.Third.Replace("'", "''");
    toinsert.Add(string.Format("( '{0}', '{1}', '{2}' )", first, second, third));
}

// only run the command when there is at least one row; an empty VALUES list is invalid SQL
if (toinsert.Count != 0)
{
    insertCmd.Append(string.Join(",", toinsert));
    insertCmd.Append(";");

    using (MySqlCommand myCmd = new MySqlCommand(insertCmd.ToString(), SQLconnectionObject))
    {
        myCmd.CommandType = CommandType.Text;
        myCmd.ExecuteNonQuery();
    }
}
Create your SQL connection object and substitute it where I have written SQLconnectionObject. (The snippet uses MySqlCommand; against SQL Server the equivalent class is SqlCommand.)
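The snippet above depends on manual quote escaping. As a hedged alternative, the sketch below builds the same kind of multi-row INSERT with parameter placeholders instead of inlined values. `ParameterizedInsertBuilder`, `Build`, and the `@p0`, `@p1`, ... naming are hypothetical, and the table and column names are still concatenated into the SQL text, so they are assumed to come from trusted code rather than user input.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class ParameterizedInsertBuilder
{
    // Builds one multi-row INSERT whose values are all parameter placeholders.
    // Returns the SQL text and the (name, value) pairs to bind to it.
    public static (string Sql, List<KeyValuePair<string, object>> Parameters) Build(
        string table, IReadOnlyList<string> columns, IReadOnlyList<object[]> rows)
    {
        if (rows.Count == 0)
            throw new ArgumentException("At least one row is required.", nameof(rows));

        var sql = new StringBuilder();
        sql.Append("INSERT INTO ").Append(table)
           .Append(" (").Append(string.Join(", ", columns)).Append(") VALUES ");

        var parameters = new List<KeyValuePair<string, object>>();
        for (int r = 0; r < rows.Count; r++)
        {
            if (rows[r].Length != columns.Count)
                throw new ArgumentException("Row " + r + " does not match the column count.");

            var names = new string[columns.Count];
            for (int c = 0; c < columns.Count; c++)
            {
                // Parameter names are numbered across the whole statement: @p0, @p1, ...
                names[c] = "@p" + parameters.Count;
                parameters.Add(new KeyValuePair<string, object>(names[c], rows[r][c] ?? DBNull.Value));
            }

            if (r > 0) sql.Append(", ");
            sql.Append('(').Append(string.Join(", ", names)).Append(')');
        }

        sql.Append(';');
        return (sql.ToString(), parameters);
    }
}
```

Each returned pair can be bound with `Parameters.AddWithValue(name, value)` on a SqlCommand. SQL Server accepts at most 2100 parameters per command and 1000 rows in a single VALUES list, so larger inputs would need to be split into several statements.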
I ran into the problem of needing one solution that works with ADO, Entity, and Dapper, so I made this library; it generates batches in the form of:
IEnumerable<(string SqlQuery, IEnumerable<SqlParameter> SqlParameters)>
IEnumerable<(string SqlQuery, DynamicParameters DapperDynamicParameters)>
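This link contains the instructions. It uses parameters rather than string concatenation, so it is safe against SQL injection; if needed, you can also set identity insert to ON through an optional parameter.
Usage with ADO.NET: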
using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();

var people = new List<Person>()
{
    new Person()
    {
        FirstName = "John",
        LastName = "Lennon",
        DateOfBirth = new DateTime(1940, 10, 9)
    },
    new Person()
    {
        FirstName = "Paul",
        LastName = "McCartney",
        DateOfBirth = new DateTime(1942, 6, 18)
    },
};

var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();

    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
    {
        using (SqlCommand sqlCommand = new SqlCommand(SqlQuery, sqlConnection))
        {
            sqlCommand.Parameters.AddRange(SqlParameters.ToArray());
            sqlCommand.ExecuteNonQuery();
        }
    }
}
Usage with Dapper:
using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();

var people = new List<Person>()
{
    new Person()
    {
        FirstName = "John",
        LastName = "Lennon",
        DateOfBirth = new DateTime(1940, 10, 9)
    },
    new Person()
    {
        FirstName = "Paul",
        LastName = "McCartney",
        DateOfBirth = new DateTime(1942, 6, 18)
    },
};

var connectionString = "Server=SERVER_ADDRESS;Database=DATABASE_NAME;User Id=USERNAME;Password=PASSWORD;";
var sqlQueriesAndDapperParameters = new MsSqlQueryGenerator()
    .GenerateDapperParametrizedBulkInserts(mapper, people);

using (var sqlConnection = new SqlConnection(connectionString))
{
    // Default batch size: 1000 rows or (2100-1) parameters per insert.
    foreach (var (SqlQuery, DapperDynamicParameters) in sqlQueriesAndDapperParameters)
    {
        sqlConnection.Execute(SqlQuery, DapperDynamicParameters);
    }
}
Usage with Entity Framework:
using MsSqlHelpers;
// ...
var mapper = new MapperBuilder<Person>()
    .SetTableName("People")
    .AddMapping(person => person.FirstName, columnName: "Name")
    .AddMapping(person => person.LastName, columnName: "Surename")
    .AddMapping(person => person.DateOfBirth, columnName: "Birthday")
    .Build();

var people = new List<Person>()
{
    new Person()
    {
        FirstName = "John",
        LastName = "Lennon",
        DateOfBirth = new DateTime(1940, 10, 9)
    },
    new Person()
    {
        FirstName = "Paul",
        LastName = "McCartney",
        DateOfBirth = new DateTime(1942, 6, 18)
    },
};

var sqlQueriesAndParameters = new MsSqlQueryGenerator()
    .GenerateParametrizedBulkInserts(mapper, people);

// Default batch size: 1000 rows or (2100-1) parameters per insert.
foreach (var (SqlQuery, SqlParameters) in sqlQueriesAndParameters)
{
    _context.Database.ExecuteSqlRaw(SqlQuery, SqlParameters);
    // Deprecated but still works: _context.Database.ExecuteSqlCommand(SqlQuery, SqlParameters);
}
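Another way: if the text data is in JSON format, you can use the OPENJSON function in SQL Server for the bulk insert.
I tested this on my local machine and was able to insert one million records in 51 seconds.
This is the stored procedure: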
CREATE PROCEDURE sp_upsert_play_user_details1
(
@array VARCHAR(MAX)
)
AS
BEGIN
BEGIN TRY
BEGIN TRANSACTION
INSERT INTO tbl_play_user_details
(vc_first_name, vc_last_name, vc_full_name, vc_gender, vc_phone_number, vc_email, vc_pet, vc_vehicle_model, vc_vehicle_no, int_created_on, int_created_by)
SELECT firstName, lastName, fullName, gender, phoneNumber, email, pet, vehicle, vehicleNumber, GETDATE(), createdBy FROM OPENJSON(@array)
WITH ( firstName VARCHAR(100),
lastName VARCHAR(100),
fullName VARCHAR(100),
gender VARCHAR(100),
phoneNumber VARCHAR(100),
email VARCHAR(100),
pet VARCHAR(100),
vehicle VARCHAR(100),
vehicleNumber VARCHAR(100),
createdBy int);
COMMIT TRANSACTION
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION
DECLARE @ErrorMessage NVARCHAR(4000)=ERROR_MESSAGE()+' Please verify "'+ERROR_PROCEDURE()+'" stored procedure at the line number '+CONVERT(NVARCHAR(20),ERROR_LINE() )+ '.';
DECLARE @ErrorSeverity INT = ERROR_SEVERITY();
DECLARE @ErrorState INT=ERROR_STATE();
RAISERROR (@ErrorMessage,@ErrorSeverity,@ErrorState)
END CATCH
END
GO
Sample JSON for testing:
DECLARE @array VARCHAR(MAX);
SET @array = '[{
"firstName": "Winston",
"lastName": "Lemke",
"fullName": "Winston Lemke",
"gender": "Male",
"phoneNumber": "466.780.4652 x268",
"email": "Josefa89@yahoo.com",
"pet": "Villanuco de Las Encartaciones",
"vehicle": "Mazda Escalade",
"vehicleNumber": "8CP7UC1N83MY25770",
"createdBy": 1
},
{
"firstName": "Finn",
"lastName": "Bartoletti",
"fullName": "Finn Bartoletti",
"gender": "Female",
"phoneNumber": "1-931-498-0214 x454",
"email": "Clair.Rodriguez@hotmail.com",
"pet": "Bouvier des Flandres",
"vehicle": "Tesla Ranchero",
"vehicleNumber": "MG1XVY29D0M798471",
"createdBy": 1
}]';
EXEC sp_upsert_play_user_details1 @array;
In C#, the JSON is read from a local file and the string is passed to the stored procedure:
string text = System.IO.File.ReadAllText(@"C:\Users\krish\OneDrive\Desktop\object.json");
_playdb.CommandTimeout = 3000;
_playdb.sp_upsert_play_user_details1(text);
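If the rows start out as objects rather than as a JSON file, the array for the stored procedure could be produced with System.Text.Json. This is a sketch under assumptions: the `PlayUser` type and `PlayUserJson` class are hypothetical, and only three of the keys from the WITH clause are modeled to keep it short. Key matching in OPENJSON is case-sensitive, which is why the camel-case naming policy is set.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical DTO; property names mirror some of the keys read by OPENJSON ... WITH (...).
public class PlayUser
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public int CreatedBy { get; set; }
}

public static class PlayUserJson
{
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        // Emit firstName rather than FirstName so the keys match the WITH clause.
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    // Serializes the users as a single JSON array suitable for the @array parameter.
    public static string ToJsonArray(IEnumerable<PlayUser> users)
    {
        return JsonSerializer.Serialize(users, Options);
    }
}
```

The resulting string can be passed to the procedure the same way as the text read from the file. Keys missing from the JSON come back as NULL from OPENJSON in its default lax mode.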
Wouldn't Insert into table1 Select * from table2 be faster? - Madhav Shenoy