C#最快的将数据插入SQL数据库的方法

9
我正在从外部源(通过Lightstreamer)接收(流式)数据到我的C#应用程序中。我的C#应用程序从监听器接收数据。来自监听器的数据存储在队列(ConcurrentQueue)中。队列每0.5秒进行一次清理,使用TryDequeue将数据复制到DataTable中。然后使用SqlBulkCopy将DataTable复制到SQL数据库中。SQL数据库将从暂存表中到达的新数据处理到最终表中。我目前每天接收约30万行数据(未来几周内可能会大幅增加),我的目标是保持从接收数据到它们在最终SQL表中可用的时间不超过1秒。目前,我每秒需要处理的最大行数约为50行。
不幸的是,随着越来越多的数据,我的逻辑性能变得越来越慢(仍远低于1秒,但我想继续改进)。主要瓶颈(到目前为止)是在SQL数据库上处理临时数据(到最终表格)。为了提高性能,我想将临时表切换为内存优化表。最终表已经是内存优化表,所以它们肯定可以很好地协同工作。
我的问题:
1.是否有办法使用内存优化表(在C#之外)使用SqlBulkCopy?(据我所知,目前还没有办法) 2.有没有建议以最快的方式将从我的C#应用程序接收到的数据写入内存优化的临时表中?
编辑(解决方案):
经过评论/答案和性能评估,我决定放弃批量插入,并使用SQLCommand将一个IEnumerable作为表值参数传递到本地编译的存储过程中,直接将数据存储在我的内存优化最终表中(以及复制到“临时”表中,现在它作为存档)。性能显着提高(即使我还没有考虑并行插入(将在稍后阶段进行))。以下是部分代码:
内存优化用户定义的表类型(将数据从C#传递到SQL(存储过程)):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
    [CityIndexInstrumentID] [int] NOT NULL,
    [CityIndexTimeStamp] [bigint] NOT NULL,
    [BidPrice] [numeric](18, 8) NOT NULL,
    [AskPrice] [numeric](18, 8) NOT NULL,
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED 
(
    [CityIndexInstrumentID] ASC,
    [CityIndexTimeStamp] ASC,
    [BidPrice] ASC,
    [AskPrice] ASC
)
)
WITH ( MEMORY_OPTIMIZED = ON )

本地编译的存储过程用于将数据插入最终表和分层(在此情况下作为归档使用):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
    @ProcessingID int,
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as 
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')


    -- store prices

    insert into TimeSeries.CityIndexIntradayLivePrices
    (
        ObjectID, 
        PerDateTime, 
        BidPrice, 
        AskPrice, 
        ProcessingID
    )
    select Objects.ObjectID,
    CityIndexTimeStamp,
    CityIndexIntradayLivePricesStaging.BidPrice, 
    CityIndexIntradayLivePricesStaging.AskPrice,
    @ProcessingID
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
    Objects.Objects
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID


    -- store data in staging table

    insert into Staging.CityIndexIntradayLivePricesStaging
    (
        ImportProcessingID,
        CityIndexInstrumentID,
        CityIndexTimeStamp,
        BidPrice,
        AskPrice
    )
    select @ProcessingID,
    CityIndexInstrumentID,
    CityIndexTimeStamp,
    BidPrice,
    AskPrice
    from @CityIndexIntradayLivePrices


end

一个包含从队列中获取的元素的IEnumerable:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{


    // set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)

    SqlMetaData MetaDataCol1;
    SqlMetaData MetaDataCol2;
    SqlMetaData MetaDataCol3;
    SqlMetaData MetaDataCol4;

    MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
    MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
    MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
    MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale


    // define sql data record with the columns

    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });


    // remove each price row from queue and add it to the sql data record

    LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();

    while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
    {

        DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
        DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with / as escape sequence)
        DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
        DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price

        yield return DataRecord;

    }


}

每0.5秒处理一次数据:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{


    try
    {

        // open new sql connection

        using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
        {


            // open connection

            TimeSeriesDatabaseSQLConnection.Open();


            // endless loop to keep thread alive

            while(true)
            {


                // ensure queue has rows to process (otherwise no need to continue)

                if(IntradayQuotesQueue.Count > 0) 
                {


                    // define stored procedure for sql command

                    SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);


                    // set command type to stored procedure

                    InsertCommand.CommandType = CommandType.StoredProcedure;


                    // define sql parameters (table-value parameter gets data from CreateSqlDataRecords())

                    SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                    SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter


                    // set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)

                    ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;


                    // execute stored procedure

                    InsertCommand.ExecuteNonQuery();


                }


                // wait 0.5 seconds

                Thread.Sleep(500);


            }

        }

    }
    catch (Exception e)
    {

        // handle error (standard error messages and update processing)

        ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);

    };


}

看看 TVP(表值参数)- 您可以将其用作反向 DataReader。 https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ - paparazzo
3个回答

4

使用 SQL Server 2016(尽管它还没有发布RTM版本,但在内存优化表方面已经比2014年的版本好得多)。然后根据您的情况,使用内存优化表变量或者在事务中大量使用本地存储过程调用进行插入操作,具体取决于哪种方式更快。需要注意以下几点:

  • 在一个事务中执行多个插入操作对于节省网络往返非常重要。虽然内存操作非常快,但 SQL Server 仍然需要确认每个操作。
  • 根据您生成数据的方式,您可能会发现并行插入可以加速操作(不要过度使用;您很快就会达到饱和点)。在这里不要尝试自己变得非常聪明;利用 async/await 和/或 Parallel.ForEach
  • 如果您正在传递一个表值参数,最简单的方法是将 DataTable 作为参数值传递,但这不是最有效的方法--最有效的方法是传递一个 IEnumerable<SqlDataRecord>。您可以使用迭代器方法来生成值,因此只分配了恒定数量的内存。

您需要进行一些实验以找到通过数据的最佳方式;这在很大程度上取决于您的数据大小和获取方式。


我目前正在使用SQL Server 2014,并且已经成功地实现了我的解决方案(尽管我不得不做出一些小的妥协)。但是我会尽快考虑SQL Server 2016。IEnumerable<SqlDataRecord>非常好用,比DataTable更快。使用内存优化的表值参数和本地编译的存储过程大大减轻了数据库的工作负担。 - Reboon

1

将暂存表中的数据批量插入到最终表中,每次插入不超过5k行数据,通常我使用4k行,并且不要在事务中插入它们。如果需要,可以实现编程事务。保持插入的行数少于5k可以避免行锁升级为表锁,这需要等待其他人退出该表。


1

你确定是逻辑在减慢速度而不是实际对数据库的交易吗?例如,Entity Framework 在尝试插入大量行时会变得“敏感”,缺乏更好的术语,并且变得相当缓慢。

有一个第三方库BulkInsert,在Codeplex上,我用过它,非常适合批量插入数据:https://efbulkinsert.codeplex.com/

你也可以为DBContext编写自己的扩展方法,如果你使用EF,可以基于记录计数来执行此操作。 任何小于5000行的内容都使用Save(),超过这个数量,您可以调用自己的批量插入逻辑。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接