从C#到SQL Server的批量插入策略

7
在我们当前的项目中,客户将向我们的系统发送一组复杂/嵌套的消息。这些消息的频率大约是每秒1000-2000条。
这些复杂的对象包含交易数据(待添加)以及主数据(如果未找到,则将其添加)。但是,客户传递的是“名称”列,而不是主数据的id。
系统检查是否存在这些名称的主数据。如果找到了,它将使用数据库中的id,否则首先创建此主数据,然后再使用这些id。
一旦解析了主数据id,系统就会将事务数据插入SQL Server数据库(使用主数据id)。每个消息中的主实体数量约为15-20个。
以下是我们可以采取的一些策略。
1. 我们可以先从我们的C#代码中解析主ID(如果未找到,则插入主数据),并将这些ID存储在C#缓存中。一旦解析了所有ID,我们就可以使用SqlBulkCopy类批量插入事务数据。我们可以命中数据库15次以获取不同实体的ID,然后再命中数据库一次以插入最终数据。我们可以使用相同的连接,在完成所有这些处理后关闭它。
2. 我们可以将所有包含主数据和事务数据的消息一次性发送到数据库(以多个TVP的形式),然后在存储过程内部为缺失的主数据创建主数据,然后插入事务数据。
有人能建议在这种情况下采用最佳方法吗?
由于某些隐私问题,我无法分享实际对象结构。但是,这里是一个假设的对象结构,非常接近我们的业务对象。
这样的消息将包含有关一个产品(其主数据)以及来自不同供应商的价格详细信息(交易数据)的信息:
主数据(如果未找到,则需要添加)
产品名称:ABC,产品类别:XYZ,制造商:XXX和一些其他详细信息(属性数量在15-20范围内)。
交易数据(始终会添加)
供应商名称:A,列出价格:XXX,折扣:XXX
供应商名称:B,列出价格:XXX,折扣:XXX
供应商名称:C,列出价格:XXX,折扣:XXX
供应商名称:D,列出价格:XXX,折扣:XXX
对于属于一个产品的消息,大部分关于主数据的信息都将保持不变(并且变化较少),但交易数据将始终波动。因此,系统将检查产品“XXX”是否存在于系统中。如果没有,则检查此产品所述的“类别”是否存在。如果没有,它将插入一个新记录以获取类别,然后为产品获取记录。这将用于制造商和其他主数据。

多个供应商将同时发送有关多个产品(2000-5000)的数据。

假设我们有1000个供应商,每个供应商都会发送有关10-15种不同产品的数据。每隔2-3秒钟,每个供应商都会向我们发送这10个产品的价格更新。他可能开始发送新产品的数据,但这并不会非常频繁。


