基于数据库队列的MySQL存储过程弹出操作

3

我们有一个系统,它使用基于数据库的队列来处理项目,而不是实时处理。目前是在Mybatis中实现的,调用MySQL中的此存储过程:

DROP PROCEDURE IF EXISTS pop_invoice_queue;
DELIMITER ;;
CREATE PROCEDURE pop_invoice_queue(IN compId int(11), IN limitRet int(11)) BEGIN

   SELECT LAST_INSERT_ID(id) as value, InvoiceQueue.* FROM InvoiceQueue 
      WHERE companyid = compId 
      AND (lastPopDate is null OR lastPopDate < DATE_SUB(NOW(), INTERVAL 3 MINUTE)) LIMIT limitRet FOR UPDATE;
   UPDATE InvoiceQueue SET lastPopDate=NOW() WHERE id=LAST_INSERT_ID(); 

END;;

DELIMITER ;

问题在于该存储过程从队列中弹出了N个项目,但只更新了最后一个弹出的项目的lastPopDate值。因此,如果我们使用limitRet = 5调用此存储过程,它将从队列中弹出五个项目并开始处理它们,但只有第五个项目会设置lastPopDate,因此当下一个线程来弹出队列时,它将获取项目1-4和6。
如何才能更新所有从数据库“弹出”的N条记录?
2个回答

2

如果您想通过以下方式向表中添加一个BIGINT字段:

ALTER TABLE InvoiceQueue
ADD uuid BIGINT NULL DEFAULT NULL,
INDEX ix_uuid (uuid);

您可以先执行更新操作,然后通过以下方式选择已更新的记录:

CREATE PROCEDURE pop_invoice_queue(IN compId int(11), IN limitRet int(11))
BEGIN
   SET @uuid = UUID_SHORT();

   UPDATE InvoiceQueue
   SET    uuid = @uuid,
          lastPopDate = NOW()
   WHERE  companyid = compId
   AND    uuid IS NULL 
   AND    (lastPopDate IS NULL OR lastPopDate < NOW() - INTERVAL 3 MINUTE)
   ORDER BY
          id
   LIMIT  limitRet;

   SELECT * 
   FROM   InvoiceQueue 
   WHERE  uuid = @uuid
   FOR    UPDATE;
END;;
UUID_SHORT()函数为返回唯一值,每台机器每秒最多只能调用1600万次,以保证其独特性。请参考此处了解更多细节。
为了提高查询性能,您可能需要将lastPopDate字段更改为NOT NULL,因为OR子句会导致查询未使用索引,即使有一个可用的索引:
ALTER TABLE InvoiceQueue
MODIFY lastPopDate DATETIME NOT NULL DEFAULT '0000-00-00';

然后,如果您还没有这样的索引,则可以按照以下方式在companyid/lastPopDate/uuid字段上添加一个索引:

ALTER TABLE InvoiceQueue
ADD INDEX ix_company_lastpop (companyid, lastPopDate, uuid);

然后您可以从您的UPDATE查询中删除OR子句:
   UPDATE InvoiceQueue
   SET    uuid = @uuid,
          lastPopDate = NOW()
   WHERE  companyid = compId 
   AND    lastPopDate < NOW() - INTERVAL 3 MINUTE
   ORDER BY
          id
   LIMIT  limitRet;

这将使用您刚创建的索引。


这是一个非常完整的答案,谢谢您抽出时间来回答。我将会实现与此非常接近的方案并进行测试。有一件事让我有点担心,即更新和选择之间似乎存在一个小的竞态条件,因为记录没有被锁定为“FOR UPDATE”。这几乎肯定比我们现在面临的问题要小得多。 - kasdega
大部分情况下,我们使用的是这个。我们在使用UUID_SHORT()时遇到了一些小问题(不支持在MySQL中),所以我们使用了UUID()和varchar,并且我们在guid列上明确设置了排序规则来解决比较和排序的问题。除此之外,它已经实现并且正常工作。谢谢。 - kasdega

0

由于MySQL既没有集合也没有输出/返回子句,我的建议是使用临时表。类似这样:

CREATE TEMPORARY TABLE temp_data 
SELECT LAST_INSERT_ID(id) as value, InvoiceQueue.* FROM InvoiceQueue 
  WHERE companyid = compId 
  AND (lastPopDate is null OR lastPopDate < DATE_SUB(NOW(), INTERVAL 3 MINUTE)) LIMIT limitRet FOR UPDATE;  
UPDATE InvoiceQueue 
INNER JOIN temp_data ON (InvoiceQueue.PKColumn = temp_data.PKColumn)
SET lastPopDate=NOW();
SELECT * FROM temp_data ;
DROP TEMPORARY TABLE temp_data;

此外,我猜测这样的select ... for update可能会导致死锁(特别是在不同的会话中调用该过程时)- 据我所知,锁定行的顺序不能保证(即使您使用了order by,行也可能以不同的顺序被锁定)。我建议再次检查文档。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接