如何在更新链的末尾触发触发器?

9
我有几个表格,它们使用触发器相互交互,我目前处理触发器执行的方式是使用 pg_trigger_depth() < 2,这种方法不够优美。
我真的希望最终触发器只运行一次,并且在所有逐行操作发生后才运行。不幸的是,CONSTRAINT TRIGGER 只支持 FOR EACH ROW,而 FOR STATEMENT 触发器实际上会在触发器中的每个语句中触发一次,而不仅仅是在启动它的初始语句中触发一次。
我查看了该主题周围的其他 SO 问题,没有找到类似于我所做的东西。
以下是设置:
CREATE TABLE report(
  report_tk SERIAL PRIMARY KEY,
  report_id UUID NOT NULL,
  report_name TEXT NOT NULL,
  report_data INT NOT NULL,
  report_subscribers TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
  valid_range TSTZRANGE NOT NULL DEFAULT '(,)',
  EXCLUDE USING GIST ((report_id :: TEXT) WITH =, report_name WITH =, valid_range WITH &&)
);
CREATE TABLE report_subscriber(
  report_id INT NOT NULL REFERENCES report ON DELETE CASCADE;
  subscriber_name TEXT NOT NULL,
  needs_sync BOOLEAN NOT NULL DEFAULT TRUE,
  EXCLUDE USING GIST (subscriber_name WITH =, valid_range WITH &&)
);
CREATE OR REPLACE FUNCTION sync_subscribers_to_report()
  RETURNS TRIGGER LANGUAGE plpgsql SET SEARCH_PATH TO dwh, public AS $$
BEGIN
  RAISE INFO 'Running sync to report trigger';

  BEGIN
    CREATE TEMPORARY TABLE lock_sync_subscribers_to_report(
    ) ON COMMIT DROP;
    RAISE INFO 'syncing to report, stack depth is: %', pg_trigger_depth();
    UPDATE report r
    SET report_subscribers = x.subscribers
    FROM (
           SELECT
             report_tk
             , array_agg(DISTINCT u.subscriber_name ORDER BY u.subscriber_name) AS subscribers
           FROM report_subscriber s
           WHERE s.report_tk IN (
             SELECT DISTINCT report_tk
             FROM report_subscriber s2
             WHERE s.needs_sync
           )
           GROUP BY s.report_tk
         ) x
    WHERE r.report_tk = x.report_tk;
    RAISE INFO 'turning off sync flag, stack depth is: %', pg_trigger_depth();
    UPDATE report_subscriber
    SET needs_sync = FALSE
    WHERE needs_sync = TRUE;
    RETURN NULL;
  EXCEPTION WHEN DUPLICATE_TABLE THEN
    RAISE INFO 'skipping recursive call, stack depth is: %', pg_trigger_depth();
    RETURN NULL;
  END;
END;
$$;
CREATE TRIGGER sync_subscribers_to_report
  AFTER INSERT OR UPDATE OR DELETE
  ON report_subscriber
  FOR STATEMENT
EXECUTE PROCEDURE sync_subscribers_to_report();

因此,我希望能够完成以下设置:
  • 插入报告记录
  • 确保报告名称在任何单一时间点上只能存在一次(使用valid_range上的EXCLUDE)
  • 在订阅者表中插入报告订阅者
  • 确保一个订阅者同时只能订阅一个报告。
  • 允许多个人订阅报告。
  • 每当向订阅者表中添加记录时,将该名称添加到报告表中的订阅者列表中。
  • 每当从订阅者表中删除记录时,从报告表中的订阅者列表中删除名称。
  • 每当从报告表中删除记录时,删除相应的订阅者记录(由ON DELETE CASCADE处理)

如果在单个语句中对订阅者表进行了大量编辑(常见情况),最好只运行一个简单的查询来使用订阅者表中新记录和剩余记录的聚合更新报告表。

我的原始解决方案涉及向订阅者表添加needs_update标志,并基于此触发更新然后关闭该标志。 当然,这会导致触发器的另一个触发,我使用pg_trigger_depth() < 2(2是因为插入可能是由系统中的其他触发器引起的)来停止该触发。除了丑陋之外,还很烦人的是触发器函数中的语句会导致更多的FOR EACH STATEMENT触发。

