使用数据库表作为队列

56

我希望使用数据库表作为队列。我想按照插入的顺序(FIFO)在其中插入元素并从中取出元素。我的主要考虑因素是性能,因为每秒钟会有成千上万个此类事务。因此,我希望使用 SQL 查询来获取第一个元素,而无需搜索整个表。当我读取数据时,不会删除行。 使用 SELECT TOP 1 ….. 在这里有帮助吗? 我应该使用任何特殊的索引吗?


2
请参阅此文章,了解如何在SQL Server中实现队列的良好描述:http://www.mssqltips.com/sqlservertip/1257/processing-data-queues-in-sql-server-with-readpast-and-updlock/ - Matthew Murdoch
1
使用基于行的逻辑,例如在SQL Server中处理队列,是对资源的严重滥用。请使用SQL进行基于集合的逻辑处理。 - Erik Bergstedt
9个回答

48

我会使用一个IDENTITY字段作为主键,为每个排队的项目提供唯一递增的ID,并在其上放置聚集索引。这将表示排队项目的顺序。

为了在处理它们时保留项目在队列表中,您需要一个“状态”字段来指示特定项目的当前状态(例如,0=等待,1=正在处理,2=已处理)。这是为了防止对项目进行两次处理所必需的。

在处理队列中的项目时,您需要找到表中下一个目前未被处理的项目。这需要以一种方式进行,以避免多个进程同时选择相同的项目进行处理,如下所示。请注意,在实现队列时,需要了解UPDLOCK和READPAST这些表提示

例如,在sproc中,可以像这样:

DECLARE @NextID INTEGER

BEGIN TRANSACTION

-- Find the next queued item that is waiting to be processed
SELECT TOP 1 @NextID = ID
FROM MyQueueTable WITH (UPDLOCK, READPAST)
WHERE StateField = 0
ORDER BY ID ASC

-- if we've found one, mark it as being processed
IF @NextId IS NOT NULL
    UPDATE MyQueueTable SET Status = 1 WHERE ID = @NextId

COMMIT TRANSACTION

-- If we've got an item from the queue, return to whatever is going to process it
IF @NextId IS NOT NULL
    SELECT * FROM MyQueueTable WHERE ID = @NextID

如果处理一个项目失败,你想要之后能够再次尝试处理吗?如果是这样的话,你将需要将状态重置回0或其他值。这需要更多的思考。

或者,不要使用数据库表作为队列,而是使用像 MSMQ 这样的东西 - 只是我随便提出来的一个想法!


为什么我应该将select id与select *分开? - Shayan
你不必这样做,你可以在第一个SELECT的同时将所有需要的值加载到变量中,然后在最后返回它们。另外,我之前为了简单起见使用了"SELECT *",你只需返回实际需要的字段即可。 - AdaTheDev
我想将进程字段保留在另一个表中,并使用该表的外键来最小化程序不同部分的锁定效果。这种方法有帮助吗?我应该使用什么样的索引? - Shayan
2
您可以将队列表格仅用作排队机制,并将有关要处理的详细信息存储在与中央队列表不同的相关表中。如果要在处理过程中更新拆分出的字段,则该方法可以很好地工作。如果队列中有不同类型(模式)的消息,这种方法也很好用。 - AdaTheDev

9
如果您不删除已处理的行,则需要某种指示行已被处理的标志。
在该标志上放置索引,并在要排序的列上放置索引。
通过该标志对表进行分区,以便出队事务不会阻塞查询。
如果您每秒真的获得了1,000条消息,那么每天将产生8,640万行。您可能需要考虑一些清理旧行的方法。

通过“flag”,我指的是一些列来记住,如果一行已经被客户端处理过了。 - Peter Lang
我相信他的意思是你可以向你的表格中添加一列 - 也许是Dequeued - 来保存每个事务的状态。由于你不会在出队后删除行,所以你应该知道要忽略哪些事务。你可以将其设置为一个位字段,0表示排队,1表示出队。 - Waleed Al-Balooshi
然后根据该字段对表进行分区,以便出列的事务不会阻塞您的查询。 - David Schmitt
@David Schmitt:我把你的话放到我的回答中,因为我找不到更好的词语。希望你不介意... - Peter Lang
“什么是标志(flag)”这个问题完全取决于上下文。在关系数据库设计的背景下,“flag”是一个四字词。 - Craig Tullis

6

一切都取决于您的数据库引擎/实现。

对我来说,简单的队列在具有以下列的表上:

id / task / priority / date_added

通常有效。

我使用优先级和任务来分组任务,在出现重复任务的情况下,我选择优先级更高的那个。