哦,最后,每秒2000条消息是很多的。真的很多。这意味着平均处理时间需要保持在每个消息500微秒以下。考虑到I/O延迟,即使使用批量复制,你可能会看到并行批量插入。而且,任何想要从数据库中获取的数据进行查找的数据?你需要将其缓存在内存中。这可能比将其保存在数据库端更好,也可能不如此,这取决于情况。 - Jeroen Mostert
2
如果你有两匹马,想知道哪一匹更快,那么就让它们比赛吧。详见Eric Lippert的文章《Which is faster?》(http://ericlippert.com/2012/12/17/performance-rant/)。 - marc_s
SqlBulkCopy在少量行的情况下非常慢。您可以放弃该方法,转而使用TVP方法,从性能和代码质量的角度来看,TVP方法非常好。您能否将多个传入的传输批处理到一个数据库事务中?这是我看到的唯一提高性能的杠杆。另外,Hekaton怎么样? - usr
@usr,我们将仅针对我们的交易数据使用sqlbulkcopy,该数据的行数将在1000行至25000行的范围内。 - Pragmatic
这是一个适合批量复制的好案例。为什么问题中没有提供这些信息呢?请明确您期望的数据量。 - usr
显示剩余3条评论
2个回答

2
你最好采用方案二 (即使用多个TVP一次性将15-20个实体发送到数据库,并作为2000个消息的整个集合进行处理)。
在应用层缓存主数据查找并在发送到数据库之前进行转换听起来不错,但是却忽略了以下几点:
1. 无论如何,您都需要访问数据库来获取初始列表。 2. 无论如何,您都需要访问数据库来插入新条目。 3. 查找字典中的值以替换ID恰好就是数据库所做的事情(假设每个这些名称到ID查找都有一个非聚集索引)。 4. 频繁查询的值将在缓冲池中缓存它们的数据页(这是一个内存缓存)。
为什么要在应用程序层面上重复现在已经由数据库层面提供和发生的内容,尤其在考虑到以下事实时:
1. 15-20个实体可能有多达20k条记录(这是相对较少的数字,特别是考虑到非聚集索引只需要两个字段:NameID,当使用100%填充因子时,可以将许多行打包到单个数据页中)。 2. 并不是所有的20k条目都是“活动”或“当前”的,因此您无需担心缓存所有条目。因此,任何当前值都将很容易地被确定为正在查询的值,并且那些数据页(可能包括一些非活动条目,但在那里没有大问题)将被缓存在缓冲池中。
因此,您不需要担心老旧条目的老化或由于可能更改的值(即特定ID的更新Name)而强制执行任何关键过期或重新加载,因为这是自然处理的。
是的,在内存中缓存是美妙的技术,大大加快了网站的速度,但这些情景/用例是为仅具有纯只读目的的非数据库进程请求相同的数据而设计的。但是,这种情况是合并数据且查找值列表可能经常发生变化(更多是由于新条目而不是由于更新条目)的情况之一。
综上所述,选项2是最佳选择。我已经成功地使用了这种技术几次,虽然没有使用15个TVP。也许需要对该方法进行一些优化/调整以调整此特定情况,但我发现以下方法效果很好:
  • Accept the data via TVP. I prefer this over SqlBulkCopy because:
    • it makes for an easily self-contained Stored Procedure
    • it fits very nicely into the app code to fully stream the collection(s) to the DB without needing to copy the collection(s) to a DataTable first, which is duplicating the collection, which is wasting CPU and memory. This requires that you create a method per each collection that returns IEnumerable<SqlDataRecord>, accepts the collection as input, and uses yield return; to send each record in the for or foreach loop.
  • TVPs are not great for statistics and hence not great for JOINing to (though this can be mitigated by using a TOP (@RecordCount) in the queries), but you don't need to worry about that anyway since they are only used to populate the real tables with any missing values
  • Step 1: Insert missing Names for each entity. Remember that there should be a NonClustered Index on the [Name] field for each entity, and assuming that the ID is the Clustered Index, that value will naturally be a part of the index, hence [Name] only will provide a covering index in addition to helping the following operation. And also remember that any prior executions for this client (i.e. roughly the same entity values) will cause the data pages for these indexes to remain cached in the Buffer Pool (i.e. memory).

    ;WITH cte AS
    (
      SELECT DISTINCT tmp.[Name]
      FROM   @EntityNumeroUno tmp
    )
    INSERT INTO EntityNumeroUno ([Name])
      SELECT cte.[Name]
      FROM   cte
      WHERE  NOT EXISTS(
                     SELECT *
                     FROM   EntityNumeroUno tab
                     WHERE  tab.[Name] = cte.[Name]
                       )
    
  • Step 2: INSERT all of the "messages" in simple INSERT...SELECT where the data pages for the lookup tables (i.e. the "entities") are already cached in the Buffer Pool due to Step 1

最后,记住,假设/推测/有根据的猜测无法替代测试。您需要尝试几种方法,看看哪种方法最适合您的特定情况,因为可能会有未分享的其他细节会影响到这里被认为是“理想”的内容。
如果消息只是插入操作,则 Vlad 的想法可能更快。我在这里描述的方法是在更复杂且需要完全同步(更新和删除)的情况下使用的,并进行了额外的验证和相关操作数据的创建(不是查找值)。在仅进行插入(仅 2000 条记录的情况下,我怀疑是否有很大的区别),使用 SqlBulkCopy 可能更快,但这假设您直接将记录加载到目标表(消息和查找表)中,而不是中间表/暂存表(我认为 Vlad 的想法是直接将 SqlBulkCopy 复制到目标表中)。但是,如上所述,使用外部缓存(即不是缓存池)也更容易出现错误,因为存在更新查找值的问题。如果使用外部缓存仅略微更快,则需要编写更多代码来解决外部缓存失效的问题,这可能不值得。必须考虑这种额外风险/维护工作,以确定哪种方法对您的需求更为全面。
更新:
根据评论中提供的信息,我们现在知道:
- 有多个供应商 - 每个供应商提供多种产品 - 产品不是唯一属于供应商的;1个或多个供应商销售产品 - 产品属性是单数形式的 - 定价信息具有可以具有多条记录的属性 - 定价信息仅限于“插入”操作(即时点历史) - 根据 SKU(或类似字段)确定唯一产品 - 一旦创建,对于通过具有不同属性的现有 SKU 提交的产品(例如类别、制造商等),将被视为相同的产品;差异将被忽略
考虑到所有这些,我仍然建议使用 TVP,但重新考虑方法,并使其以供应商为中心,而不是以产品为中心。这里的假设是供应商随时发送文件。因此,在获取文件时导入它。您事先进行的唯一查找是供应商,以下是基本布局:
  1. 可以合理地假设在此时您已经拥有了VendorID,因为系统为什么会从一个未知的来源导入文件呢?
  2. 您可以分批导入
  3. 创建一个SendRows方法:
    • 接受一个FileStream或允许浏览文件的东西
    • 接受类似于int BatchSize的东西
    • 返回IEnumerable<SqlDataRecord>
    • 创建一个SqlDataRecord以匹配TVP结构
    • 循环FileStream,直到BatchSize满足或文件中没有更多记录
    • 对数据执行任何必要的验证
    • 将数据映射到SqlDataRecord
    • 调用yield return;
  4. 打开文件
  5. 当文件中有数据时
    • 调用存储过程
    • 传递VendorID
    • 传递SendRows(FileStream, BatchSize)作为TVP
  6. 关闭文件
  7. 尝试以下操作:
    • 在围绕FileStream的循环之前打开SqlConnection,在循环结束后关闭它
    • 在FileStream循环内部打开SqlConnection,执行存储过程,并在其中关闭SqlConnection
  8. 尝试不同的BatchSize值。从100开始,然后是200、500等。
  9. 存储过程将处理插入新产品的工作

使用这种结构,您将发送未使用的产品属性(即仅使用SKU用于查找现有产品)。但是,它可以很好地扩展,因为没有关于文件大小的上限。如果供应商发送50个产品,那就好。如果他们发送了50k个产品,也可以。如果他们发送了400万个产品(这是我所使用的系统,并且它确实处理了与其任何属性不同的产品信息的更新!),那也可以。在应用程序层或DB层中没有增加内存来处理甚至1000万个产品。导入所需的时间应该随着发送的产品数量而增加。


更新2
与源数据相关的新细节:

  • 来自Azure EventHub
  • 以C#对象形式出现(没有文件)
  • 产品详细信息通过O.P.系统的API传递
  • 在单个队列中收集(只需拉出数据插入到数据库中)

如果数据源是C#对象,则我绝对会使用TVP,因为您可以按照我在第一个更新中描述的方法将它们原样发送(即返回IEnumerable<SqlDataRecord>的方法)。为每个供应商详细信息发送一个或多个TVP,但对于单个属性属性,请使用常规输入参数。例如:

CREATE PROCEDURE dbo.ImportProduct
(
  @SKU             VARCHAR(50),
  @ProductName     NVARCHAR(100),
  @Manufacturer    NVARCHAR(100),
  @Category        NVARCHAR(300),
  @VendorPrices    dbo.VendorPrices READONLY,
  @DiscountCoupons dbo.DiscountCoupons READONLY
)
SET NOCOUNT ON;

-- Insert Product if it doesn't already exist
IF (NOT EXISTS(
         SELECT  *
         FROM    dbo.Products pr
         WHERE   pr.SKU = @SKU
              )
   )
BEGIN
  INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...)
  VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...);
