将DB表作为作业队列(又称批量队列或消息队列)的最佳方法

25

我有一张包含约50K行数据的数据库表,每行代表一个需要完成的任务。我有一个程序从数据库中提取一个任务,执行该任务并将结果放回数据库。(这个系统正在运行)

现在我想允许多个处理任务来完成工作,但要确保没有任务被重复完成(这是出于性能方面的考虑,而不是会造成其他问题)。由于访问是通过存储过程进行的,因此我的当前想法是将该存储过程替换为类似以下内容的东西:

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

顺便提一句,工作者的任务在获得工作和提交结果之间可能会丢失连接。此外,除非我搞砸了那部分(每分钟约5个作业),否则我不认为DB会成为瓶颈。

这样做有什么问题吗?有更好的方法吗?

注意:“将数据库用作IPC反模式” 在这里只是略微适用,因为

  1. 我没有进行IPC(没有进程生成行,它们现在都已经存在)
  2. 该反模式所描述的主要抱怨是它会导致DB上不必要的负载,因为进程等待消息(在我的情况下,如果没有消息,所有内容都可以关闭,因为所有内容都完成了)

正确 - 错误 = 在 dbms SELECT 读取期间使用阻塞的同步 IPC。你可能正在采用这种策略来引入异步操作。 - dkretz
顺便提一下,如果你想让读者在计时器上运行,那么不经常检查是很有用的,但如果他们找到了工作,他们可以在再次休眠之前排空队列。 - dkretz
注意我的编辑:如果他们找不到工作,他们将永远找不到工作。但如果这不是真的... - BCS
6个回答

38
在关系型数据库系统中实现作业队列的最佳方法是使用SKIP LOCKEDSKIP LOCKED是一种锁获取选项,适用于读/共享(FOR SHARE)或写/独占(FOR UPDATE)锁,并且现在得到了广泛支持:
  • Oracle 10g及更高版本
  • PostgreSQL 9.5及更高版本
  • SQL Server 2005及更高版本
  • MySQL 8.0及更高版本
现在,考虑我们有以下post表:

post table

status列被用作一个Enum,它的值为:

  • PENDING (0),
  • APPROVED (1),
  • SPAM (2)。

如果我们有多个并发用户尝试审核post记录,我们需要一种协调他们努力的方式,以避免两个审核员审查同一行post

所以,SKIP LOCKED正是我们需要的。如果两个并发用户Alice和Bob执行以下SELECT查询,这些查询在独占锁定post记录的同时添加了SKIP LOCKED选项:

[Alice]:
SELECT
    p.id AS id1_0_,1
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

我们可以看到,Alice可以选择前两条记录,而Bob选择接下来的两条记录。如果没有使用SKIP LOCKED,Bob获取锁的请求将会被阻塞,直到Alice释放对前两条记录的锁定。

14

以下是我过去成功使用的内容:

MsgQueue表架构

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

您的消息类型符合预期 - 消息符合插入进程和读取进程之间的契约,使用XML或您选择的其他表示形式进行结构化(例如,在某些情况下,JSON是方便的)。

然后0到n个进程可以插入,0到n个进程可以读取和处理消息,每个读取进程通常处理单个消息类型。同一进程类型的多个实例可以运行以进行负载平衡。

读取器拉取一条消息并将状态更改为"A"ctive,同时对其进行处理。完成后,它将状态更改为"C"omplete。根据是否想要保留审计跟踪,它可以删除消息或不删除。状态为'N'的消息按MsgType/Timestamp顺序提取,因此在MsgType + State + CreateTime上有一个索引。

变种:
"E"rror状态。
Reader进程代码的列。
状态转换的时间戳。

这为执行诸如您描述的许多操作提供了一个不错的、可扩展的、可见的、简单的机制。如果您对数据库有基本的了解,它就非常稳定和可扩展。


来自评论的代码:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN

我感兴趣的是描述为“读取器提取一条消息并将状态更改为'A'ctive,同时对其进行处理”的部分。你如何实现这个部分?(除此之外,看起来我的与你的相同,只不过在我的情况下不需要那些不需要的东西。) - BCS
对,这需要在BEGIN TRAN和COMMIT TRAN之间使用多个SQL语句。紧接着是用于提取下一条消息的存储过程 - 经过了一些修改,我省略了错误捕获,因为它是在TRY/CATCH之前编写的。 - dkretz
1
创建存储过程:GetMessage @MsgType VARCHAR(8) ) ASDECLARE @MsgId INTBEGIN TRANSELECT TOP 1 @MsgId = MsgId FROM
MsgQueue WHERE MessageType = @pMessageType AND State = 'N' ORDER BY CreateTime
- dkretz
1
如果 @MsgId 不为空
则执行以下操作:
更新 MsgQueue 表中 MsgId = @MsgId 的记录的 State 字段为 'A'。 查询 MsgQueue 表中 MsgId = @MsgId 的记录,返回 MsgId 和 Msg 字段。 否则
返回 MsgId 和 Msg 字段均为空。 提交事务。
- dkretz
如果我需要同时选择多行记录怎么办? 是否可以同时更新所有选中的记录? - Amitd
假设您使用公共时间戳或选择批次ID进行标记,您可以在单个语句中更新它们。或者使用上面描述的“A”状态,并在状态为“A”的情况下进行更新。 - dkretz

0

当某物没有拥有者时,你应该将其设置为一个虚假的无人记录,而不是 owner = null。搜索 null 不会限制索引,你可能最终会得到一张表扫描(此适用于 Oracle,SQL Server 可能有所不同)。


0

作为可能的技术变化,您可以考虑使用MSMQ或类似的东西。

您的每个作业/线程都可以查询消息队列,以查看是否有新作业可用。因为读取消息的操作会将其从堆栈中移除,所以您可以确保只有一个作业/线程会收到该消息。

当然,这是在假设您正在使用Microsoft平台的情况下。


我已经在数据库中有了数据,完成后仍需要将数据保存在数据库中。在我的情况下,我认为没有必要向系统添加另一个组件。(顺便说一下,http://www.microsoft.com/windowsserver2003/technologies/msmq/default.mspx) - BCS

0

请参考Vlad的答案以了解上下文,我只是添加了Oracle中的等效内容,因为有一些需要注意的“陷阱”。

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

不会直接按您的预期将其翻译为Oracle。如果我们考虑几种翻译选项,我们可以尝试以下任何一种:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

或者尝试回退到使用 ROWNUM 选项

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

你不会得到任何乐趣。因此,你需要自己控制获取“n”行的过程。因此,你可以编写如下代码:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.

-6

你正在尝试实现“数据库作为IPC”的反模式。查阅相关资料以了解为什么应该考虑重新设计你的软件。


1
在这种情况下,你怎么知道这是反模式,或者软件设计不当?你完全没有任何上下文可以基于这个评论。 - Greg Beech
我将其称为异步IPC的有用模式。您可以将其配置为像任何普通消息队列一样运作,根据我的经验它们不被标记为“反模式”。 - dkretz
1
这里提到了反模式的参考 - http://tripatlas.com/Database_as_an_IPC 不同之处在于,我们讨论的是将数据库用作消息队列,而不是进程相互操作的机制。 - dkretz
2
使用数据库作为消息队列是一种反模式。你会遇到大量的锁争用问题,如果你正在使用带有多个工作进程的MVCC系统,你最终会得到任何记录的模糊状态。你应该使用像RabbitMQ这样的消息队列代理。 - jasonjwwilliams
@jasonjwwilliams 不完全是这样,因为使用消息代理意味着你需要处理分布式事务 - 这就是为什么人们使用出箱模式,因此数据库。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接