Read millions of records from SQL, process them, and insert them into another SQL Server


I have a table with the following structure:

CREATE TABLE [dbo].[case_waveform_data] (
    [case_id]                INT             NOT NULL,
    [channel_index]          INT             NOT NULL,
    [seconds_between_points] REAL            NOT NULL,
    [last_time_stamp]        DATETIME        NOT NULL,
    [value_array]            VARBINARY (MAX) NULL
);

This table will contain millions of records. I want to read the data from one database by case_id and channel_index, combine 5 minutes' worth of data by decompressing the value_array data and chaining the arrays together, compress that stream, and then add the combined chunk to another database.

The code I have works fine for up to about 100k records. Beyond that I get random errors, such as out of memory in System.Data, CRC mismatch between the compressed and uncompressed data, invalid characters in the compressed data, and so on. Once I go past 100k records these errors occur at random.

I was using LINQ to loop through the records but have since switched to SqlDataReader directly. To import the records I use SqlBulkCopy, but I've found that even with that part commented out I still get the errors. It seems that if I write each combined record to a file as an insert statement, the code completes, but if I start collecting the combined records into a list and then pass it to SqlBulkCopy for the insert, I get the random errors. Most of the time it is an out-of-memory error on reader.Read() (or on the foreach (var record in records) line when using LINQ). The working set, private, and committed memory of the process itself is only about 80 MB.

What am I doing wrong? Is there a better way to do this? If I use the written-out file it ends up around 300 MB; can I load a file that large?

Here is the whole function. It has been rewritten about 20 times, so there may be some strange code in it:

using (LiveORDataContext dc = new LiveORDataContext(LiveORDataManager.ConnectionString))
{
    dc.Log = Console.Out;
    dc.ObjectTrackingEnabled = false;

    Stopwatch sw = Stopwatch.StartNew();

    int recordcount = 0;
    // Increase the timeout to 10 minutes for really big cases
    dc.CommandTimeout = 600;
    //Dictionary<int, int> channelindexes = dc.case_waveform_datas.Where(d => d.case_id == livecaseid).GroupBy(d => d.channel_index).ToDictionary(d => d.Key, d => d.Count());

    // get a distict list of all the channel indexes we need to import for this case
    List<int> channelindexes = (from wd in  dc.case_waveform_datas
                                where wd.case_id == livecaseid
                                group wd by wd.channel_index into grp
                                select grp.Key)
                               .ToList();

    // Loop through each channel's data for the case, combine it and compress it
    foreach (int channel in channelindexes)
    {
        List<case_waveform_data> wavedatalist = new List<case_waveform_data>();
        int warehouserecordcount = 0;
        float secondsbetweenpoints = float.NaN;
        DateTime lastaddedrecordtime = DateTime.MinValue;
        DateTime previoustime = DateTime.MinValue;
        List<float> wfpoints = new List<float>();

        string queryString = String.Format("SELECT case_id, channel_index, last_time_stamp, seconds_between_points, " +
                                           "value_array FROM case_waveform_data " +
                                           "WHERE case_id = {0} and channel_index = {1} " +
                                           "ORDER BY last_time_stamp", 
            livecaseid, channel);

        using (SqlConnection connection = new SqlConnection(LiveORDataManager.ConnectionString))
        {
            SqlCommand command = new SqlCommand(queryString, connection);
            connection.Open();

            SqlDataReader reader = command.ExecuteReader();

            // Call Read before accessing data. 
            while (reader.Read()) // Currently fails here
            {
                var item = new
                {
                   case_id = reader.GetInt32(0),
                   channel_index = reader.GetInt32(1),
                   last_time_stamp = reader.GetDateTime(2),
                   seconds_between_points = reader.GetFloat(3),
                   value_array = (byte[])reader["value_array"]
                };                    

        //var wdlist = from wfd in dc.case_waveform_datas
        //    where wfd.case_id == livecaseid && wfd.channel_index == channel
        //    orderby wfd.last_time_stamp
        //    select new
        //           {
        //               wfd.case_id,
        //               wfd.channel_index,
        //               wfd.last_time_stamp,
        //               wfd.seconds_between_points,
        //               wfd.value_array
        //           };

        // Loop through each channel and create floating point arrays that are larger than 
        // per second groups.    
        //foreach (var item in wdlist)
        //{
            // Get a record count for the info log
            recordcount++;

            if (float.IsNaN(secondsbetweenpoints))
            {
                secondsbetweenpoints = item.seconds_between_points > 0.0f
                    ? item.seconds_between_points
                    : 0.002f;
            } // assume .002 as a default if this is not set

            if (lastaddedrecordtime == DateTime.MinValue)
            {
                lastaddedrecordtime = item.last_time_stamp;
            }
            if (previoustime == DateTime.MinValue)
            {
                previoustime = item.last_time_stamp;
            }

            if ((secondsbetweenpoints != item.seconds_between_points && item.seconds_between_points > 0.0f) ||
                item.last_time_stamp > lastaddedrecordtime.AddMinutes(5))
            {
                // The seconds between points has changed so gzip the array of 
                // floats and insert the record.
                var ms = new MemoryStream();
                using (var gZipStream = new GZipStream(ms, CompressionMode.Compress))
                {
                    new BinaryFormatter().Serialize(gZipStream, wfpoints.ToArray());
                }

                // add the new combined record to a list that will be bulk inserted every 1000 records
                wavedatalist.Add(
                    //dcwarehouse.case_waveform_datas.InsertOnSubmit(
                    new case_waveform_data
                    {
                        case_id = warehousecaseid,
                        channel_index = channel,
                        seconds_between_points = secondsbetweenpoints,
                        last_time_stamp = previoustime,
                        value_array = ms.ToArray()
                    });
                if (writeFile) { writer.WriteLine("(@caseid, {0}, {1}, '{2}', 0x{3}),", channel, secondsbetweenpoints, previoustime, BitConverter.ToString(ms.ToArray()).Replace("-", string.Empty)); }
                ms.Close();
                wfpoints.Clear();
                secondsbetweenpoints = item.seconds_between_points;
                lastaddedrecordtime = item.last_time_stamp;

                // To keep memory down submit the changes to the warehouse database more often
                // than after the whole channel's data has been prepared. This handles cases
                // that have run for multiple days
                warehouserecordcount++;
                if (warehouserecordcount > 300)
                {
                    BulkInsertAll(wavedatalist);
                    wavedatalist.Clear();
                    warehouserecordcount = 0;
                    Console.WriteLine("Recordcount: {0}", recordcount);
                }
            }

            // Decompress the float values and append them
            var ms1 = new MemoryStream(item.value_array);
            using (var gZipStream = new GZipStream(ms1, CompressionMode.Decompress))
            {
                // Decompress the float array
                float[] wd = (float[])new BinaryFormatter().Deserialize(gZipStream);

                // determine the timestamp of the first float given the timestamp of the last float,
                // the number of elements and the seconds between floats
                var listfirsttimestamp =
                    item.last_time_stamp.AddSeconds((wd.Length - 1) * secondsbetweenpoints * -1);

                // if the last time of the previous list + the seconds between is still 
                // less than the new list's first time then add in NaNs
                while (previoustime.AddSeconds(secondsbetweenpoints) < listfirsttimestamp)
                {
                    wfpoints.Add(float.NaN);
                    previoustime = previoustime.AddSeconds(secondsbetweenpoints);
                }

                // now append the list
                wfpoints.AddRange(wd);
            }
            ms1.Close();
            previoustime = item.last_time_stamp;

        //}
            }

            // Call Close when done reading.
            reader.Close();
        }
        // If there are any points left for the channel add them here
        if (wfpoints.Any())
        {
            var ms = new MemoryStream();
            using (var gZipStream = new GZipStream(ms, CompressionMode.Compress))
            {
                new BinaryFormatter().Serialize(gZipStream, wfpoints.ToArray());
            }

            wavedatalist.Add(
                new case_waveform_data
                {
                    case_id = warehousecaseid,
                    channel_index = channel,
                    seconds_between_points = secondsbetweenpoints,
                    last_time_stamp = previoustime,
                    value_array = ms.ToArray()
                });
            if (writeFile) { writer.WriteLine("(@caseid, {0}, {1}, '{2}', 0x{3}),", channel, secondsbetweenpoints, previoustime, BitConverter.ToString(ms.ToArray()).Replace("-", string.Empty)); }
            ms.Close();
        }

        if (wavedatalist.Count > 0)
        {
            BulkInsertAll(wavedatalist);
            wavedatalist.Clear();
        }
        Console.WriteLine("Recordcount: {0}", recordcount);
    }

    sw.Stop();
    logger.Info("Livecase: [{0}], Warehouse Caseid: [{1}], Recordcount: [{2}]. Waveform data import took [{3}ms]",
        livecaseid, warehousecaseid, recordcount, sw.ElapsedMilliseconds);
}

if (writeFile)
{
    writer.Close();
}

Edit: Here is one of the errors. It happens on this line:

 var item = new
               {
                   case_id = reader.GetInt32(0),
                   channel_index = reader.GetInt32(1),
                   last_time_stamp = reader.GetDateTime(2),
                   seconds_between_points = reader.GetFloat(3),
                   value_array = (byte[])reader["value_array"]
               };

Here is the stack trace:

System.InvalidOperationException - Internal connection fatal error.
at System.Data.SqlClient.TdsParserStateObject.TryProcessHeader()
at System.Data.SqlClient.TdsParserStateObject.TryPrepareBuffer()
at System.Data.SqlClient.TdsParserStateObject.TryReadByteArray(Byte[] buff, Int32 offset, Int32 len, Int32& totalRead)
at System.Data.SqlClient.TdsParserStateObject.TryReadPlpBytes(Byte[]& buff, Int32 offst, Int32 len, Int32& totalBytesRead)
at System.Data.SqlClient.TdsParser.TryReadSqlValue(SqlBuffer value, SqlMetaDataPriv md, Int32 length, TdsParserStateObject stateObj)
at System.Data.SqlClient.SqlDataReader.TryReadColumnInternal(Int32 i, Boolean readHeaderOnly)
at System.Data.SqlClient.SqlDataReader.TryReadColumn(Int32 i, Boolean setTimeout, Boolean allowPartiallyReadColumn)
at System.Data.SqlClient.SqlDataReader.GetValueInternal(Int32 i)
at System.Data.SqlClient.SqlDataReader.GetValue(Int32 i)
at System.Data.SqlClient.SqlDataReader.get_Item(String name)
at LiveOR.Data.AccessLayer.LiveORDataManager.ImportWaveformDataLiveToWarehouse(Int32 livecaseid, Int32 warehousecaseid, String backupfilepath) in c:\SRC\LiveOR\LiveOR.Data\LiveORDataManager.cs:line 2416
at VisionSupport.Scheduler.Start() in c:\SRC\LiveOR\VisionSupport\Scheduler.cs:line 90

The same code also throws an OutOfMemoryException. Here is its stack trace:

at System.Data.SqlClient.TdsParserStateObject.TryReadPlpBytes(Byte[]& buff, Int32 offst, Int32 len, Int32& totalBytesRead)
at System.Data.SqlClient.TdsParser.TryReadSqlValue(SqlBuffer value, SqlMetaDataPriv md, Int32 length, TdsParserStateObject stateObj)
at System.Data.SqlClient.SqlDataReader.TryReadColumnInternal(Int32 i, Boolean readHeaderOnly)
at System.Data.SqlClient.SqlDataReader.TryReadColumn(Int32 i, Boolean setTimeout, Boolean allowPartiallyReadColumn)
at System.Data.SqlClient.SqlDataReader.GetValueInternal(Int32 i)
at System.Data.SqlClient.SqlDataReader.GetValue(Int32 i)
at System.Data.SqlClient.SqlDataReader.get_Item(String name)
at LiveOR.Data.AccessLayer.LiveORDataManager.ImportWaveformDataLiveToWarehouse(Int32 livecaseid, Int32 warehousecaseid, String backupfilepath) in c:\SRC\LiveOR\LiveOR.Data\LiveORDataManager.cs:line 2419

Edit 2:

Here is another random example. I get these just by re-running the same code.

Line:

float[] wd = (float[])new BinaryFormatter().Deserialize(gZipStream);

Exception:

SerializationException: Binary stream '75' does not contain a valid BinaryHeader. Possible causes are invalid stream or object version change between serialization and deserialization.

Stack trace:

at System.Runtime.Serialization.Formatters.Binary.__BinaryParser.Run()
at System.Runtime.Serialization.Formatters.Binary.ObjectReader.Deserialize(HeaderHandler handler, __BinaryParser serParser, Boolean fCheck, Boolean isCrossAppDomain, IMethodCallMessage methodCallMessage)
at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Deserialize(Stream serializationStream, HeaderHandler handler, Boolean fCheck, Boolean isCrossAppDomain, IMethodCallMessage methodCallMessage)
at System.Runtime.Serialization.Formatters.Binary.BinaryFormatter.Deserialize(Stream serializationStream)
at LiveOR.Data.AccessLayer.LiveORDataManager.ImportWaveformDataLiveToWarehouse(Int32 livecaseid, Int32 warehousecaseid, String backupfilepath) in c:\SRC\LiveOR\LiveOR.Data\LiveORDataManager.cs:line 2516

In my experience, Entity Framework is a must for reading the database and mapping to objects, but for bulk inserts it is not! That will take forever. Using ADO.NET (SqlBulkCopy) is far more efficient. - abatishchev
Have you considered using SSIS to do the extract, transform, and load (ETL) work? I'm not sure whether it applies in your case, but it is an option. - maf748
Please don't suggest <Technology> when you don't use it, may not even know what it is, or are just in it for the consulting money. - billinkc
That said, since value_array is varbinary(max), I'm not sure SSIS is a good fit here. SSIS generally gets its performance from transforming data in memory. The tight coupling between data types bites a lot of people when the source or destination metadata changes, because of the implications for memory allocation, and SSIS does not let that slide. LOB types usually end up as in-memory pointers to temp files on disk, and with SSIS, once you spill to disk you've lost. - billinkc
@abatishchev A constructive objection looks like the comment billinkc left. - maf748
3 Answers


A simpler way is to use SqlBulkCopy and reflection together with Entity Framework:

First, filter your data, fetching it in batches of 2000/3000/5000 records;

then use reflection to build a DataTable and pass it to SqlBulkCopy, wrapping the copy in a transaction to guard against problems.

Log each transaction so that, if one fails, you know which records were not imported.

Keep going until the job is done. It will take very little time.

Here is a sample that retrieves a DataTable from your entities. Note that the object list passed to the following function is an IEnumerable, so when you have filtered your data and used .ToList, don't forget to call .AsEnumerable with a statement like:

  Lstads.Select(Function(x) x).AsEnumerable

You can then pass the result of your previous query to this function:

   Public Function EQToDataTable(ByVal parIList As System.Collections.IEnumerable) As System.Data.DataTable
    Dim ret As New System.Data.DataTable()
    Try
        Dim ppi As System.Reflection.PropertyInfo() = Nothing
        If parIList Is Nothing Then Return ret
        For Each itm In parIList
            If ppi Is Nothing Then
                ppi = DirectCast(itm.[GetType](), System.Type).GetProperties()
                For Each pi As System.Reflection.PropertyInfo In ppi
                    Dim colType As System.Type = pi.PropertyType

                    If (colType.IsGenericType) AndAlso
                       (colType.GetGenericTypeDefinition() Is GetType(System.Nullable(Of ))) Then colType = colType.GetGenericArguments()(0)

                    ret.Columns.Add(New System.Data.DataColumn(pi.Name, colType))
                Next
            End If
            Dim dr As System.Data.DataRow = ret.NewRow
            For Each pi As System.Reflection.PropertyInfo In ppi
                dr(pi.Name) = If(pi.GetValue(itm, Nothing) Is Nothing, DBNull.Value, pi.GetValue(itm, Nothing))
            Next
            ret.Rows.Add(dr)
        Next
        For Each c As System.Data.DataColumn In ret.Columns
            c.ColumnName = c.ColumnName.Replace("_", " ")
        Next
    Catch ex As Exception
        ret = New System.Data.DataTable()
        Dim lg As New EADSCORE.Helpers.CustomLogger(False)
        lg.WriteLog(ex)
    End Try
    Return ret
End Function

And here is an example of using SqlBulkCopy with a transaction:

   Public Sub BulkInserTest(ByVal list As System.Collections.IEnumerable)
    Dim hasElement = False
    For Each el In list
        hasElement = True
        Exit For
    Next
    If hasElement = True Then
        Dim dt As DataTable = EQToDataTable(list)

        Using cnn As New SqlClient.SqlConnection(ConfigurationManager.ConnectionStrings("BUCLCNN").ConnectionString)
            cnn.Open()
            Using tr As SqlClient.SqlTransaction = cnn.BeginTransaction
                Using sqlbulk As New SqlClient.SqlBulkCopy(cnn, SqlBulkCopyOptions.KeepIdentity, tr)
                    With sqlbulk
                        .DestinationTableName = "Ads"
                        .BatchSize = 2500
                        For Each el As DataColumn In dt.Columns
                            If el.ColumnName = "IDAds" Or el.ColumnName = "Province" Or el.ColumnName = "SubCategory" Or el.ColumnName = "AdsComments" Or el.ColumnName = "CarDetails" Or el.ColumnName = "HomeDetails" Or el.ColumnName = "Images" Or el.ColumnName = "Customer" Then
                                ' skip this column (do not map it)
                            Else
                                Dim map As New SqlBulkCopyColumnMapping(el.ColumnName, el.ColumnName)
                                .ColumnMappings.Add(map)
                            End If
                        Next
                        Try
                            If dt.Rows.Count > 0 Then
                                .WriteToServer(dt)
                                tr.Commit()
                            End If
                        Catch ex As Exception
                            tr.Rollback()
                            Dim lg As New EADSCORE.Helpers.CustomLogger(False)
                            lg.WriteLog(ex)
                        End Try
                    End With
                End Using
            End Using
            Dim cmd As New SqlCommand("Update Ads Set Article=replace(Article,'&amp;','&');Update Ads Set Title=replace(Article,'&amp;','&')", cnn)
            cmd.ExecuteNonQuery()
        End Using
    End If

End Sub

The code above will need some modification for your case, since you may have to add some filters (if statements and so on), but it works as well :)
Enjoy!
Note: I don't know what your entity types are, so you will have to check the mappings to make sure everything works :)
If it solves your problem, please mark it as the answer.


I was going to suggest that you weren't closing the reader, but then I saw the Close().

There is a lot going on here, and streaming is certainly the first place to look, but ADO.NET 4.5 has some new features that let you read the data columns in each row sequentially, without buffering them, and also read byte arrays as streams without buffering them in memory.

It's worth a read.
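
For illustration, here is a minimal sketch of what that could look like applied to the query in the question. This is an assumption about the wiring, not the OP's actual code; CommandBehavior.SequentialAccess and SqlDataReader.GetStream are the .NET 4.5 streaming features referred to above:

    using System;
    using System.Data;
    using System.Data.SqlClient;
    using System.IO;
    using System.IO.Compression;

    static class WaveformStreamingSketch
    {
        // Sketch only: open the reader with SequentialAccess so columns are read
        // in ordinal order and streamed instead of the whole row being buffered,
        // and read the varbinary(max) column as a Stream via GetStream.
        public static void ReadChannelData(string connectionString, string queryString)
        {
            using (var connection = new SqlConnection(connectionString))
            using (var command = new SqlCommand(queryString, connection))
            {
                connection.Open();
                using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
                {
                    while (reader.Read())
                    {
                        // With SequentialAccess the columns must be read in ordinal order
                        // (same order as the SELECT in the question).
                        int caseId = reader.GetInt32(0);
                        int channelIndex = reader.GetInt32(1);
                        DateTime lastTimeStamp = reader.GetDateTime(2);
                        float secondsBetweenPoints = reader.GetFloat(3);

                        // Stream the blob straight into the decompressor rather than
                        // materializing a byte[] for the whole column.
                        using (Stream blob = reader.GetStream(4))
                        using (var gzip = new GZipStream(blob, CompressionMode.Decompress))
                        {
                            // ... deserialize and append the float array here ...
                        }
                    }
                }
            }
        }
    }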



Try putting var ms = new MemoryStream(); in a using block.

See the documentation for MemoryStream:

MemoryStream

Closes the current stream and releases any resources (such as sockets and file handles) associated with the current stream. Instead of calling this method, ensure that the stream is properly disposed. (Inherited from Stream.)

Stream.Close

You can declare Stream objects within a using block (or Using block in Visual Basic) to ensure that the stream and all of its resources are disposed, or you can explicitly call the Dispose method.
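
As a minimal sketch, the compression step from the question would then look something like this (same variable names as the OP's code; note that MemoryStream.ToArray() can still be called after the stream has been disposed):

    byte[] compressed;
    using (var ms = new MemoryStream())
    {
        using (var gZipStream = new GZipStream(ms, CompressionMode.Compress))
        {
            new BinaryFormatter().Serialize(gZipStream, wfpoints.ToArray());
        }
        // ToArray() still works after the MemoryStream has been closed/disposed.
        compressed = ms.ToArray();
    }

    wavedatalist.Add(new case_waveform_data
    {
        case_id = warehousecaseid,
        channel_index = channel,
        seconds_between_points = secondsbetweenpoints,
        last_time_stamp = previoustime,
        value_array = compressed
    });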

I wrapped the memory streams in using blocks, but that did not fix the out-of-memory exceptions. - mdutra
Worth a shot. I would keep the using blocks anyway. - paparazzo
I don't think this will fix it, but do you really need to keep the LiveORDataContext dc around for the whole process? I think you could close it right after List<int> channelindexes; the ToList() forces it to be evaluated. - paparazzo
You're right. Once I changed the code to use SqlDataReader directly, I forgot to close the data context; I was using it in a previous iteration to loop through the records... - mdutra
Note that GZipStream has some unmanaged resources. http://msdn.microsoft.com/en-us/library/ms143502(v=vs.110).aspx Look up how to catch unmanaged exceptions. Try testing with just the gZipStream, re-sending the same data. - paparazzo
