使用SQL Server作为具有多个客户端的数据库队列

43

给定一个充当队列的表,如何最好地配置表/查询,以便多个客户端可以同时从队列中处理?

例如,下面的表格表示工作人员必须处理的命令。 当工作人员完成后,它将将processed值设置为true。

| ID | COMMAND | PROCESSED |
|  1 | ...     | true      |
|  2 | ...     | false     |
|  3 | ...     | false     |

客户端可能会这样获取一个要操作的命令:

select top 1 COMMAND 
from EXAMPLE_TABLE 
with (UPDLOCK, ROWLOCK) 
where PROCESSED=false;

然而,如果有多个工作进程,每个进程都试图获取ID为2的行。只有第一个会获得悲观锁,其余的将等待。然后其中一个进程将获取第三行,依此类推。

哪种查询/配置可以让每个工作进程分别获得不同的行并同时处理它们?

编辑:

有几个答案建议变体使用表本身记录正在进行的状态。我认为这在单个事务中是不可能的。(即,如果没有其他工作者在提交事务之前看到它,更新状态有什么意义呢?)也许建议是:

# start transaction
update to 'processing'
# end transaction
# start transaction
process the command
update to 'processed'
# end transaction

人们通常是这样处理这个问题的吗?我觉得如果可能的话,问题最好由数据库来处理。


3
请指出原始来源,因为SO没有推荐重复内容。 - Synesso
2
为什么要费尽心思重新创建所有这些功能,当它们已经以 Microsoft Message Queueing (MSMQ) 的形式可用于任何 Windows 服务器上?使用现有的资源-不要不断地重复发明轮子! - marc_s
3
我不同意marc_s的观点,MSMQ给系统管理员和开发人员带来了重大的复杂性。由于无法处理其复杂性,百万美元项目已经失败。除非有充分的理由,否则不应该引入MSMQ。 - Andomar
9
将你的排队操作分离到一个不同的事务管理器中,意味着每个操作都需要进行两阶段提交来协调 SQL-MSMQ(即每秒几十/低百个操作与单阶段提交的数以万计的操作相比)。将消息存储在 MSMQ 中,状态存储在数据库中意味着你根本无法进行一致的备份。使用 MSMQ 会导致消息查询性能下降。最后,MSMQ 每个存储的限制为2GB,这在今天非常小,你很容易就会用完 MSMQ 存储空间,此时它就会翻转并死亡。 - Remus Rusanu
@Remus Rusanu:感谢您提供这些有趣的见解——这些确实是 MSMQ 的一些严重缺点...... - marc_s
谢谢。这是一个供应商系统。它已经将数据库用作队列,我只是试图使其并发。我们的应用程序中已经有IBM MQSeries,但由于现有系统设计的限制,无法将其用于处理这些消息。 - Synesso
7个回答

71

我建议你参考使用表作为队列。正确实现的队列可以处理数千个并发用户,并支持每分钟高达1/2百万的入队/出队操作。在SQL Server 2005之前,解决方案很麻烦,需要在单个事务中混合使用SELECTUPDATE,并给出恰当的锁提示,如gbn所链接的文章中所述。幸运的是,自从SQL Server 2005引入了OUTPUT子句以来,就有了一种更加优雅的解决方案,MSDN现在建议使用OUTPUT子句

您可以在将表用作队列或保存中间结果集的应用程序中使用OUTPUT。也就是说,应用程序不断地向表中添加或删除行。

基本上,为了使这种方法高度并发工作,您需要正确解决以下三个问题:

  1. 您需要自动出队。您必须找到行,跳过任何锁定行,并在一个单一的原子操作中将其标记为“已出队列”,而这就是OUTPUT子句发挥作用的地方:
    with CTE as (
      SELECT TOP(1) COMMAND, PROCESSED
      FROM TABLE WITH (READPAST)
      WHERE PROCESSED = 0)
    UPDATE CTE
      SET PROCESSED = 1
      OUTPUT INSERTED.*;
  1. 必须将表结构设计为最左侧的聚集索引键位于PROCESSED列上。如果ID被用作主键,则将其移至聚集键中的第二个列。对于是否在ID列上保留非聚集键的争论仍在继续,但我强烈建议不要在队列上使用任何次要的非聚集索引:
    CREATE CLUSTERED INDEX cdxTable on TABLE(PROCESSED, ID);
  1. 你必须只能通过Dequeue来查询这个表,不要尝试使用Peek操作或将该表同时用作队列 存储,否则很可能会导致死锁并大大降低吞吐量。

