下面是一种模式,可用于将大型查询的结果分批传输,其中每个批次都是一个经过压缩、以二进制方式序列化的 DataTable。传输并反序列化之后,每个 DataTable 都可以直接交给 SqlBulkCopy 使用。同样的模式也适用于其他格式,但在传递给 SqlBulkCopy 之前需要额外的转换步骤。
using System.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
namespace BatchingDataReader
{
/// <summary>
/// Wraps an <see cref="IDataReader"/> and presents its rows as a sequence of
/// fixed-size batches. Each batch looks like a separate result set:
/// <see cref="Read()"/> returns false once <c>batchSize</c> rows have been handed
/// out, and <see cref="NextResult"/> starts the next batch without advancing the
/// underlying reader.
/// </summary>
class BatchingDataReader : IDataReader
{
    private readonly int batchSize;
    private readonly IDataReader rdr;

    // Number of rows successfully handed out in the current batch.
    private int rowsRead;

    // True once the underlying reader's Read() has returned false.
    private bool atEnd;

    /// <summary>Creates a batching view over <paramref name="rdr"/>.</summary>
    /// <param name="rdr">The reader whose rows are split into batches.</param>
    /// <param name="batchSize">Maximum number of rows per batch; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="rdr"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
    public BatchingDataReader(IDataReader rdr, int batchSize)
    {
        if (rdr == null)
        {
            throw new ArgumentNullException(nameof(rdr));
        }

        // A non-positive batch size would make Read() return false forever while
        // NextResult() keeps returning true, so batch consumers would never finish.
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
        }

        this.batchSize = batchSize;
        this.rdr = rdr;
    }

    public object this[int i] => rdr[i];

    public object this[string name] => rdr[name];

    public int Depth => rdr.Depth;

    public bool IsClosed => rdr.IsClosed;

    public int RecordsAffected => rdr.RecordsAffected;

    public int FieldCount => rdr.FieldCount;

    /// <summary>
    /// Closes the underlying reader, but only after it has been read to the end.
    /// Before that the call is ignored, so closing the wrapper between batches
    /// does not end the stream.
    /// </summary>
    public void Close()
    {
        if (!atEnd)
        {
            return;
        }

        rdr.Close();
    }

    /// <summary>
    /// Disposes the underlying reader, but only after it has been read to the end.
    /// NOTE(review): if a consumer abandons the stream early, the underlying reader
    /// is not disposed here; the code that created it must dispose it.
    /// </summary>
    public void Dispose()
    {
        if (!atEnd)
        {
            return;
        }

        rdr.Dispose();
    }

    // The accessors below delegate unchanged to the underlying reader's current row.

    public bool GetBoolean(int i) => rdr.GetBoolean(i);

    public byte GetByte(int i) => rdr.GetByte(i);

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        => rdr.GetBytes(i, fieldOffset, buffer, bufferoffset, length);

    public char GetChar(int i) => rdr.GetChar(i);

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        => rdr.GetChars(i, fieldoffset, buffer, bufferoffset, length);

    public IDataReader GetData(int i) => rdr.GetData(i);

    public string GetDataTypeName(int i) => rdr.GetDataTypeName(i);

    public DateTime GetDateTime(int i) => rdr.GetDateTime(i);

    public decimal GetDecimal(int i) => rdr.GetDecimal(i);

    public double GetDouble(int i) => rdr.GetDouble(i);

    public Type GetFieldType(int i) => rdr.GetFieldType(i);

    public float GetFloat(int i) => rdr.GetFloat(i);

    public Guid GetGuid(int i) => rdr.GetGuid(i);

    public short GetInt16(int i) => rdr.GetInt16(i);

    public int GetInt32(int i) => rdr.GetInt32(i);

    public long GetInt64(int i) => rdr.GetInt64(i);

    public string GetName(int i) => rdr.GetName(i);

    public int GetOrdinal(string name) => rdr.GetOrdinal(name);

    public DataTable GetSchemaTable() => rdr.GetSchemaTable();

    public string GetString(int i) => rdr.GetString(i);

    public object GetValue(int i) => rdr.GetValue(i);

    public int GetValues(object[] values) => rdr.GetValues(values);

    public bool IsDBNull(int i) => rdr.IsDBNull(i);

    /// <summary>
    /// Moves to the next batch. While the underlying reader still has rows this
    /// only resets the per-batch row counter and returns true. Once the underlying
    /// reader is exhausted, the call is forwarded to it (unless it is closed).
    /// </summary>
    /// <returns>True if another batch or result set may be read; otherwise false.</returns>
    public bool NextResult()
    {
        if (!atEnd)
        {
            // Still inside the current result set: start a fresh batch.
            rowsRead = 0;
            return true;
        }

        if (IsClosed)
        {
            return false;
        }

        if (!rdr.NextResult())
        {
            return false;
        }

        // The underlying reader moved to a new result set; clear the end-of-data
        // state so its rows are batched from the start.
        rowsRead = 0;
        atEnd = false;
        return true;
    }

