如何在最短时间内插入1000万条记录?

34

我有一个文件(拥有1000万条记录)如下:

    line1
    line2
    line3
    line4
   .......
    ......
    10 million lines

基本上我想要将1000万条记录插入数据库中。所以我读取文件并将其上传到SQL Server。

C# 代码

System.IO.StreamReader file = 
    new System.IO.StreamReader(@"c:\test.txt");
while((line = file.ReadLine()) != null)
{
    // insertion code goes here
    //DAL.ExecuteSql("insert into table1 values("+line+")");
}

file.Close();

插入操作将会花费很长的时间。如何使用C#以最短时间插入1000万条记录?

更新1:
批量插入:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(

     ROWTERMINATOR =' \n'
  );

我的表格如下:

DATAs
(
     DatasField VARCHAR(MAX)
)

但是我遇到了以下错误:
Msg 4866,级别16,状态1,第1行
批量加载失败。数据文件中的第1行第1列列过长。请验证字段终止符和行终止符是否正确指定。
Msg 7399,级别16,状态1,第1行
连接服务器“(null)”的OLE DB提供程序“BULK”报告了一个错误。该提供程序没有提供有关错误的任何信息。
Msg 7330,级别16,状态2,第1行
无法从连接服务器“(null)”的OLE DB提供程序“BULK”获取一行。
下面的代码有效:
BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(
    FIELDTERMINATOR = '\t',
    ROWTERMINATOR = '\n'
);

3
你可以使用SQL Server导入工具直接导入吗?为什么要用C#做这个? - gjvdkamp
32
看一下BULK INSERT、BCP和SqlBulkCopy等工具。最糟糕的做法是在循环中逐个插入1000万行数据。请注意不要改变原意,使语言更加通俗易懂。 - Aaron Bertrand
1
你尝试过使用 BULK INSERT 吗?- http://msdn.microsoft.com/zh-cn/library/ms188365.aspx? - Kami
使用XML,我可以在不打开BULK INSERT的情况下完成它。 - MethodMan
2
@AaronBertrand 从 GUI 中一次只能处理一个比 1 还糟糕吗? :) - Daniel E.
显示剩余4条评论
4个回答

51
请勿创建DataTable以通过BulkCopy进行加载。这对于较小的数据集是可以接受的解决方案,但在调用数据库之前将所有1000万行加载到内存中绝对没有必要。
除了BCP/BULK INSERT/OPENROWSET(BULK...)之外,最好的选择是通过表值参数(TVP)将文件内容流式传输到数据库中。通过使用TVP,您可以打开文件,读取行并发送行,直到完成,然后关闭文件。此方法的内存占用量仅为一行。我写了一篇文章从应用程序将数据流式传输到SQL Server 2008,其中有一个非常相似的示例。
结构的简单概述如下。我假设与上面显示的问题相同的导入表和字段名称。
所需的数据库对象:
-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO

-- Second: Use the UDTT as an input param to an import proc.
--         Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;

INSERT INTO dbo.DATAs (DatasField)
    SELECT  Field
    FROM    @ImportTable;

GO

以下是使用C#应用程序代码来利用上述SQL对象的方法。注意,与其填充一个对象(如DataTable),然后执行存储过程不同,在这种方法中,执行存储过程会启动读取文件内容的过程。存储过程的输入参数不是一个变量;它是一个方法GetFileContents的返回值。当SqlCommand调用ExecuteNonQuery时,将调用该方法,打开文件,读取一行,并通过IEnumerable<SqlDataRecord>yield return结构将该行发送到SQL Server,然后关闭文件。存储过程只看到一个表变量@ImportTable,可以在数据开始传输时立即访问(注意:即使不是全部内容,数据也会在tempdb中短暂存储)。
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private static IEnumerable<SqlDataRecord> GetFileContents()
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
   };
   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
   StreamReader _FileReader = null;

   try
   {
      _FileReader = new StreamReader("{filePath}");

      // read a row, send a row
      while (!_FileReader.EndOfStream)
      {
         // You shouldn't need to call "_DataRecord = new SqlDataRecord" as
         // SQL Server already received the row when "yield return" was called.
         // Unlike BCP and BULK INSERT, you have the option here to create a string
         // call ReadLine() into the string, do manipulation(s) / validation(s) on
         // the string, then pass that string into SetString() or discard if invalid.
         _DataRecord.SetString(0, _FileReader.ReadLine());
         yield return _DataRecord;
      }
   }
   finally
   {
      _FileReader.Close();
   }
}

上面的GetFileContents方法被用作下面所示的存储过程的输入参数值:
public static void test()
{
   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   SqlParameter _TVParam = new SqlParameter();
   _TVParam.ParameterName = "@ImportTable";
   _TVParam.TypeName = "dbo.ImportStructure";
   _TVParam.SqlDbType = SqlDbType.Structured;
   _TVParam.Value = GetFileContents(); // return value of the method is streamed data
   _Command.Parameters.Add(_TVParam);

   try
   {
      _Connection.Open();

      _Command.ExecuteNonQuery();
   }
   finally
   {
      _Connection.Close();
   }

   return;
}