原子出队、使用READPAST提示搜索要出队的元素以及在处理比特上基于聚集索引的最左侧键的组合,在高度并发的负载下保证非常高的吞吐量。


1
那篇关于“使用表格作为队列”的文章对于我们这些需要实现不适用于服务代理的队列(例如挂起队列)的人来说是纯金。感谢你,伙计。 - Nick Chammas
@RemusRusanu 两个问题 - 为什么要使用CTE,我能否不使用Top(1)来获取所有待处理项? - Uri Abramson
1
正确实现的队列可以处理数千个并发用户,并且每分钟可以处理高达1/2百万个入队/出队操作。这个声明有来源吗? - Dan Ling
@RemusRusanu 老帖子了,但我想知道为什么这里没有推荐 Service broker queue 作为替代方案,因为它不适合并发处理? - ibubi
@RemusRusanu 在 Where 子句中如何处理多个条件? - Paul
显示剩余6条评论

9

锁定提示对于保护选择和更新之间的更改非常有用。如果您使用单个更新语句弹出队列,则不需要或有用。 - Andomar
@Andomar:需要的是这个:读者和写者之间100%安全的并发性... - gbn

1
如果您想为多个客户端序列化操作,可以简单地使用应用程序锁。
BEGIN TRANSACTION

EXEC  sp_getapplock @resource = 'app_token', @lockMode = 'Exclusive'

-- perform operation

EXEC  sp_releaseapplock @resource = 'app_token'

COMMIT TRANSACTION

1

我建议不要在表中使用锁。只需创建两个额外的列,如IsProcessing(位/布尔)和ProcessingStarted(日期时间)。当工作进程崩溃或超时后未更新其行时,您可以让另一个工作进程尝试处理数据。


谢谢。请看我的编辑。这个解决方案需要在主事务之外进行初始更新吗? - Synesso
为什么要使用事务? - ZippyV

0

不必使用布尔值来表示已处理,而可以使用整数来定义命令的状态:

1 = not processed
2 = in progress
3 = complete

每个工作进程将获得下一行 Processed = 1 的数据,将 Processed 更新为2并开始工作。当工作完成时,Processed 将更新为3。这种方法还允许扩展其他处理结果,例如,除了定义工作者已经完成之外,您可以添加新的状态如“成功完成”和“带错误完成”。

谢谢。请看我的编辑。你的建议我理解错了吗? - Synesso
你是正确的,你需要单独的事务来让其他工作人员看到更新,这应该是默认行为 - 为什么在工作人员处理命令时要保持事务开放?我可以看出工作人员本身本质上就是一个事务,但是自己编写代码几乎肯定比使用Sql Server事务更好。 - Macros
@宏定义 有什么建议,如何使用它来处理“peek lock”? - Nuri YILMAZ

0

可能更好的选择是使用一个三态处理列和一个版本/时间戳列。处理列中的三个值将指示行是否正在处理、已处理或未处理。

例如:

    CREATE TABLE Queue ID INT NOT NULL PRIMARY KEY,
    Command NVARCHAR(100), 
    Processed INT NOT NULL CHECK (Processed in (0,1,2) ), 
    Version timestamp)

你获取顶部的1个未处理行,将状态设置为正在处理,并在完成后将状态设置回已处理。根据版本和主键列更新状态。如果更新失败,则表示已经有人在处理。

你可能还想添加客户端标识符,以便如果客户端在处理时死机,它可以重新启动,查看最后一行,然后从上次中断的地方开始。


谢谢。请看我的编辑。另外,为了持续的可用性,我希望任何可用的客户端都能恢复失败的作业 - 而不仅仅是失败的那个。 - Synesso

0

一种方法是使用单个更新语句标记行。 如果在where子句中读取状态并在set子句中更改状态,则没有其他进程可以在其间插入,因为该行将被锁定。 例如:

declare @pickup_id int
set @pickup_id = 1

set rowcount 1

update  YourTable
set     status = 'picked up'
,       @pickup_id = id
where   status = 'new'

set rowcount 0

return @pickup_id

这里使用 rowcount 来更新至多一行。如果没有找到任何行,则 @pickup_id 将会是 -1


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接