作业队列作为具有多个消费者的SQL表(PostgreSQL)

43

我有一个典型的生产者-消费者问题:

多个生产者应用程序将作业请求写入PostgreSQL数据库上的作业表。

作业请求具有一个状态字段,在创建时包含QUEUED。

存在多个消费者应用程序,当生产者插入新记录时,它们会收到通知:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";
他们将尝试通过将其状态设置为RESERVED来保留新记录。当然,只有一个消费者应该成功。所有其他消费者都不应能够预订相同的记录。他们应该预订状态为QUEUED的其他记录。
示例: 某些生产者向表jobrecord添加了以下记录:
id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

现在,有两个消费者AB想要处理它们。他们同时开始运行。一个应该保留id 1,另一个应该保留id 2,然后完成得最快的那个应该保留id 3,以此类推。

在一个纯多线程的世界中,我会使用互斥锁来控制对作业队列的访问,但是这些消费者是不同的进程,可能在不同的机器上运行。他们只能访问同一数据库,因此所有同步必须通过数据库进行。

我阅读了很多关于PostgreSQL并发访问和锁定的文档,例如http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html在Postgresql中选择未锁定的行PostgreSQL和锁定

从这些主题中,我学到了以下SQL语句应该能够满足我的需求:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

很不幸,当我在多个消费者进程中运行此代码时,大约有50%的时间它们仍然会保留相同的记录,从而处理并覆盖另一个进程所做的更改。

我错过了什么?我该如何编写SQL语句才能使多个消费者不会保留相同的记录?


你是在一个事务中执行这个操作,对吧?需要用 BEGIN 和 COMMIT 吗? - regilero
不,不需要使用BEGIN和COMMIT。我尝试在事务中执行,但是什么也没有发生,我的消费者没有处理任何内容,表格保持不变。我应该使用事务吗? - code_talker
看起来,交易问题出在我在Qt中编写的消费者应用程序中使用它的方式上。 - code_talker
好的,我看到你下面的回答了,也许你可以编辑一下,加上最终可行的代码,因为这是一个相当常见的问题。 - regilero
如果你在事务中执行了这个操作,但是什么都没有发生,我的猜测是你没有提交,因为在这种情况下事务会回滚。 - apinstein
7个回答

40
我也使用Postgres作为FIFO队列。最初我使用了ACCESS EXCLUSIVE锁,它在高并发情况下可以产生正确的结果,但不幸的是,与pg_dump互斥,后者在执行期间获取ACCESS SHARE锁。这导致我的next()函数锁定了很长时间(pg_dump的持续时间)。由于我们是一家24x7的商店,客户不喜欢在半夜看到队列中出现死时间,所以这是不可接受的。
我想必须有一种更少限制的锁,既能保证并发安全,又不会在pg_dump运行时锁定。我的搜索引导我找到了这篇SO文章。
然后我进行了一些研究。
以下模式足以满足FIFO队列NEXT()函数的要求,该函数将作业的状态从“queued”更新为“running”,无需担心并发失败,并且还不会阻塞pg_dump:
SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

查询:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

结果看起来像:

UPDATE 1
 job_id
--------
     98
(1 row)

这里有一个 shell 脚本,可以在高并发(30)下测试所有不同的锁定模式。

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

如果您想编辑,代码也在这里: https://gist.github.com/1083936

我正在更新我的应用程序,以使用EXCLUSIVE模式,因为它是最严格的模式,既正确又不会与pg_dump冲突。我选择了最严格的模式,因为相比于从ACCESS EXCLUSIVE更改应用程序而言,这似乎是风险最小的,尤其是对于那些不是Postgres锁定方面专家的人。

我对我的测试环境和答案背后的一般思路感到非常满意。我希望分享这些可以帮助其他人解决这个问题。


我对这个解决方案进行了大量测试,并且百分之百确信这是正确的方法。我已经将其部署到我的生产系统中(使用EXCLUSIVE模式进行出队)。祝使用愉快! - apinstein
2
不应该需要表锁,只需要行锁:https://dev59.com/KGw15IYBdhLWcg3wmc8J#30315387 - mackross

24

不需要对整个表进行锁定:使用for update创建的行锁定可以正常工作。

请参见https://gist.github.com/mackross/a49b72ad8d24f7cefc32,了解我对apinstein答案所做的更改,并验证其仍然有效。

最终代码是:

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;

这个解决方案明确地移除了事务。为什么? - Yitz
pg在语句周围隐式添加了一个事务。 - mackross
3
@mackross 最好使用for update skip locked而不是仅使用for update,因为它不会阻塞其他并发读取者。也许你没有使用它的原因是在2015年时skip locked语句还不可用。 - Frederick The Fool

8

6

1
谢谢您,添加表锁定 LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;(并在Qt中修复我的事务语句)是解决方案。现在,它正在按预期工作。谢谢! - code_talker
1
ACCESS EXCLUSIVE可以使用,但它会带来严重的性能副作用。特别是,在pg_dump期间,它将被阻塞,该过程使用ACCESS SHARE模式。这意味着您的队列将在备份期间被阻塞,对于我们而言可能需要几个小时。 - apinstein

2

-1

好的,这是对我有效的解决方案,基于jordani提供的链接。由于我的一些问题涉及Qt-SQL的工作方式,因此我包含了Qt代码:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

为了检查是否有多个消费者处理同一个作业,我添加了一条规则和一个日志表:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

没有 LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; 语句,日志表偶尔会填满条目,其中一个消费者已经覆盖了另一个消费者的值,但使用 LOCK 语句,日志表保持为空 :-)


-2

不要重复造轮子,可以看看PgQ


2
我已经检查过PgQ了,但不幸的是,它会将相同的事件发送给所有消费者:您可以在同一事件队列上拥有任意数量的消费者,但他们都将看到相同的事件而不是共享工作负载。这正是我不想要发生的事情。我本来更愿意使用现有的解决方案,但找不到一个能满足我的要求。 - code_talker
啊,我会使用PgQ,然后让每个消费者使用“UPDATE ... RETURNING”语句在不同的表中注册工作锁。这是一个值得思考的想法。 - Sean

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接