我做了一个简短的测试项目,并尝试了几种不同的方法。我的目标是使用顺序代码尽快构建一个包含27列(id,A,B,C,...,Z)和约300,000行的DataTable。
(每行都填充有一个ID和其余列填充有随机的5个字母单词。)
在第四次尝试中,我偶然发现了一种基于类型为Object的值数组添加行到表格的不同语法。
(请参见 MSDN 上 DataRowCollection.Add(Object[]) 方法的文档。)
在您的情况下,它应该是这样的:
// Add the entire row in one call from an object array; values must be listed
// in column order. (The original snippet was missing the closing ");".)
cityTable.Rows.Add( new Object[] {
    ((City)e.DatabaseEntry).Id,
    ObjectThatGoesInColumn2,
    ObjectThatGoesInColumn3,
    ObjectThatGoesInLastColumn
} );
代替:
// The slower alternative: detach a row, set each cell one at a time, then
// attach it. Every string indexer access hashes the column name again.
// NOTE(review): the row comes from cityTable but is added to workTable —
// presumably both should be the same table; confirm against the caller.
DataRow row = cityTable.NewRow();
row[0] = 100;
row["City Name"] = "Anaheim";   // was the bare identifier Anaheim, which is undefined
row["Column 7"] = ...
...
row["Column 26"] = checksum;
workTable.Rows.Add( row );
这将使您加速,因为您不必逐个设置每列,根据您的分析器图片,您至少要单独设置12列。
这也避免了对列名字符串进行哈希以查看您正在处理哪个数组位置,然后再次检查数据类型。
如果您有兴趣,这是我的测试项目:
class Program
{
    // One DataSet per timed experiment; each is set back to null after its
    // test so the 300k-row table becomes collectible before the next run.
    public static System.Data.DataSet dataSet;
    public static System.Data.DataSet dataSet2;
    public static System.Data.DataSet dataSet3;
    public static System.Data.DataSet dataSet4;

    // Single shared RNG; creating a new Random per call could repeat seeds.
    public static Random rand = new Random();

    // Row count used by every MakeTable* variant.
    public static int NumOfRows = 300000;

    static void Main(string[] args)
    {
        #region test1
        Console.WriteLine("Starting");
        Console.WriteLine("");

        Stopwatch watch = new Stopwatch();
        watch.Start();
        MakeTable();
        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet = null;     // release the table for GC before the next test
        Console.WriteLine("");
        Console.WriteLine("Completed.");
        Console.WriteLine("");
        #endregion

        #region test4
        Console.WriteLine("Starting Test 4");
        Console.WriteLine("");

        watch.Reset();
        watch.Start();
        MakeTable4();
        watch.Stop();

        Console.WriteLine("Elapsed Time was: " + watch.ElapsedMilliseconds + " milliseconds.");
        dataSet4 = null;    // release the table for GC
        Console.WriteLine("");
        Console.WriteLine("Completed Test 4.");
        #endregion

        Console.WriteLine("");
        Console.WriteLine("Press Enter to Exit...");
        Console.ReadLine();
    }

    /// <summary>
    /// Builds the shared 27-column schema: an int "id" primary-key column plus
    /// 26 string columns named "5-Letter Word A" .. "5-Letter Word Z".
    /// Extracted because all four MakeTable* variants repeated identical copies
    /// of this code.
    /// </summary>
    private static DataTable CreateWordTable(string tableName)
    {
        DataTable table = new DataTable(tableName);

        DataColumn column = new DataColumn();
        column.DataType = typeof(int);      // was Type.GetType("System.Int32") — typeof is checked at compile time
        column.ColumnName = "id";
        column.ReadOnly = true;
        column.Unique = true;
        table.Columns.Add(column);

        // 65..90 are the ASCII codes for 'A'..'Z'.
        for (int i = 65; i <= 90; i++)
        {
            column = new DataColumn();
            column.DataType = typeof(string);
            column.ColumnName = "5-Letter Word " + (char)i;
            column.AutoIncrement = false;
            column.Caption = "Random Word " + (char)i;
            column.ReadOnly = false;
            column.Unique = false;
            table.Columns.Add(column);
        }

        table.PrimaryKey = new DataColumn[] { table.Columns["id"] };
        return table;
    }

    /// <summary>
    /// Test 1: NewRow() followed by per-column assignment through the string
    /// indexer — the slowest variant, since each access hashes the column name.
    /// </summary>
    private static void MakeTable()
    {
        DataTable table = CreateWordTable("Table 1");
        dataSet = new DataSet();
        dataSet.Tables.Add(table);

        for (int i = 0; i < NumOfRows; i++)
        {
            DataRow row = table.NewRow();
            row["id"] = i;
            for (int j = 65; j <= 90; j++)
            {
                row["5-Letter Word " + (char)j] = getRandomWord();
            }
            table.Rows.Add(row);
        }
    }

