通过逻辑复制插槽,您可以查询预写式日志流。
首先,您需要更改一些参数,然后重新启动服务器以使更改生效:
postgres=# alter system set wal_level = logical;
postgres=# alter system set max_replication_slots = 1;
在重新启动后,您需要创建一个插槽:
postgres=# SELECT * FROM pg_create_logical_replication_slot('slot', 'test_decoding');
slot_name | xlog_position
slot | 2E/839F3300
(1 row)
在这里,test_decoding
是一个输出插件名称,旨在将日志记录(二进制格式)转换为某些文本表示。
接下来我们创建一个表格...
postgres=# create table product(id serial, val json);
CREATE TABLE
现在您可以查询WAL流:
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
2E/83A0BA48 | 80243 | BEGIN 80243
2E/83A1D2B8 | 80243 | COMMIT 80243
(2 rows)
很遗憾,现在您无法解码DDL,所以您只能得到BEGIN和END。Xid
字段代表事务号。
但是让我们插入一些东西...
postgres=# insert into product(val) values ('{"desc":"aaa"}');
INSERT 0 1
现在再次查询流:
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
2E/83A1D3C0 | 80244 | BEGIN 80244
2E/83A1D3C0 | 80244 | table public.product: INSERT: id[integer]:1 val[json]:'{"desc":"aaa"}'
2E/83A1D440 | 80244 | COMMIT 80244
(3 rows)
您可以在此处查看表名和插入值。
更新语句同理:
postgres=# update product set val = '{"desc":"bbb"}';
UPDATE 1
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot', NULL, NULL);
location | xid | data
2E/83A1D560 | 80245 | BEGIN 80245
2E/83A1D560 | 80245 | table public.product: UPDATE: id[integer]:1 val[json]:'{"desc":"bbb"}'
2E/83A1D5E8 | 80245 | COMMIT 80245
(3 rows)
请注意,在使用
pg_logical_slot_get_changes
函数从流中“消耗”一些更改后,您将无法再次查询相同的更改。
如果不再需要,请删除该插槽:
postgres=# SELECT pg_drop_replication_slot('slot');
pg_drop_replication_slot
(1 row)
您可以在文档中阅读有关逻辑解码的更多信息。