    /// <summary>
    /// Advances to the next row of the current batch.
    /// </summary>
    /// <returns>
    /// False when the batch is full (the underlying reader is not advanced) or when
    /// the underlying reader has no more rows; otherwise true.
    /// </returns>
    public bool Read()
    {
        if (rowsRead >= batchSize)
        {
            return false;
        }

        atEnd = !rdr.Read();
        if (atEnd)
        {
            return false;
        }

        // Count only rows that were actually delivered.
        rowsRead += 1;
        return true;
    }

    /// <summary>
    /// Reads <paramref name="r"/> as a lazy sequence of <see cref="DataTable"/>
    /// batches, each named "table" and holding at most <paramref name="batchSize"/> rows.
    /// The first table is always yielded, even when it has no rows; later tables
    /// are yielded only when they contain rows, so a row count that is an exact
    /// multiple of the batch size does not produce a trailing empty table.
    /// </summary>
    /// <param name="r">The reader to consume.</param>
    /// <param name="batchSize">Maximum number of rows per table; must be positive.</param>
    /// <returns>The batches, produced one at a time as the sequence is enumerated.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="r"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
    public static IEnumerable<DataTable> Read(IDataReader r, int batchSize)
    {
        if (r == null)
        {
            throw new ArgumentNullException(nameof(r));
        }

        // Construct the wrapper here, outside the iterator, so invalid arguments
        // fail at the call site instead of on the first MoveNext().
        var batching = new BatchingDataReader(r, batchSize);
        return ReadBatches(batching);
    }

    // Iterator behind Read(IDataReader, int): loads one DataTable per batch.
    private static IEnumerable<DataTable> ReadBatches(BatchingDataReader batching)
    {
        var isFirst = true;
        do
        {
            var dt = new DataTable();
            dt.TableName = "table";

            // NOTE(review): DataTable.Load is documented to position the reader on
            // the next result set after loading; the extra NextResult() call in the
            // loop condition is harmless between batches because it only resets
            // the row counter - confirm against the DataTable.Load documentation.
            dt.Load(batching);

            if (isFirst || dt.Rows.Count > 0)
            {
                yield return dt;
            }

            isFirst = false;
        } while (batching.NextResult());
    }
}
/// <summary>
/// Sample driver: runs a large query against a local SQL Server, splits the
/// result into batches of 10000 rows and appends each batch, as a
/// binary-serialized <see cref="DataTable"/>, to a single GZip-compressed file.
/// </summary>
class Program
{
    /// <summary>Entry point. <paramref name="args"/> is not used.</summary>
    static void Main(string[] args)
    {
        var constr = "server=localhost;database=master;integrated security=true";
        var outfile = "c:\\temp\\out.bin";

        // Start from a clean file: File.OpenWrite does not truncate existing content.
        if (File.Exists(outfile))
        {
            File.Delete(outfile);
        }

        using (var con = new SqlConnection(constr))
        {
            con.Open();

            // NOTE(review): BinaryFormatter is unsafe for untrusted input and has
            // been removed from recent .NET versions; kept here because the sample's
            // output format depends on it. Only deserialize files you produced.
            var bin = new BinaryFormatter();

            // The command is now disposed together with the reader and the streams.
            using (var cmd = new SqlCommand("select top (1000000) * from sys.messages m, sys.objects o", con))
            using (SqlDataReader rdr = cmd.ExecuteReader())
            using (var destFile = File.OpenWrite(outfile))
            using (var zipStream = new System.IO.Compression.GZipStream(destFile, System.IO.Compression.CompressionLevel.Optimal))
            {
                // Each batch is written as its own serialized object, back to back
                // in the same compressed stream.
                foreach (var dt in BatchingDataReader.Read(rdr, 10000))
                {
                    Console.WriteLine(dt.Rows.Count);
                    dt.RemotingFormat = SerializationFormat.Binary;
                    bin.Serialize(zipStream, dt);
                }
            }
        }
    }
}
}
……IAsyncEnumerable<RowBatch> 并返回 IDataReader(仅实现异步部分),或 IAsyncEnumerable<IDataRecord>(每个 RowBatch 多个记录)? - Marc Gravell
“……IAsyncEnumerable<T> 中记录的粒度”,诀窍是:不要将 T 定义为一条记录,而是将其定义为由 N 条记录组成的页面,其中 N 是某个数字(可以是 10、100,甚至 1000)——这样您就可以获得增量批处理的优势,而无需付出任何显著的“每行”开销。 - Marc Gravell