不用担心 - 对于现代数据库来说,“数千条”并不算特别多。


这些是什么? 我使用 SQL Server 2008。 - Shayan

3
只要使用某种方法跟踪插入的日期时间,这将不会有任何问题。请参见此处的mysql options。问题是您是否只需要最近提交的项目或者需要迭代。如果需要迭代,则需要使用带有ORDER BY语句的块,循环并记住上一个日期时间,以便在获取下一个块时使用。

2
也许在您的选择语句中添加 LIMIT=1 可以帮助您... 强制在单个匹配后返回...

TOP 1 有什么不同? - Shayan
我知道SQL Server使用TOP 1与Postgres中的LIMIT 1是相同的。我想其他所有供应商都会接受其中之一。 - Matt
1
老实说,我没意识到它们是等效的...我从来没有使用过TOP语法,只用LIMIT...这就是为什么我喜欢StackOverflow:即使在提供答案时,我也会学到新的东西。 - Reed Debaets

2
创建一个基于日期(或自增)列的聚集索引。这将使表中的行大致保持索引顺序,并允许在对索引列进行ORDER BY时快速基于索引访问。然后,使用TOP X(或LIMIT X,具体取决于你的RDMBS)仅检索索引中的前x项。
性能警告:你应该始终查看查询的执行计划(在真实数据上),以验证优化器不会做出意外的操作。同时,尝试在真实数据上测试你的查询,以便能够做出明智的决策。

2

由于您不从表中删除记录,因此需要在(processed, id)上创建一个组合索引,其中processed是指示当前记录是否已处理的列。

最好的方法是为记录创建一个分区表,并将PROCESSED字段设置为分区键。这样,您可以保留三个或更多本地索引。

然而,如果您总是按照id顺序处理记录,并且只有两种状态,则更新记录意味着只需将记录从索引的第一个叶子节点取出并附加到最后一个叶子节点。

当前处理的记录始终具有所有未处理记录中最小的id和所有已处理记录中最大的id


我想将进程字段存储在另一张表中,并使用该表的外键来最小化程序不同部分之间的锁定效应。 - Shayan
4
“@Shayan”,这将严重影响您的查询性能。而且在处理时,您需要锁定该字段。 - Quassnoi

1
我有一个问题,“如何将表格转换为队列”,但是无论在哪里都找不到所需答案。
以下是我为Node/SQLite/better-sqlite3提出的解决方案。基本上只需根据您的用例修改内部的WHEREORDER BY子句即可。
module.exports.pickBatchInstructions = (db, batchSize) => {
  const buf = crypto.randomBytes(8); // Create a unique batch identifier

  const q_pickBatch = `
    UPDATE
      instructions
    SET
      status = '${status.INSTRUCTION_INPROGRESS}',  
      run_id = '${buf.toString("hex")}',
      mdate = datetime(datetime(), 'localtime')
    WHERE
      id IN (SELECT id 
        FROM instructions 
        WHERE 
          status is not '${status.INSTRUCTION_COMPLETE}'
          and run_id is null
        ORDER BY
          length(targetpath), id
        LIMIT ${batchSize});
  `;
  db.run(q_pickBatch); // Change the status and set the run id

  const q_getInstructions = `
    SELECT
      *
    FROM
      instructions
    WHERE
      run_id = '${buf.toString("hex")}'
  `;
  const rows = db.all(q_getInstructions); // Get all rows with this batch id

  return rows;
};

0

针对此问题,一个非常简单的解决方案是使用更改跟踪机制(而不是数据捕获),以避免事务、锁等。它利用版本控制来跟踪每个添加/更新/删除的行,因此您可以在特定版本之后跟踪发生的更改。

因此,您需要持久化最新版本并查询新的更改。

如果查询失败,您可以随时返回并查询上一个版本的数据。 此外,如果您不想通过一次查询获取所有更改,您可以按照最新版本的顺序获取前n个,并存储最大版本ID,以便下次再次查询。

例如,请参见在SQL Server 2008中使用更改跟踪


改变跟踪如何帮助您将数据库表用作队列?在队列中,您想获取下一个可用任务(按FIFO顺序),该任务尚未被处理,并确保该项仅被处理一次。更改跟踪解决了完全不同的问题-自上次查询以来哪些行已更改。我没有看到联系。 - Brian Rogers
好观点Brian,你是对的。我提出了更改跟踪,以便根本不需要表队列。这就是我的观点。某人可以使用更改跟踪机制直接从源表中获取更改,而无需使用触发器(可能)或其他东西来填充队列,只要他想跟踪更改即可。感谢您的评论。 - George Mavritsakis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接