在Postgresql中选择未锁定的行

50

有没有一种方法可以在PostgreSQL中选择未锁定的行? 我有一个多线程应用程序将执行以下操作:

Select... order by id desc limit 1 for update

在一个表上执行此查询时,如果有多个线程运行,则它们都尝试获取同一行。

其中一个获取了行锁,另一个被阻塞,然后在第一个更新该行后失败。我真正想要的是第二个线程获取第一个匹配WHERE子句且尚未锁定的行。

澄清一下,我希望每个线程在执行选择操作后立即更新第一行可用行。

因此,如果有ID为1、2、3、4的行,则第一个线程将进入,选择具有ID=4的行并立即更新它。

如果在该事务期间出现第二个线程,则我希望它获取具有ID=3的行并立即更新该行。

由于WHERE子句会匹配已锁定的行(例如我的示例中的ID = 4),因此使用Share或nowait将无法实现这一点。基本上,我想要的是在WHERE子句中添加"AND NOT LOCKED"之类的内容。

Users

-----------------------------------------
ID        | Name       |      flags
-----------------------------------------
1         |  bob       |        0
2         |  fred      |        1
3         |  tom       |        0
4         |  ed        |        0
如果查询是 "Select ID from users where flags = 0 order by ID desc limit 1",当返回一行时,下一步是 "Update Users set flags = 1 where ID = 0",我希望第一个线程获取ID 4的行,下一个线程获取ID 3的行。
如果在选择语句中添加 "For Update",那么第一个线程将获得该行,第二个线程将被阻塞,然后返回空值,因为一旦第一个事务提交,WHERE子句就不再满足条件。
如果我不使用 "For Update",那么我需要在后续的更新操作中添加 WHERE 子句(WHERE flags = 0),以便只有一个线程可以更新这行。
第二个线程将选择与第一个线程相同的行,但第二个线程的更新将失败。
无论哪种方式,第二个线程都无法获得一行并更新,因为我无法让数据库给第一个线程行4,给第二个线程行3,因为这些事务是重叠的。
14个回答

36

2
并且在9.5+版本中可用。https://www.postgresql.org/docs/9.5/static/sql-select.html - grahamrhay
实际上我是误点了踩的。这个看起来不错,但缺点是它在 PostgreSQL 9.5 之前的早期版本中不可用。 - Krzysztof Tomaszewski

10

不,不,不,嘿嘿嘿 :-)

我知道作者的意思。我有一个类似的情况,并想出了一个好的解决方案。首先,我将从描述我的情况开始。我有一个表,其中存储了在特定时间发送的消息。PG 不支持函数的定时执行,因此我们必须使用守护程序(或 cron)。我使用一个自定义编写的脚本打开了几个并行进程。每个进程选择一组需要精确到 +1 秒 / -1 秒 的要发送的消息。表本身会动态更新新消息。

因此,每个进程都需要下载一组行。由于这组行不能被其他进程下载,否则会造成很多混乱(有些人会收到多个消息,而实际上只应该收到一条),所以我们需要锁定这些行。用于下载带锁定的一组消息的查询:

FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE FOR UPDATE LOOP
-- DO SMTH
END LOOP;
每0.5秒启动一次这个查询的进程。这将导致下一个查询等待第一个锁定行的锁被解除。这种方法会造成巨大延迟。即使我们使用NOWAIT,查询也会导致异常,而我们不希望出现异常,因为表中可能有新的消息需要发送。如果仅使用FOR SHARE,查询将能够正确执行,但仍会花费很长时间,造成巨大延迟。
为了使其正常工作,我们进行了一些“魔法”:
1. 更改查询:
FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE AND is_locked(msg_id) IS FALSE FOR SHARE LOOP
-- DO SMTH
END LOOP;
  • 神秘函数'is_locked(msg_id)'看起来像这样:

    CREATE OR REPLACE FUNCTION is_locked(integer) RETURNS BOOLEAN AS $$
    DECLARE
        id integer;
        checkout_id integer;
        is_it boolean;
    BEGIN
        checkout_id := $1;
        is_it := FALSE;
    
        BEGIN
            -- we use FOR UPDATE to attempt a lock and NOWAIT to get the error immediately 
            id := msg_id FROM public.messages WHERE msg_id = checkout_id FOR UPDATE NOWAIT;
            EXCEPTION
                WHEN lock_not_available THEN
                    is_it := TRUE;
        END;
    
        RETURN is_it;
    
    END;
    $$ LANGUAGE 'plpgsql' VOLATILE COST 100;
    
    当然,我们可以定制这个函数来处理您数据库中的任何表格。在我看来,最好是为每一个表单创建一个检查函数。将更多东西添加到此函数中可能只会使它变得更慢。反正检查这个子句需要更长的时间,所以没有必要让它变得更慢。对我而言,这是完整的解决方案,并且它运行得非常完美。
    现在,当我有50个进程并行运行时,每个进程都有一组唯一的新消息要发送。一旦它们被发送,我只需将行更新为sent = TRUE并再也不回头了。
    我希望这个解决方案对您(作者)也适用。如果您有任何问题,请告诉我 :-)
    哦,如果这也对你起作用,请让我知道。

  • 7

    我使用类似这样的东西:

    select  *
    into l_sms
    from sms
    where prefix_id = l_prefix_id
        and invoice_id is null
        and pg_try_advisory_lock(sms_id)
    order by suffix
    limit 1;
    

    不要忘记调用pg_advisory_unlock


    另外,我发现了一个好的贡献模块:http://www.postgresql.org/docs/9.2/static/pgrowlocks.html SELECT * FROM accounts AS a, pgrowlocks('accounts') AS p WHERE p.locked_row = a.ctid; - Timon
    3
    这个问题旧了,但这应该是应该被接受的答案(或者像Peter建议的那样使用PGQ)。还要考虑使用事务级别的咨询锁和 pg_try_advisory_xact_lock(sms_id)相关答案 - Erwin Brandstetter
    在使用LIMIT的同时使用pg_try_advisory_lock是否危险?正如这里所指出的,这可能会导致PG在应用LIMIT之前获取锁,从而导致PG::OutOfMemory错误。 - mithunm93
    @mithunm93,是的,PostgreSQL的查询优化器非常棘手,有时它会重新排序“where”条件。 在像“queues”这样的表中,由于频繁的插入/更新/删除(约300次/秒),使用pg_try_advisory_lock无法获得可靠的功能。不时地,查询优化器首先运行pg_try_advisory_lock条件,这是“共享内存不足错误”的原因。即使应用了“偏移量0”的解决方法,也没有帮助。 - Oleksandr_DJ

    5

    2

    看起来你正在尝试获取队列中未被其他进程处理的最高优先级项。

    一个可能的解决方案是添加一个where子句,将其限制为未处理的请求:

    select * from queue where flag=0 order by id desc for update;
    update queue set flag=1 where id=:id;
    --if you really want the lock:
    select * from queue where id=:id for update;
    ...
    

    希望第二次交易在标记更新期间被阻止,等待标记更新后才能继续进行,但标记将限制其到下一个队列中。使用可串行化隔离级别,也有可能获得所需结果,而无需进行这些疯狂的操作。根据应用程序的性质,可能有比在数据库中实现更好的方法,比如先进先出或后进先出管道。此外,可能会有可能反转您需要的顺序,并使用序列来确保它们按顺序处理。

    1
    我的解决方案是使用带有RETURNING子句的UPDATE语句。
    Users
    
    -----------------------------------
    ID        | Name       |      flags
    -----------------------------------
    1         |  bob       |        0  
    2         |  fred      |        1  
    3         |  tom       |        0   
    4         |  ed        |        0   
    

    不要使用 SELECT .. FOR UPDATE,而是使用

    BEGIN; 
    
    UPDATE "Users"
    SET ...
    WHERE ...;
    RETURNING ( column list );
    
    COMMIT;
    

    由于 UPDATE 语句在更新表时获取了一个 ROW EXCLUSIVE 锁,因此您会获得序列化的更新。读取仍然是允许的,但它们只能看到 UPDATE 事务开始前的数据。

    参考资料:Pg 文档中的并发控制章节。


    1

    这可以通过使用SELECT ... NOWAIT来实现;一个例子在这里


    那个例子做了比“NOWAIT”更多的事情...实际上我非常确定仅有“NOWAIT”是无法帮助的;它可以避免等待,但如果查询具有通用的“WHERE”子句,则无法查看锁定行之后的内容。 - Roman Starkov

    0

    用于多线程和集群吗?
    这个怎么样?

    START TRANSACTION;
    
    // All thread retrive same task list
    // If result count is very big, using cursor 
    //    or callback interface provied by ORM frameworks.
    var ids = SELECT id FROM tableName WHERE k1=v1;
    
    // Each thread get an unlocked recored to process.
    for ( id in ids ) {
       var rec = SELECT ... FROM tableName WHERE id =#id# FOR UPDATE NOWAIT;
       if ( rec != null ) {
        ... // do something
       }
    }
    
    COMMIT;
    

    0

    ^^ 这个方法可行。考虑将“锁定”状态设为“立即”状态。

    假设您的表格如下:

    id | 名字 | 姓氏 | 状态

    例如,可能的状态有:1=待处理,2=已锁定,3=已处理,4=失败,5=被拒绝

    每个新记录都会插入待处理状态(1)

    您的程序执行以下操作:“update mytable set status = 2 where id = (select id from mytable where name like '%John%' and status = 1 limit 1) returning id, name, surname”

    然后您的程序进行处理,如果得出结论该线程根本不应该处理该行,则执行以下操作:“update mytable set status = 1 where id = ?”

    否则,它会更新到其他状态。


    0

    看起来你正在寻找SELECT FOR SHARE。

    http://www.postgresql.org/docs/8.3/interactive/sql-select.html#SQL-FOR-UPDATE-SHARE

    FOR SHARE的行为类似,不同之处在于它获取每个检索行的共享锁而不是独占锁。共享锁会阻止其他事务对这些行执行UPDATE、DELETE或SELECT FOR UPDATE操作,但不会阻止它们执行SELECT FOR SHARE。

    如果在FOR UPDATE或FOR SHARE中指定了特定的表,则只有来自这些表的行被锁定;在SELECT中使用的任何其他表都将像通常一样被读取。没有表列表的FOR UPDATE或FOR SHARE子句会影响命令中使用的所有表。如果将FOR UPDATE或FOR SHARE应用于视图或子查询,则会影响视图或子查询中使用的所有表。

    如果需要为不同的表指定不同的锁定行为,则可以编写多个FOR UPDATE和FOR SHARE子句。如果相同的表在FOR UPDATE和FOR SHARE子句中被提及(或隐含地受到影响),则它将被处理为FOR UPDATE。同样,如果在影响它的任何子句中指定了NOWAIT,则该表将被处理为NOWAIT。

    在无法将返回的行清楚地标识为单个表行的情况下,不能在上下文中使用FOR UPDATE和FOR SHARE;例如,它们不能与聚合一起使用。


    1
    我不认为这能解决我的问题,可能是因为我上面没有清楚地说明。每个线程都希望获得它能够更新的第一行,然后更新该行。 [续上] - alanc10n
    1
    因此,如果表中有ID为1、2、3和4的行与WHERE子句条件匹配,第一个线程将更新具有ID 4的行,下一个线程将更新具有ID 3的行,以此类推。 - alanc10n
    抱歉,我不知道。 - Steven Behnke

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接