    /// <summary>
    /// Test 2: same as Test 1 but wrapped in BeginEdit/EndEdit to suspend
    /// per-assignment events and validation while the row is filled.
    /// </summary>
    private static void MakeTable2()
    {
        DataTable table = CreateWordTable("Table 2");
        dataSet2 = new DataSet();
        dataSet2.Tables.Add(table);

        for (int i = 0; i < NumOfRows; i++)
        {
            DataRow row = table.NewRow();
            row.BeginEdit();
            row["id"] = i;
            for (int j = 65; j <= 90; j++)
            {
                row["5-Letter Word " + (char)j] = getRandomWord();
            }
            row.EndEdit();
            table.Rows.Add(row);
        }
    }

    /// <summary>
    /// Test 3: pre-allocate all detached rows first, then fill and attach them
    /// in a second pass.
    /// </summary>
    private static void MakeTable3()
    {
        DataTable table = CreateWordTable("Table 3");
        dataSet3 = new DataSet();
        dataSet3.Tables.Add(table);

        DataRow[] newRows = new DataRow[NumOfRows];
        for (int i = 0; i < NumOfRows; i++)
        {
            newRows[i] = table.NewRow();
        }

        for (int i = 0; i < NumOfRows; i++)
        {
            newRows[i]["id"] = i;
            for (int j = 65; j <= 90; j++)
            {
                newRows[i]["5-Letter Word " + (char)j] = getRandomWord();
            }
            table.Rows.Add(newRows[i]);
        }
    }

    /// <summary>
    /// Test 4 (the fast one): add each row in a single Rows.Add(object[]) call,
    /// with values in column order — no column-name hashing per cell.
    /// </summary>
    private static void MakeTable4()
    {
        // Was "Table 2" — a copy-paste slip from MakeTable2.
        DataTable table = CreateWordTable("Table 4");
        dataSet4 = new DataSet();
        dataSet4.Tables.Add(table);

        int columnCount = table.Columns.Count;   // 1 id column + 26 word columns
        for (int i = 0; i < NumOfRows; i++)
        {
            object[] rowValues = new object[columnCount];
            rowValues[0] = i;
            for (int j = 1; j < columnCount; j++)
            {
                rowValues[j] = getRandomWord();
            }
            table.Rows.Add(rowValues);
        }
    }

    /// <summary>
    /// Returns a random 5-letter uppercase word in the range A-Z.
    /// </summary>
    private static string getRandomWord()
    {
        // Random.Next's upper bound is EXCLUSIVE: the original Next(65, 90)
        // could never produce 'Z' (code 90), so use 91.
        char[] letters = new char[5];
        for (int i = 0; i < letters.Length; i++)
        {
            letters[i] = (char)rand.Next(65, 91);
        }
        return new string(letters);
    }

    /// <summary>
    /// Debug helper: dumps id plus the first and last word columns of every
    /// row in the Test-1 table.
    /// </summary>
    private static void printTable()
    {
        foreach (DataRow row in dataSet.Tables[0].Rows)
        {
            Console.WriteLine( row["id"] + "--" + row["5-Letter Word A"] + " - " + row["5-Letter Word Z"] );
        }
    }
}
我还没有真正查看您的并行性,但有几件事情需要注意。
首先,将“ParsedFiles ++;”更改为“Interlocked.Increment(ref ParsedFiles);”,或者在其周围加锁。
其次,建议使用管道模式而不是复杂的事件驱动并行处理方法,这非常适合此类任务。
使用并发集合中的并发队列(或阻塞集合)来保存各个阶段的数据。
第一阶段将保存要处理的文件列表。
工作任务将从该工作列表中出列一个文件,对其进行解析,然后将其添加到第二阶段。
在第二阶段,工作任务将从第二阶段队列(刚刚完成的datatable块)中取出项目,并在它们准备好上传时将其上传到数据库中。
编辑:
我写了一份
流水线 版本的代码,希望能对你有所帮助:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.IO;
using System.Data;
namespace dataTableTesting2
{
    class Program
    {
        // Pipeline tuning knobs. Note: const members are implicitly static —
        // the original "private static const" does not compile in C#.
        private const int BufferSize = 20;      // max queued items per stage
        private const int MaxBlockSize = 100;   // max lines per block

        // Stage queues. Bounded BlockingCollections throttle a producer
        // (Add blocks) whenever its consumer falls behind.
        private static BlockingCollection<string> buffer1 = new BlockingCollection<string>(BufferSize);
        private static BlockingCollection<string[]> buffer2 = new BlockingCollection<string[]>(BufferSize);
        private static BlockingCollection<Object[][]> buffer3 = new BlockingCollection<Object[][]>(BufferSize);

        static void Main(string[] args)
        {
            // LongRunning hints the scheduler to give each stage its own
            // thread, since every stage blocks on its queue until the
            // pipeline drains.
            TaskFactory f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

            Task stage0 = f.StartNew(() => PopulateFilesList(buffer1));
            Task stage1 = f.StartNew(() => ReadFiles(buffer1, buffer2));
            Task stage2 = f.StartNew(() => ParseStringBlocks(buffer2, buffer3));
            Task stage3 = f.StartNew(() => UploadBlocks(buffer3));

            Task.WaitAll(stage0, stage1, stage2, stage3);
        }

