如何在Node.js中正确监听PostgreSQL通知

6

目标:
当向特定的PostgreSQL表中插入新记录时,我希望PostgreSQL通知我的node.js Web应用程序,以便它可以启动对外部服务的API调用。

我了解基本步骤如下:

  1. 建立一个执行pg_notify()方法的PostgreSQL触发器函数。
  2. 建立一个在表插入后执行触发器函数的PostgreSQL触发器。
  3. 建立一个在node.js中监听特定频道PostgreSQL通知的机制。

以下是各个步骤的具体说明:

  1. Trigger function in notify_app_after_table_insert.pgsql

    CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
    RETURNS TRIGGER AS
    $BODY$
        BEGIN
            PERFORM pg_notify('channel', row_to_json(NEW)::text);
            RETURN new;
        END;
    $BODY$
    LANGUAGE plpgsql
    
  2. Trigger in trigger_notify_app_after_table_insert.sql

    CREATE TRIGGER trigger_notify_app_after_table_insert
    AFTER INSERT
    ON table
    FOR EACH ROW
    EXECUTE PROCEDURE notify_app_after_table_insert();
    
  3. Listener mechanism in index.js (inside my web app's backend)

    //tools
    const express = require('express');
    const app = express();
    const cors = require('cors');
    const bodyParser = require('body-parser');
    const port = 3001;
    const pool = require('./db'); //stores my postgresql credentials
    
    // Middleware
    app.use(cors())
    app.use(bodyParser.json())
    app.use(bodyParser.urlencoded({extended: true}))
    
    // Apply app.listen notification to console.log
    app.listen(port, () => {
        console.log(`App running on port ${port}.`)
    })
    
    // Apply channel-specific listener mechanism
    pool.connect(function(err, client, done) {
        if(err) {
            console.log(err);
        }
        client.on('notification', function(msg) {
            console.log(msg);
        })
        client.query("LISTEN channel");
        done();
    });
    
问题:
当后端web应用程序服务器正在运行并插入db表中的新记录时,我希望在我的web应用程序终端中看到通知消息,但是没有任何东西出现。我怀疑问题出在index.js的最后一个代码块中,但一直没有能够隔离它。
有关如何在index.js中正确接收通知的任何建议?提前致谢。

迈克尔,有什么运气吗? - RJA
2个回答

2
我遇到了同样的问题,我决定使用pg-listen(https://github.com/andywer/pg-listen)。这是我的实现方式: PG:
CREATE TABLE active.events(
  uid UUID DEFAULT gen_random_uuid(),
  created_ts TIMESTAMP DEFAULT NOW(),
  consumed_ts TIMESTAMP NULL,
  origin VARCHAR(200) NOT NULL,
  channel VARCHAR(200) NOT NULL,
  type VARCHAR(50) NOT NULL,
  path VARCHAR(200) NOT NULL,
  payload JSONB NOT NULL,
  result JSONB,
  CONSTRAINT events_pkey PRIMARY KEY(uid),
  CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;

-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
 RETURNS TRIGGER
 LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
    --channel TEXT := TG_ARGV[0];
BEGIN
    PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
    UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
  RETURN NULL;
END;
$tg_notify_events$;

CREATE OR REPLACE TRIGGER notify_events
    AFTER INSERT ON active.events
    FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();

NODEJS:
const createSubscriber = require('pg-listen');

const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
  console.log('Received notification in ' + channel, payload);
});

subscriber.events.on('error', (error) => {
  console.error('Fatal database connection error:', error)
  process.exit(1)
});

process.on('exit', () => {
  subscriber.close()
});

await subscriber.connect();
await subscriber.listenTo(channel);

希望能有所帮助!

0
我认为这是因为顺序问题。 像这样写:
client.query("LISTEN channel");
client.on('notification', function(msg) {
  console.log(msg);
})

对我来说,首先查询LISTEN是有效的。

虽然对我来说不是这样,也许还有更多的原因。 - undefined
在没有测试的情况下,首先订阅通知使用LISTEN更有意义,然后在通知发生时注册操作。 - undefined
经过测试,我不得不说我和楼主有同样的问题,而这个解决方案并不能解决它。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接