附加说明:

  1. 通过一些修改,上述的 C# 代码可用于批量处理数据。
  2. 通过一些小修改,上述的 C# 代码可以适应发送多个字段(上面“流式数据…”文章中展示的示例传入了2个字段)。
  3. 您还可以在过程的 SELECT 语句中操作每个记录的值。
  4. 您还可以通过在过程中使用 WHERE 条件来过滤行。
  5. 您可以多次访问 TVP 表变量;它是只读的但不是“前向只读”的。
  6. TVP 相对于 SqlBulkCopy 的优势:
    1. SqlBulkCopy 仅支持插入,而使用 TVP 可以以任何方式使用数据:您可以调用 MERGE; 您可以根据某些条件 DELETE;您可以将数据拆分成多个表等等。
    2. 由于 TVP 不仅支持插入,因此无需单独的临时表来转储数据。
    3. 您可以通过调用 ExecuteReader 而不是 ExecuteNonQuery 从数据库中获取数据。例如,如果导入表 DATAs 上有一个 IDENTITY 字段,则可以在 INSERT 语句中添加 OUTPUT 子句以传回 INSERTED.[ID](假设 IDIDENTITY 字段的名称)。或者您可以传回完全不同查询的结果,或两者都可以,因为多个结果集可以通过 Reader.NextResult() 发送和访问。使用 SqlBulkCopy 时无法从数据库中获取信息,但是这里有几个关于人们想要做到这一点(至少是关于新创建的 IDENTITY 值)的 S.O. 上的问题。
    4. 有关为什么在整个过程中它有时更快,即使将数据从磁盘读取到 SQL Server 的速度稍慢,请参见 SQL Server 客户咨询团队的此白皮书:Maximizing Throughput with TVP

@CesarBoucas:您的建议绝对有趣,比预加载DataTable更有效。它确实利用了现有的SqlBulkCopy,这很不错,但也可能会受到限制。虽然您的方法确实允许在IDataReader类中进行转换/验证,但它并不允许在数据库中进行任何控制,也不允许将数据返回给调用过程(我刚在底部添加了一条注释)。 - Solomon Rutzky
@srutzky 让我们试着专注于问题范围。这只是关于从文件加载数据到表格,它并没有提到数据转换/验证/返回等等。保持简单 ;) - playful
@CesarBoucas:我的建议和示例代码都是针对所述问题的。我只是额外指出了许多其他优点和选项。我并不是说你的解决方案不合适,只是说它最多只能更加简洁,而且为了获得这种微小的提升,人们需要放弃很多东西。尽管如此,还是有一些情况(例如无法添加DB对象和/或需要一个可替换导入表的进程)适合你的建议。 - Solomon Rutzky
感谢@SolomonRutzky,我也多次使用了您的方法,并且完全喜欢TVP的东西。这对我非常有效。 - cat916
嗨@geeko。我不确定Dapper,因为我不使用它。但是,您肯定应该能够在常规参数化SQL中使用TVPs。在这种情况下,需要使用“@”前缀指定参数名称,并填写“SqlParameter.TypeName”属性(在调用存储过程时不需要这两个)。 - Solomon Rutzky
显示剩余7条评论

7

1
这绝对是一个好的解决方案。但是“最佳”取决于某人想要实现什么。使用SqlBulkCopy限制了可以做的事情,因为它只允许将数据插入到表中。另一方面,TVP允许在插入数据之前、之后、合并而不是插入、执行其他操作(例如首先截断目标表)等任何操作。此外,正如我刚刚添加到“附加说明”部分底部的内容,TVP允许将结果传递回应用程序,而通过SqlBulkCopy是不可能的。 - Solomon Rutzky
3
假设提出的问题是“如何在最短的时间内插入1000万条记录[C#]?”我提出的解决方案是最符合问题的解决方案,换句话说,它是为所提出的问题提供的最佳解决方案。正如您所说:SqlBulkCopy只允许将数据插入到表中 - 这恰好是问题的范围。 - playful
我理解你的意思,但是“最好”的定义仍然由OP确定。由于OP不知道我所描述的内容是可能的,因此问题范围可能被错误地陈述。我遇到过很多次这种情况,有人把问题表述得太狭窄了,但当时并不知道。以最快的方式加载数据实际上可能会导致更慢的整体解决方案,这取决于数据到位后要执行的操作。此外,我建议的方法最多只比你建议的方法略微复杂,但具有很大的灵活性。 - Solomon Rutzky

3
最好的方法是将您的第一种解决方案和第二种混合起来,创建DataTable并在循环中向其中添加行,然后使用BulkCopy上传到数据库中的一个连接使用此链接以获取有关批量复制的帮助
还要注意的一件事是,批量复制是非常敏感的操作,几乎每个错误都会导致复制失败,例如如果您在dataTable中声明列名为"text"而在数据库中它是"Text",它将抛出异常,祝你好运。

2
仅供参考,这个解决方案对于数据集要小得多。随着数据大小的增长,问题在于高昂的内存成本。在10百万行的情况下,假设每行10个字符,在内存中占用100 MB的数据,对吗?好吧,这取决于数据在内存中的存储方式。这种情况似乎只涉及字符串,每个字符为2个字节,因为.Net字符串是UCS-2。因此,在将第一行发送到数据库之前,“DataTable”中的1000万行文件占用200 MB的内存。 - Solomon Rutzky

-6
如果您想为测试目的在最短时间内使用SQL查询将1000万条记录直接插入,请使用以下查询语句。
 CREATE TABLE TestData(ID INT IDENTITY (1,1), CreatedDate DATETIME)
 GO

 INSERT INTO TestData(CreatedDate) SELECT GetDate()
 GO 10000000

你的回答并没有得到很好的接受,这是正确的。但是,就我个人而言,我很高兴从中了解到GO语句需要一个计数参数。 - Tian van Heerden
@TianvanHeerden 虽然有点过时,但是作为参考,GO 并不是 TSQL 的一部分,它是客户端的东西。我认为这个操作会非常耗费资源,可以看看这里的介绍:https://learn.microsoft.com/en-us/sql/t-sql/language-elements/sql-server-utilities-statements-go?view=sql-server-ver16 - Keith Nicholas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接