        /// <summary>Stage 0: enqueue the names of the files to process.</summary>
        private static void PopulateFilesList( BlockingCollection<string> output )
        {
            try
            {
                // Use the parameter rather than the buffer1 field (as the
                // original did) so the stage works with any queue it is given.
                output.Add("file1.txt");
                output.Add("file2.txt");
                output.Add("lastFile.txt");
            }
            finally
            {
                // Always signal completion, even on failure, so downstream
                // GetConsumingEnumerable loops can terminate.
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Stage 1: read each file and emit its lines in blocks of at most
        /// MaxBlockSize lines.
        /// </summary>
        private static void ReadFiles( BlockingCollection<string> input, BlockingCollection<string[]> output)
        {
            try
            {
                foreach (string file in input.GetConsumingEnumerable())
                {
                    List<string> list = new List<string>(MaxBlockSize);
                    using (StreamReader sr = new StreamReader(file))
                    {
                        int countLines = 0;
                        while (!sr.EndOfStream)
                        {
                            list.Add( sr.ReadLine() );
                            countLines++;

                            // ">=" keeps blocks at exactly MaxBlockSize lines;
                            // the original ">" produced 101-line blocks.
                            if (countLines >= MaxBlockSize)
                            {
                                output.Add(list.ToArray());
                                countLines = 0;
                                list = new List<string>(MaxBlockSize);
                            }
                        }

                        // Flush the final partial block of this file.
                        if (list.Count > 0)
                        {
                            output.Add(list.ToArray());
                        }
                    }
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Stage 2: parse tab-separated "name, population, elevation" lines
        /// into per-row object arrays, one Object[][] per input block.
        /// </summary>
        private static void ParseStringBlocks( BlockingCollection<string[]> input, BlockingCollection< Object[][] > output)
        {
            try
            {
                foreach (string[] block in input.GetConsumingEnumerable())
                {
                    // A fresh list per block: the original reused one list for
                    // the whole run, so each emitted block also re-contained
                    // every row from all earlier blocks (duplicate uploads).
                    List<Object[]> result = new List<Object[]>(block.Length);

                    foreach (string line in block)
                    {
                        string[] splitLine = line.Split('\t');

                        string cityName = splitLine[0];
                        int cityPop = Int32.Parse( splitLine[1] );
                        int cityElevation = Int32.Parse(splitLine[2]);

                        result.Add(new Object[] { cityName, cityPop, cityElevation });
                    }

                    output.Add( result.ToArray() );
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        /// <summary>
        /// Stage 3: pour finished row blocks into a DataTable and hand each
        /// block to the database.
        /// </summary>
        private static void UploadBlocks(BlockingCollection<Object[][]> input)
        {
            // NOTE(review): this table has no columns defined, so
            // Rows.Add(object[]) will fail at runtime — define the three
            // columns matching ParseStringBlocks' output (string, int, int)
            // before using this stage for real.
            DataTable dataTable = new DataTable();

            foreach (Object[][] block in input.GetConsumingEnumerable())
            {
                foreach (Object[] rowValues in block)
                {
                    dataTable.Rows.Add(rowValues);
                }

                // Placeholder for the actual bulk upload; clearing afterwards
                // keeps memory bounded at one block.
                dataTable.Rows.Clear();
            }
        }
    }
}
它将工作分为4个部分,可以由不同的任务完成:
制作要处理的文件列表
从该列表中获取文件并将其读入字符串数组中
从上一部分获取字符串数组并解析它们,创建包含每行表值的对象数组
将行上传到数据库
此外,您还可以轻松地将多个任务分配给每个阶段,如果需要,允许多个工作者执行相同的管道阶段。
(我怀疑除非您使用固态驱动器,否则具有多个任务从文件中读取数据将没有用处,因为在内存中跳转相当缓慢。)
此外,您可以通过程序执行设置内存中的数据量限制。
每个缓冲区都是使用最大大小初始化的 BlockingCollection,这意味着如果缓冲区已满,并且另一个任务尝试添加另一个元素,则会阻止该任务。
幸运的是,任务并行库很聪明,如果任务被阻止,它将安排另一个未被阻止的任务,并稍后检查第一个任务是否停止被阻止。
目前每个缓冲区最多容纳20个项目,每个块最多100行,这意味着:第一个缓冲区最多保存20个文件名,第二个缓冲区最多保存20个各含100行文本的块,第三个缓冲区最多保存20个各含100条城市信息的块。
因此,这将需要足够的内存来保存20个文件名、来自文件的2000行文本和2000条城市信息(外加局部变量等的少量额外内存)。
您可能需要增加BufferSize和MaxBlockSize以提高效率,尽管现在的设置应该可以工作。
请注意,我没有测试过这个程序,因为我没有任何输入文件,所以可能存在一些错误。
关于 RaiseEntryParsed:如果它只是为了报告进度,为什么不考虑每100或1,000条记录才调用一次?如果您要解析超过1GB的数据,我非常怀疑有人会关心当前正在解析的是哪一行。 - ChandlerPelhams