Parsing and uploading more than 1GB of data in C#

I wrote a program to parse and upload a large amount of data into a database. The problem is that the parsing is far too slow. The way my program works is that I have a parser class which processes each file in parallel and raises an event for every entry parsed in each file:

Parallel.ForEach<FileInfo>(
    files,
    new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
    (inputFile, args) =>
    {
        // Using underlying FileStream to allow concurrent Read/Write access.
        using (var input = new StreamReader(inputFile.FullName))
        {
            while (!input.EndOfStream)
            {
                RaiseEntryParsed(ParseCity(input.ReadLine()));
            }
            ParsedFiles++;
            RaiseFileParsed(inputFile);
        }
    });
RaiseDirectoryParsed(Directory);

"main"程序订阅了此事件,并将条目添加到DataTable中进行SqlBulkCopy;当解析器类引发FileParsed事件(每次解析文件时)时,SqlBulkCopy才会提交:

using (SqlBulkCopy bulkCopy = new SqlBulkCopy(_connectionString))
{
    DataTable cityTable = DataContext.CreateCityDataTable();
    parser.EntryParsed +=
        (s, e) =>
        {
            DataRow cityRow = cityTable.NewRow();
            City parsedCity = (City)e.DatabaseEntry;

            cityRow["id"] = parsedCity.Id;
            ...
            ...

            cityTable.Rows.Add(cityRow);
        };

    parser.FileParsed +=
        (s, e) =>
        {
            bulkCopy.WriteToServer(cityTable);
            Dispatcher.BeginInvoke((Action)UpdateProgress);
            cityTable.Rows.Clear();
        };

    parser.DirectoryParsed +=
        (s, e) =>
        {
            bulkCopy.WriteToServer(cityTable);
            Dispatcher.BeginInvoke((Action)UpdateProgress);
        };

    parser.BeginParsing();
}

The reason the table is cleared after every commit is to conserve memory and prevent an OutOfMemoryException from too many entities being in memory at once...
How can I make this faster? The current speed is unacceptable. I profiled the application, and it indicated that most of the time is spent in the EntryParsed event. Thanks.

Have you tried it without the parallelization first? Parallelizing an IO-bound operation may not actually help (and can even make things worse). But I digress. You say the "parsing" is too slow, yet you don't actually show any parsing. Also, I consider this code hugely risky: you could end up with two threads talking to the same SqlBulkCopy at the same time, which is not valid. What is the underlying data here? I think you should ignore DataTable completely and hook directly between the non-buffered input and SqlBulkCopy. So: what is the input here? - Marc Gravell
In addition to what @MarcGravell just said, try profiling the application (if nothing else is at hand, pause the debugger 10 times and see where it stops most often). Profile the events at a finer-grained level. - usr
Marc, I tried it without parallelization; see this post: http://stackoverflow.com/questions/9260404/i-need-to-parse-250-files-totalling-at-1gb-of-data-and-upload-it-to-a-sql-serve Parallelizing the parsing gave a big performance boost. Right now the program is not actually IO-bound: my CPU is running at 100% utilization while the disk drive and network are barely used (per Task Manager). I don't think the problem is the parsing itself, which is trivial (just type conversions). I could fix the risky threading issue with a lock, but that would slow the program down. - Francisco Aguilera
The underlying data is the Geonames cities database. It consists of 250 large files where each line is tab-separated city data. Can you explain this sentence: "ignore DataTable completely and hook directly between the non-buffered input and SqlBulkCopy"? - Francisco Aguilera
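What that could look like, as a minimal sketch: SqlBulkCopy.WriteToServer accepts any IDataReader, so a hypothetical CityFileReader (an IDataReader implementation that parses one tab-separated line per Read() call) would let the rows stream straight to the server without ever building a DataTable:

using (var reader = new CityFileReader(inputFile.FullName)) // hypothetical IDataReader over the file
using (var bulkCopy = new SqlBulkCopy(_connectionString))
{
    bulkCopy.DestinationTableName = "Cities"; // assumed destination table name
    bulkCopy.WriteToServer(reader);           // rows are pulled and uploaded as they are parsed
}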
Here is a profiling screenshot, which oddly shows 99.9% of the time being spent adding the newly parsed rows to the DataTable... http://img803.imageshack.us/img803/6503/profilen.jpg - Francisco Aguilera
If you are calling RaiseEntryParsed just to report progress, why not consider raising it only once every 100 or 1,000 records? If you are parsing over 1GB of data, I seriously doubt anyone cares exactly which line is being parsed. - ChandlerPelhams
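A sketch of that batching idea applied to the parse loop (the batch size and the batched RaiseEntriesParsed event are illustrative, not part of the original parser):

const int BatchSize = 1000; // hand entries off once per 1,000 lines instead of per line
var batch = new List<City>(BatchSize);

while (!input.EndOfStream)
{
    batch.Add(ParseCity(input.ReadLine()));
    if (batch.Count >= BatchSize)
    {
        RaiseEntriesParsed(batch); // hypothetical batched event replacing per-entry RaiseEntryParsed
        batch = new List<City>(BatchSize);
    }
}
if (batch.Count > 0)
    RaiseEntriesParsed(batch); // flush the final partial batch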
3 Answers

I made a quick test project and tried a few different approaches. My goal was to build a DataTable with 27 columns (id, A, B, C, ..., Z) and about 300,000 rows as fast as possible using sequential code.
(Each row is filled with an id, with the remaining columns filled with random 5-letter words.)
On my fourth attempt, I stumbled upon a different syntax for adding rows to the table, based on an array of values of type Object.
(See here.)
In your case it would look something like this:
cityTable.Rows.Add(new Object[] {
    ((City)e.DatabaseEntry).Id,
    ObjectThatGoesInColumn2,
    ObjectThatGoesInColumn3,
    ObjectThatGoesInLastColumn
});

instead of:
DataRow row = cityTable.NewRow();

row[0] = 100;
row["City Name"] = Anaheim;
row["Column 7"] = ...
...
row["Column 26"] = checksum;

workTable.Rows.Add( row );

This will give you a speedup, since you won't be setting each column individually; based on your profiler picture you are setting at least 12 columns separately.
It also avoids hashing the column-name string to find which array position you are dealing with, and then re-checking the data type.
In case you are interested, here is my test project:
class Program
{
    public static System.Data.DataSet dataSet;
    public static System.Data.DataSet dataSet2;
    public static System.Data.DataSet dataSet3;
    public static System.Data.DataSet dataSet4;

    public static Random rand = new Random();

    public static int NumOfRows = 300000;

    static void Main(string[] args)
    {

        #region test1

        Console.WriteLine("Starting");

        Console.WriteLine("");

        Stopwatch watch = new Stopwatch();

        watch.Start();

        MakeTable();

        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet = null;

        Console.WriteLine("");

        Console.WriteLine("Completed.");

        Console.WriteLine("");

        #endregion

        /*

        #region test2


        Console.WriteLine("Starting Test 2");

        Console.WriteLine("");

        watch.Reset();

        watch.Start();

        MakeTable2();

        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet2 = null;

        Console.WriteLine("");

        Console.WriteLine("Completed Test 2.");

        #endregion


        #region test3
        Console.WriteLine("");

        Console.WriteLine("Starting Test 3");

        Console.WriteLine("");

        watch.Reset();

        watch.Start();

        MakeTable3();

        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet3 = null;

        Console.WriteLine("");

        Console.WriteLine("Completed Test 3.");

        #endregion

         */ 

        #region test4
        Console.WriteLine("Starting Test 4");

        Console.WriteLine("");

        watch.Reset();

        watch.Start();

        MakeTable4();

        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet4 = null;

        Console.WriteLine("");

        Console.WriteLine("Completed Test 4.");

        #endregion


        //printTable();

        Console.WriteLine("");
        Console.WriteLine("Press Enter to Exit...");

        Console.ReadLine();
    }

    private static void MakeTable()
    {
        DataTable table = new DataTable("Table 1");

        DataColumn column;
        DataRow row;

        column = new DataColumn();
        column.DataType = System.Type.GetType("System.Int32");
        column.ColumnName = "id";
        column.ReadOnly = true;
        column.Unique = true;

        table.Columns.Add(column);


        for (int i = 65; i <= 90; i++)
        {
            column = new DataColumn();
            column.DataType = System.Type.GetType("System.String");
            column.ColumnName = "5-Letter Word " + (char)i;
            column.AutoIncrement = false;
            column.Caption = "Random Word " + (char)i;
            column.ReadOnly = false;
            column.Unique = false;
            // Add the column to the table.
            table.Columns.Add(column);
        }

        DataColumn[] PrimaryKeyColumns = new DataColumn[1];
        PrimaryKeyColumns[0] = table.Columns["id"];
        table.PrimaryKey = PrimaryKeyColumns;

        // Instantiate the DataSet variable.
        dataSet = new DataSet();
        // Add the new DataTable to the DataSet.
        dataSet.Tables.Add(table);

        // Create NumOfRows new DataRow objects and add
        // them to the DataTable
        for (int i = 0; i < NumOfRows; i++)
        {
            row = table.NewRow();
            row["id"] = i;

            for (int j = 65; j <= 90; j++)
            {
                row["5-Letter Word " + (char)j] = getRandomWord();
            }

            table.Rows.Add(row);
        }

    }

    private static void MakeTable2()
    {
        DataTable table = new DataTable("Table 2");

        DataColumn column;
        DataRow row;

        column = new DataColumn();
        column.DataType = System.Type.GetType("System.Int32");
        column.ColumnName = "id";
        column.ReadOnly = true;
        column.Unique = true;

        table.Columns.Add(column);


        for (int i = 65; i <= 90; i++)
        {
            column = new DataColumn();
            column.DataType = System.Type.GetType("System.String");
            column.ColumnName = "5-Letter Word " + (char)i;
            column.AutoIncrement = false;
            column.Caption = "Random Word " + (char)i;
            column.ReadOnly = false;
            column.Unique = false;
            // Add the column to the table.
            table.Columns.Add(column);
        }

        DataColumn[] PrimaryKeyColumns = new DataColumn[1];
        PrimaryKeyColumns[0] = table.Columns["id"];
        table.PrimaryKey = PrimaryKeyColumns;

        // Instantiate the DataSet variable.
        dataSet2 = new DataSet();
        // Add the new DataTable to the DataSet.
        dataSet2.Tables.Add(table);

        // Create NumOfRows new DataRow objects and add
        // them to the DataTable
        for (int i = 0; i < NumOfRows; i++)
        {
            row = table.NewRow();

            row.BeginEdit();

            row["id"] = i;

            for (int j = 65; j <= 90; j++)
            {
                row["5-Letter Word " + (char)j] = getRandomWord();
            }

            row.EndEdit();

            table.Rows.Add(row);
        }

    }

    private static void MakeTable3()
    {
        DataTable table = new DataTable("Table 3");

        DataColumn column;

        column = new DataColumn();
        column.DataType = System.Type.GetType("System.Int32");
        column.ColumnName = "id";
        column.ReadOnly = true;
        column.Unique = true;

        table.Columns.Add(column);


        for (int i = 65; i <= 90; i++)
        {
            column = new DataColumn();
            column.DataType = System.Type.GetType("System.String");
            column.ColumnName = "5-Letter Word " + (char)i;
            column.AutoIncrement = false;
            column.Caption = "Random Word " + (char)i;
            column.ReadOnly = false;
            column.Unique = false;
            // Add the column to the table.
            table.Columns.Add(column);
        }

        DataColumn[] PrimaryKeyColumns = new DataColumn[1];
        PrimaryKeyColumns[0] = table.Columns["id"];
        table.PrimaryKey = PrimaryKeyColumns;

        // Instantiate the DataSet variable.
        dataSet3 = new DataSet();
        // Add the new DataTable to the DataSet.
        dataSet3.Tables.Add(table);


        DataRow[] newRows = new DataRow[NumOfRows];

        for (int i = 0; i < NumOfRows; i++)
        {
            newRows[i] = table.NewRow();
        }

        // Create NumOfRows new DataRow objects and add
        // them to the DataTable
        for (int i = 0; i < NumOfRows; i++)
        {

            newRows[i]["id"] = i;

            for (int j = 65; j <= 90; j++)
            {
                newRows[i]["5-Letter Word " + (char)j] = getRandomWord();
            }

            table.Rows.Add(newRows[i]);
        }

    }

    private static void MakeTable4()
    {
        DataTable table = new DataTable("Table 2");

        DataColumn column;

        column = new DataColumn();
        column.DataType = System.Type.GetType("System.Int32");
        column.ColumnName = "id";
        column.ReadOnly = true;
        column.Unique = true;

        table.Columns.Add(column);


        for (int i = 65; i <= 90; i++)
        {
            column = new DataColumn();
            column.DataType = System.Type.GetType("System.String");
            column.ColumnName = "5-Letter Word " + (char)i;
            column.AutoIncrement = false;
            column.Caption = "Random Word " + (char)i;
            column.ReadOnly = false;
            column.Unique = false;
            // Add the column to the table.
            table.Columns.Add(column);
        }

        DataColumn[] PrimaryKeyColumns = new DataColumn[1];
        PrimaryKeyColumns[0] = table.Columns["id"];
        table.PrimaryKey = PrimaryKeyColumns;

        // Instantiate the DataSet variable.
        dataSet4 = new DataSet();
        // Add the new DataTable to the DataSet.
        dataSet4.Tables.Add(table);

        // Create NumOfRows new DataRow objects and add
        // them to the DataTable
        for (int i = 0; i < NumOfRows; i++)
        {

            table.Rows.Add( 

                new Object[] {

                    i,

                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord(),
                    getRandomWord()

                } 

            );
        }

    }



    private static string getRandomWord()
    {
        // Random.Next's upper bound is exclusive, so 91 is needed for 'Z' to appear.
        char c0 = (char)rand.Next(65, 91);
        char c1 = (char)rand.Next(65, 91);
        char c2 = (char)rand.Next(65, 91);
        char c3 = (char)rand.Next(65, 91);
        char c4 = (char)rand.Next(65, 91);

        return "" + c0 + c1 + c2 + c3 + c4;
    }

    private static void printTable()
    {
        foreach (DataRow row in dataSet.Tables[0].Rows)
        {
            Console.WriteLine( row["id"] + "--" + row["5-Letter Word A"] + " - " + row["5-Letter Word Z"] );
        }
    }


}

I haven't really looked at your parallelism, but a couple of things stand out.
First, change "ParsedFiles++;" to "Interlocked.Increment(ref ParsedFiles);", or take a lock around it.
Second, rather than the complicated event-driven parallelism, I would recommend a Pipeline pattern, which is well suited to this kind of task.
Use concurrent queues (or blocking collections) from the concurrent collections to hold the data for each stage.
The first stage holds the list of files to process.
Worker tasks dequeue a file from that work list, parse it, and add the result to the second stage.
In the second stage, worker tasks take items from the second-stage queue (the just-completed blocks of DataTable rows) and upload them to the database as soon as they are ready.
Edit:
I have written a pipelined version of the code that will hopefully help you along:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Data;

namespace dataTableTesting2
{
    class Program
    {          
        private const int BufferSize = 20; // Each buffer can only contain this many elements at a time;
                                           // this limits the total amount of memory in use.

        private const int MaxBlockSize = 100;

        private static BlockingCollection<string> buffer1 = new BlockingCollection<string>(BufferSize);

        private static BlockingCollection<string[]> buffer2 = new BlockingCollection<string[]>(BufferSize);

        private static BlockingCollection<Object[][]> buffer3 = new BlockingCollection<Object[][]>(BufferSize);

        /// <summary>
        /// Start Pipelines and wait for them to finish.
        /// </summary>
        static void Main(string[] args)
        {
            TaskFactory f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            Task stage0 = f.StartNew(() => PopulateFilesList(buffer1));
            Task stage1 = f.StartNew(() => ReadFiles(buffer1, buffer2));
            Task stage2 = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
            Task stage3 = f.StartNew(() => UploadBlocks(buffer3) );

            Task.WaitAll(stage0, stage1, stage2, stage3);

            /*
            // Note for more workers on particular stages you can make more tasks for each stage, like the following
            //    which populates the file list in 1 task, reads the files into string[] blocks in 1 task,
            //    then parses the string[] blocks in 4 concurrent tasks
            //    and lastly uploads the info in 2 tasks

            TaskFactory f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            Task stage0 = f.StartNew(() => PopulateFilesList(buffer1));
            Task stage1 = f.StartNew(() => ReadFiles(buffer1, buffer2));

            Task stage2a = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
            Task stage2b = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
            Task stage2c = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
            Task stage2d = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));

            Task stage3a = f.StartNew(() => UploadBlocks(buffer3) );
            Task stage3b = f.StartNew(() => UploadBlocks(buffer3) );

            Task.WaitAll(stage0, stage1, stage2a, stage2b, stage2c, stage2d, stage3a, stage3b);

            */
        }

        /// <summary>
        /// Adds the filenames to process into the first pipeline
        /// </summary>
        /// <param name="output"></param>
        private static void PopulateFilesList( BlockingCollection<string> output )
        {
            try
            {
                buffer1.Add("file1.txt");
                buffer1.Add("file2.txt");
                //...
                buffer1.Add("lastFile.txt");
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Takes filenames out of the first pipeline, reads them into string[] blocks, and puts them in the second pipeline
        /// </summary>
        private static void ReadFiles( BlockingCollection<string> input, BlockingCollection<string[]> output)
        {
            try
            {
                foreach (string file in input.GetConsumingEnumerable())
                {
                    List<string> list = new List<string>(MaxBlockSize);

                    using (StreamReader sr = new StreamReader(file))
                    {
                        int countLines = 0;

                        while (!sr.EndOfStream)
                        {
                            list.Add( sr.ReadLine() );
                            countLines++;

                            if (countLines >= MaxBlockSize)
                            {
                                output.Add(list.ToArray());
                                countLines = 0;
                                list = new List<string>(MaxBlockSize);
                            }
                        }

                        if (list.Count > 0)
                        {
                            output.Add(list.ToArray());
                        }
                    }
                }
            }

            finally
            {
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Takes string[] blocks from the second pipeline, splits each line on tabs, and parses
        /// the data, storing each line as an object array in the third pipeline.
        /// </summary>
        private static void ParseStringBlocks( BlockingCollection<string[]> input, BlockingCollection< Object[][] > output)
        {
            try
            {
                foreach (string[] block in input.GetConsumingEnumerable())
                {
                    // A fresh list for each block: reusing one list without clearing it
                    // would re-emit previously parsed rows with every subsequent block.
                    List<Object[]> result = new List<object[]>(MaxBlockSize);
                    foreach (string line in block)
                    {
                        string[] splitLine = line.Split('\t'); //split line on tab

                        string cityName = splitLine[0];
                        int cityPop = Int32.Parse( splitLine[1] );
                        int cityElevation = Int32.Parse(splitLine[2]);
                        //...

                        result.Add(new Object[] { cityName, cityPop, cityElevation });
                    }

                    output.Add( result.ToArray() );
                }
            }

            finally
            {
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Takes the data blocks from the third pipeline, and uploads each row to SQL Database
        /// </summary>
        private static void UploadBlocks(BlockingCollection<Object[][]> input)
        {
            /*
             * At this point 'block' is an array of object arrays.
             * 
             * The block contains MaxBlockSize number of cities.
             * 
             * There is one object array for each city.
             * 
             * The object array for the city is in the pre-defined order from pipeline stage2
             * 
             * You could do a couple of things at this point:
             * 
             * 1. declare and initialize a DataTable with the correct column types
             *    then, do the  dataTable.Rows.Add( rowValues )
             *    then, use a Bulk Copy Operation to upload the dataTable to SQL
             *    http://msdn.microsoft.com/en-us/library/7ek5da1a
             * 
             * 2. Manually perform the sql commands/transactions similar to what 
             *    Kevin recommends in this suggestion:
             *    https://dev59.com/QHNA5IYBdhLWcg3wSrua#1024195
             * 
             * I've demonstrated the first approach with this code.
             * 
             * */


            DataTable dataTable = new DataTable();

            //set up columns of dataTable here.

            foreach (Object[][] block in input.GetConsumingEnumerable())
            {
                foreach (Object[] rowValues in block)
                {

                    dataTable.Rows.Add(rowValues);
                }

                //do bulkCopy to upload table containing MaxBlockSize number of cities right here.

                dataTable.Rows.Clear(); //Remove the rows when you are done uploading, but not the dataTable.
            }
        }

    }
}
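The bulk copy left as a comment in UploadBlocks could be filled in with something like the following sketch (the connection string and destination table name are assumptions, and the DataTable's columns must be declared in the same order as the object arrays so that the default ordinal mapping lines up):

using (SqlBulkCopy bulkCopy = new SqlBulkCopy("connectionString")) // assumed connection string
{
    bulkCopy.DestinationTableName = "Cities"; // assumed destination table
    bulkCopy.BatchSize = MaxBlockSize;        // one network batch per block of cities
    bulkCopy.WriteToServer(dataTable);
}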

It splits the work into 4 pieces that can be done by different tasks:
  1. Build the list of files to process

  2. Take files from that list and read them into string arrays

  3. Take the string arrays from the previous stage and parse them, creating object arrays containing the table values for each line

  4. Upload the rows to the database

You can also easily assign multiple tasks to each stage, allowing multiple workers to perform the same pipeline stage if needed.
(I doubt that having more than one task reading from the files would be useful unless you are on a solid state drive, since jumping around in the files is quite slow.)
You can also set a limit on the amount of data in memory while the program executes.
Each buffer is a BlockingCollection initialized with a maximum size, which means that if a buffer is full and another task tries to add an element, that task blocks.
Luckily, the Task Parallel Library is smart about this: if a task is blocked, it schedules a different task that isn't blocked, and checks back later to see whether the first task has stopped blocking.
Currently each buffer can only hold 20 items, and each item is only 100 lines, which means that:
  • buffer1 will hold at most 20 filenames at any time.

  • buffer2 will hold at most 20 string blocks (of 100 lines each) from those files at any time.

  • buffer3 will hold at most 20 data-block items (the object values for 100 cities each) at any time.

So this requires enough memory to hold 20 filenames, 2,000 lines from the files, and information for 2,000 cities at any one time (plus a little extra memory for local variables and the like).
You will probably want to increase BufferSize and MaxBlockSize for efficiency, although as written this should work.
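A standalone sketch of the bounded-buffer behavior described above (the capacity of 2 is chosen only to make the blocking easy to see):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedBufferDemo
{
    static void Main()
    {
        // Bounded to 2 items: a third Add blocks until the consumer takes one out.
        var buffer = new BlockingCollection<int>(2);

        Task producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                buffer.Add(i); // blocks whenever the buffer is full
            }
            buffer.CompleteAdding(); // lets GetConsumingEnumerable finish
        });

        foreach (int item in buffer.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumed " + item);
        }

        producer.Wait();
    }
}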
Note that I have not tested this program, since I didn't have any input files, so there may be a few bugs.

Thanks for the keen observations. I took your advice on the Add/hashing issue. As for the second point (using Interlocked.Increment), is that necessary even though I have marked ParsedFiles as volatile? And how do you commit the cities sitting in your queues? You would have to add them all to a DataTable or a DataRow[], and there is no way all of that fits in memory. That is why I approached this in an event-driven way: it lets me keep only a limited number of entities in memory at a time. - Francisco Aguilera
@FranciscoAguilera I added a pipelined version that takes the limited memory size into account. - Xantix

While I agree with some of the other comments and answers, have you tried calling:
cityTable.Rows.BeginEdit() 

before adding the first item to the cityTable,

and then calling:

cityTable.Rows.EndEdit()

in the FileParsed event handler?

I don't see BeginEdit or EndEdit in the DataRowCollection class: http://msdn.microsoft.com/en-us/library/system.data.datarowcollection.aspx - Francisco Aguilera
@FranciscoAguilera BeginEdit and EndEdit are on the DataRow class here, but unless you subscribe to the row-change events they didn't seem to make much of a difference in my tests. - Xantix
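For reference, a sketch of the per-row version applied to the original handler (again, this only plausibly matters when row/column change events are subscribed):

DataRow cityRow = cityTable.NewRow();
cityRow.BeginEdit();            // suspend change notifications while the columns are set
cityRow["id"] = parsedCity.Id;
// ... remaining columns ...
cityRow.EndEdit();              // re-enable notifications
cityTable.Rows.Add(cityRow);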

If you are after raw performance, wouldn't something like this be the best option? It bypasses the DataTable code entirely, which seems like an unnecessary step.
void BulkInsertFile(string fileName, string tableName)
{
    FileInfo info = new FileInfo(fileName);
    string name = info.Name;
    string shareDirectory = ""; //the path of the share: \\servername\shareName\
    string serverDirectory = ""; //the local path of the share on the server: C:\shareName\

    File.Copy(fileName, shareDirectory + name);
    // or you could call your method to parse the file and write it to the share directory.

    using (SqlConnection cnn = new SqlConnection("connectionString"))
    {
        cnn.Open();
        using (SqlCommand cmd = cnn.CreateCommand())
        {
            cmd.CommandText = string.Format("bulk insert {0} from '{1}' with (fieldterminator = ',', rowterminator = '\n')", tableName, serverDirectory + name);

            try
            {
                cmd.ExecuteScalar();
            }
            catch (SqlException ex)
            {
                MessageBox.Show(ex.Message);
            }
        }
    }
}

Here is some information on the bulk insert command.


Thanks for the help. Unfortunately, SQL Azure (my database provider) does not allow the "BULK INSERT" command. Oddly enough, they do allow SqlBulkCopy from the CLR library, but that's how it is. The code above throws an error because of this. - Francisco Aguilera