我尝试了使用标志的不同版本,其中包含我在其他SO答案中看到的技巧(https://dev59.com/dWox5IYBdhLWcg3w1Xy-#8950639),即创建一个临时表并捕获重复表异常以防止进一步执行。 不过,我认为这并没有真正改善问题。

有没有一种干净的方法来完成我正在尝试做的事情?虽然这是一个明显的玩具示例,但我的实际应用程序确实需要构建数据的“压缩数组”表示,并且最好以有效的方式进行。


今天早上我有一个想法,我很希望它能够奏效。我尝试在SQL_DROP上创建一个EVENT TRIGGER,希望在ON COMMIT DROP删除临时表后会触发该事件,但是触发器并不会对自动删除的操作进行响应,只有显式删除才会触发。 - deinspanjer
needs_sync标记是否还有其他用途,或者只是为了这个触发器而存在? - Nick Barnes
它只是用于触发。 - deinspanjer
1个回答

8

与其在report_subscriber中使用标志位,我认为您最好使用一个独立的待处理更改队列。这样有几个好处:

  • 避免触发器递归
  • 在底层,UPDATE只是DELETE + 重新INSERT,因此插入到队列中实际上比翻转标志位要便宜
  • 可能会更便宜一些,因为您只需要将不同的report_id加入队列,而不是复制整个report_subscriber记录,并且可以在临时表中完成,因此存储是连续的,无需同步到磁盘
  • 翻转标志位时无需担心竞争条件,因为该队列仅限于当前事务(在您的实现中,受UPDATE report_subscriber影响的记录未必是您在SELECT中选择的相同记录...)

因此,请初始化队列表:

CREATE FUNCTION create_queue_table() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  CREATE TEMP TABLE pending_subscriber_changes(report_id INT UNIQUE) ON COMMIT DROP;
  RETURN NULL;
END
$$;

CREATE TRIGGER create_queue_table_if_not_exists
  BEFORE INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
  ON report_subscriber
  FOR EACH STATEMENT
  WHEN (to_regclass('pending_subscriber_changes') IS NULL)
  EXECUTE PROCEDURE create_queue_table();

当更改到达时,将其排队,忽略已排队的任何内容:

CREATE FUNCTION queue_subscriber_change() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  IF TG_OP IN ('DELETE', 'UPDATE') THEN
    INSERT INTO pending_subscriber_changes (report_id) VALUES (old.report_id)
    ON CONFLICT DO NOTHING;
  END IF;

  IF TG_OP IN ('INSERT', 'UPDATE') THEN
    INSERT INTO pending_subscriber_changes (report_id) VALUES (new.report_id)
    ON CONFLICT DO NOTHING;
  END IF;
  RETURN NULL;
END
$$;

CREATE TRIGGER queue_subscriber_change
  AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
  ON report_subscriber
  FOR EACH ROW
  EXECUTE PROCEDURE queue_subscriber_change();

...并在语句结束时处理队列:

CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  UPDATE report
  SET report_subscribers = ARRAY(
    SELECT DISTINCT subscriber_name
    FROM report_subscriber s
    WHERE s.report_id = report.report_id
    ORDER BY subscriber_name
  )
  FROM pending_subscriber_changes c
  WHERE report.report_id = c.report_id;

  DROP TABLE pending_subscriber_changes;
  RETURN NULL;
END
$$;

CREATE TRIGGER process_pending_changes
  AFTER INSERT OR UPDATE OF report_id, subscriber_name OR DELETE
  ON report_subscriber
  FOR EACH STATEMENT
  EXECUTE PROCEDURE process_pending_changes();

这里有一个小问题:

UPDATE

不能保证更新顺序。这意味着,如果同时运行这两个语句:

INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (1, 'a'), (2, 'b');
INSERT INTO report_subscriber (report_id, subscriber_name) VALUES (2, 'x'), (1, 'y');

如果他们试图以相反的顺序更新report记录,则有可能发生死锁。您可以通过强制执行所有更新的一致顺序来避免这种情况,但不幸的是,无法将ORDER BY附加到UPDATE语句中。我认为您需要使用游标:

CREATE FUNCTION process_pending_changes() RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
  target_report CURSOR FOR
    SELECT report_id
    FROM report
    WHERE report_id IN (TABLE pending_subscriber_changes)
    ORDER BY report_id
    FOR NO KEY UPDATE;
BEGIN
  FOR target_record IN target_report LOOP
    UPDATE report
    SET report_subscribers = ARRAY(
        SELECT DISTINCT subscriber_name
        FROM report_subscriber
        WHERE report_id = target_record.report_id
        ORDER BY subscriber_name
      )
    WHERE CURRENT OF target_report;
  END LOOP;

  DROP TABLE pending_subscriber_changes;
  RETURN NULL;
END
$$;

如果客户端尝试在同一事务中运行多个语句(因为更新顺序仅在每个语句内应用,但更新锁定将保留到提交),则仍有可能发生死锁。您可以通过在事务结束时仅触发process_pending_changes() 来解决这个问题(缺点是,在该事务中,您将看不到自己更改后的report_subscribers数组)。
以下是一个“on commit”触发器的通用概述,如果您认为值得填写,则可以使用:
CREATE FUNCTION run_on_commit() RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
  <your code goes here>
  RETURN NULL;
END
$$;

CREATE FUNCTION trigger_already_fired() RETURNS BOOLEAN LANGUAGE plpgsql VOLATILE AS $$
DECLARE
  already_fired BOOLEAN;
BEGIN
  already_fired := NULLIF(current_setting('my_vars.trigger_already_fired', TRUE), '');
  IF already_fired IS TRUE THEN
    RETURN TRUE;
  ELSE
    SET LOCAL my_vars.trigger_already_fired = TRUE;
    RETURN FALSE;
  END IF;
END
$$;

CREATE CONSTRAINT TRIGGER my_trigger
  AFTER INSERT OR UPDATE OR DELETE ON my_table
  DEFERRABLE INITIALLY DEFERRED
  FOR EACH ROW
  WHEN (NOT trigger_already_fired())
  EXECUTE PROCEDURE run_on_commit();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接