END;

...INSERT data from TVPs
-- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows

感谢srutzky的建议。您是正确的,这不是缓存的理想用例,因为新实体将继续添加到缓存中(但是经过一段时间后,我们相信数据大部分时间将从缓存中提供)。但是是的,我们将不得不使缓存失效(因为主数据可能会更名)。当然,如果数据库内部缓存可以实现此目的,我们可以摆脱一个批量插入的多个数据库访问。我将检查这种方法并更新您。 - Pragmatic
@Pragmatic 谢谢,非常有帮助。这不完全是我认为你在谈论的内容。所以对于每个“消息”,每个被视为“主数据”的“属性”只存在一次,对吗?一个单独的“产品”,位于一个单独的“类别”,由一个单独的“制造商”制造。基本上,每个“消息”都是一个SKU,对吗?如果您按供应商获取数据,为什么要将其按SKU发送到DB?为什么不在供应商发送时仅发送每组SKU?顺便说一句,这与我多年工作并在我的答案中描述的复杂同步系统几乎相同;-) - Solomon Rutzky
@Pragmatic 另外:如果多个供应商发送 SKU 信息,当他们的数据可能存在错误时,如何确定 SKU/产品在供应商之间是相同的?制造商、类别等拼写差异如何处理?另外,如果产品列表大多保持一致,而价格列表经常更改,则这与最初提出的 SqlBulkCopy 的问题略有不同;真正需要解决的问题是同步价格列表,因为这不是一个简单的插入操作:您需要更新大部分但插入一些。 - Solomon Rutzky
您说得对Vlidirmir,sqlBulkCopy仅适用于完全准备好的数据。在这里再添加一条注释。如果我们使用TVP,我们将不得不像产品名称,类别名称等一样向数据库发送大量(重复的)数据。每个消息都需要映射消息和主数据之间的关系,因此无法从c#发送不同的值到数据库。 - Pragmatic
@srutzky,非常感谢您的帮助。我们尝试了第一种方法(即在c#代码中缓存主数据id)。这些id是通过调用不同的存储过程,并传递主数据TVPs来获取的。现在我们将尝试第二种方法,即将所有TVP(主数据和TVP数据)一起传递到数据库中,并在那里解析Ids(由您建议)。我们会向您更新结果。 - Pragmatic
显示剩余9条评论

0
从数据库的角度来看,没有比BULK INSERT(例如从csv文件中)更快的东西了。最好的方法是尽快批量处理所有数据,然后使用存储过程进行处理。
C#层只会减慢进程,因为C#和SQL之间的所有查询都比Sql-Server直接处理的速度慢数千倍。

但是,如果在C#中进行一些小的预处理,可以直接批量复制表格,而无需创建临时表并处理它,那么您的C#代码可能会受到欢迎,因此情况并不完全清晰。 (顺便说一下,使用C#和SqlBulkCopy肯定会击败中间CSV文件。) - Jeroen Mostert
@Sharped,我们无法移除C#层。C#用于向用户公开终端点。 - Pragmatic
“首先从我们的C#代码中解析主ID(如果找不到则插入主数据)”意味着在C#和SQL之间进行查询。存在网络延迟、查询处理和C#代码等问题... 如果您要处理2000条消息/秒,那么您将会遇到问题。因此,最好的方法是直接将所有数据加载到数据库中(使用SqlBulkCopy或BULK INSERT,如果您有CSV格式的数据),然后直接在SQL中实现所需的所有逻辑。没有更快的选项。 - rducom
只想再添加一件事。一旦客户在本地启动了一个进程,他会一段时间内使用相同的主数据发送消息。这意味着,如果我们可以缓存主数据(ID和名称),我们就不必一遍又一遍地解析这些ID。 - Pragmatic
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Jeroen Mostert
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接