你笑了

你的笑,是星星跳跃浪花的笑

0%

pglogical

特点

  • 实时复制、延迟复制
  • 表级复制
    指定特定的表集合,而不是整个数据库
  • 冲突决策
  • 支持 DDL 复制
  • 跨版本支持

添加插件

1
CREATE EXTENSION pglogical;

查看支持的函数和表

1
2
3
4
5
6
7
8
9
10
mydb=# \dx+ pglogical
对象用于扩展 "pglogical"
对象描述
----------------------------------------------------------------------------------------------
表 pglogical.depend
表 pglogical.local_node
...
函数 pglogical.xact_commit_timestamp_origin(xid)
视图 pglogical.tables
(51 行记录)

节点

添加

1
select pglogical.create_node (node_name := 'provider1', dsn := 'host=192.168.22.37 port=5420 dbname=postgres');

删除

1
2
// privider 是 if_name 列的值
select pglogical.drop_node('provider');

重新添加节点后需要重新发布

查看

1
select * from pglogical.node_interface;

复制集

pglogical 中是 set 复制集。pg 中是 slot 复制槽

查看

1
select * from pglogical.replication_set;

添加

1
select pglogical.create_replication_set('my_set', true, true, true, true);

删除

1
select pglogical.drop_replication_set(set_name text);

发布

添加

  • 没有主键的表无法添加到 default 复制集

    1
    2
    3
    4
    lfo-test=> SELECT pglogical.replication_set_add_table('default','pgtest');
    ERROR: table pgtest cannot be added to replication set default
    描述: table does not have PRIMARY KEY and given replication set is configured to replicate UPDATEs and/or DELETEs
    提示: Add a PRIMARY KEY to the table

表到复制集

1
2
3
4
5
6
7
-- 添加所有表
select pglogical.replication_set_add_all_tables('复制集', ARRAY['public']);
-- 添加指定表
select pglogical.replication_set_add_table(set_name:='my_set',relation:='monster',synchronize_data:=true);
select pglogical.replication_set_add_table('default','"MESSAGE"',true); -- 大写需要双引号

-- SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);

序列到复制集

1
2
3
4
-- 所有序列
SELECT pglogical.replication_set_add_all_sequences('default', ARRAY['public']);
-- 指定序列
SELECT pglogical.replication_set_add_sequence('default', '"MESSAGE_ID_seq"',true);

删除

复制集中的表

1
select * from pglogical.replication_set_remove_table('default','test');

复制集中的序列

1
select * from pglogical.replication_set_remove_sequence('default','test_id_seq');

查看

复制集中的表

1
2
3
4
5
6
select * from pglogical.replication_set_table;

set_id | set_reloid | set_att_list | set_row_filter
495597167 | "SERVICE_GROUP" | |
44270030 | "MESSAGE" | |
-- 通过 set_id 区分表所在的复制集

复制集中的序列

1
select * from pglogical.replication_set_seq;

订阅

  • 重新添加的订阅,synchronize_data 为 false 时,不会同步重新订阅成功之前添加的记录

    1
    2
    3
    4
    19 | pgtest | 2022-04-22 18:16:16.629578
    20 | pgtest | 2022-04-22 18:16:16.629578
    24 | pgtest | 2022-04-22 18:29:18.567585
    25 | pgtest | 2022-04-22 18:29:33.451064

    可以看到删除原订阅,在发布端添加记录 21,22,23。重新添加订阅,在发布端添加记录 24,25。发现延迟2分钟后,只复制了24,25。

添加

1
2
3
4
5
6
7
8
select pglogical.create_subscription(
subscription_name := 'subscription1',
provider_dsn := 'host=192.168.xx.xx port=5420 dbname=postgres',
replication_sets := '{my_set}',
apply_delay := '12:00:00',
synchronize_data := false
);
-- 注意最后一个参数后面没有 逗号
  • options

    • synchronize_structure

      不建议开启,默认同步所有表,且表的角色需要在订阅端创建,否则同步失败

      https://stackoverflow.com/a/65153144

      1
      2
      3
      4
      5
      6
      7
      2022-01-04 18:08:35.350 CST [22310] 调试:  forked new backend, pid=26072 socket=11
      2022-01-04 18:08:35.353 CST [26072] [未知]@[未知] 调试: 来自 "(anonymous)" 的 SSL 联接
      2022-01-04 18:08:35.355 CST [26072] postgres@mydb 注意: 扩展 "pglogical" 已经存在,跳过
      2022-01-04 18:08:35.355 CST [22313] 调试: snapshot of 1+0 running transaction ids (lsn 0/1767B20 oldest xid 558 latest complete 557 next xid 559)
      2022-01-04 18:08:35.361 CST [26072] postgres@mydb 调试: 为表 "pg_toast_16589" 串行的建立索引"pg_toast_16589_index"
      2022-01-04 18:08:35.364 CST [26072] postgres@mydb 错误: 角色 "lfo_backup" 不存在 # 角色不存在
      2022-01-04 18:08:35.364 CST [26072] postgres@mydb 语句: ALTER TABLE public.pgtest OWNER TO lfo_backup;
    • synchronize_data

      是否在开始复制之前,同步已存在的历史数据,默认为 true

      如果初次添加订阅,则默认即可

      如果半途中删除了订阅重新订阅,则为 false

      猜测如果为 false,只同步增量数据,不同步历史数据

      2022.04.22 测试,删除原订阅,发布端数据库添加记录,重新订阅(synchronize_date true)可以订阅成功,但是不同步刚才添加的记录

      对于数据量比较大的数据库

      1. 添加订阅时设置为 false。后续通过 pglogical.alter_subscription_synchronize or pglogical.alter_subscription_resynchronize_table 来同步
      2. 添加订阅时设置为 true。通过 pglogical.replication_set_add_table 依次将表添加到复制集中
    • apply_delay

      配置延迟反而会特别影响主库,因为会导致主库wal堆积。 逻辑复制本身对主库负载影响很小。强烈建议取消复制延迟12h

查看订阅状态

1
2
3
4
select subscription_name, status FROM pglogical.show_subscription_status();
subscription_name | status
-------------------+-------------
subscription1 | replicating

查看订阅详细信息

1
select * from pglogical.subscription;

同步复制集中未同步的表

同步复制集中未同步的表

1
2
pglogical.alter_subscription_synchronize(subscription_name name, truncate bool)
-- select * from pglogical.alter_subscription_synchronize('test_delay');

等待订阅同步完成

执行 pglogical.create_subscription or pglogical.alter_subscription_synchronize 之后调用该方法

1
2
pglogical.wait_for_subscription_sync_complete(subscription_name name)
-- select * from pglogical.wait_for_subscription_sync_complete('backup');

重新同步某张表

  • 会先 truncate 该表,再重新同步数据,可以同步重新订阅之前丢失的数据
1
2
pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass)
-- select * from pglogical.alter_subscription_resynchronize_table('test_delay','pgtest');

等待某张表同步完成

1
pglogical.wait_for_table_sync_complete(subscription_name name, relation regclass)

等待发布端确认同步请求

1
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL)
  • 延迟订阅中,发布端在需要同步的表中插入一条记录,然后调用该方法,指定延迟后才会返回结果。

    此时在订阅端调用 wait_for_subscription_sync_complete 方法,等待订阅同步完成

删除

1
select pglogical.drop_subscription('backup_delay');

启停订阅

1
2
3
4
-- pglogical.alter_subscription_disable(subscription_name name, immediate bool)
select pglogical.alter_subscription_disable('lfotest_delay', true);
-- pglogical.alter_subscription_enable(subscription_name name, immediate bool)
select pglogical.alter_subscription_enable('lfotest_delay', true);

复制槽

添加订阅会在发布端同时添加一个slot,名称格式为 pgl_<db-name>_provider_<subscription_name>

pgl_mydb_provider_hptest

发布端查看复制槽不知道是哪个订阅的,可以在订阅端查看订阅状态

1
2
3
4
5
6
7
8
9
lfo-prod=> select * from pg_replication_slots;

slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
----------------------------------------------+------------------+-----------+--------+-----------------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
pgl_lfo_backup_provider_lfoe330b6c | pglogical_output | logical | 16391 | lfo-prod | f | f | | | 3730908 | 7A/B0003240 | 7A/DE006148 | extended |
pgl_lfo_review_provider_lforeview | pglogical_output | logical | 20575 | lfo-review | f | t | 121 | | 3767215 | 7B/DC006958 | 7B/DC006990 | reserved |
pgl_lfo_backup_provider_lfo6cb441d | pglogical_output | logical | 16391 | lfo-prod | f | t | 123 | | 3767215 | 7B/DC006958 | 7B/DC006990 | reserved |
(3 行记录)

  • pgl_lfo_backup_provider_lfoe330b6c 和 pgl_lfo_backup_provider_lfo6cb441d 不确定是哪个订阅的
1
2
3
4
5
6
7
lfo-backup=> select * FROM pglogical.show_subscription_status();

subscription_name | status | provider_node | provider_dsn | slot_name | replication_sets | forward_origins
----------------------+-------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+------------------+-----------------
lfobackup_delay | replicating | provider | host=pgm-wz9298od6hkag64s168200.pg.rds.aliyuncs.com port=5432 dbname=lfo-prod user=haopai_prod password=Sda$39fd(87*(&-3^%!&S=)H*&^12+@ | pgl_lfo_backup_provider_lfoe330b6c | {default} | {all}
lfobackup_now | replicating | provider | host=pgm-wz9298od6hkag64s168200.pg.rds.aliyuncs.com port=5432 dbname=lfo-prod user=haopai_prod password=Sda$39fd(87*(&-3^%!&S=)H*&^12+@ | pgl_lfo_backup_provider_lfo6cb441d | {midnight} | {all}

  • 可以看到

    槽 330b6c 是名为 lfobackup_delay的订阅,订阅了 {default} 复制集;

    槽6cb441d是 名为 lfobackup_now 的订阅,订阅了 {midnight} 复制集

查看

1
select * from pg_replication_slots;
  • active

    • f :这个槽当前未被使用,需要删除
    • t:这个槽当前正在被使用
  • active_pid

    如果槽当前正在被使用,则记录使用这个槽的会话的进程 ID。如果不活动则为NULL

  • wal_status

    • reserved 意味着wal文件小于 max_wal_size
    • extended意味着wal文件超出max_wal_size,但文件仍保留,通过复制槽或wal_keep_size
    • unreserved意味着该插槽不再保留所需的 WAL 文件,并且将在下一个检查点删除其中一些文件。 此状态可以返回到reserved或extended
    • lost意味着某些需要的 WAL 文件已被删除,并且此插槽不再可用

删除

1
2
select pg_drop_replication_slot('slot_name');
-- select pg_drop_replication_slot('pgl_lfo_backup_provider_lfo6cb441d');

inactive slot

当连接到槽的复制客户端断开连接时,复制槽被标记为 inactive。inactive 的复制槽会导致 WAL 文件被保留,因为它们必须在客户端重新连接并且槽变为 active 时发送到客户端。

通常不活动的复制槽是由于备份客户端被删除、从属服务器被关闭、升级、故障转移等造成的

副作用

  1. 会保留wal日志,占用磁盘
  2. 保留旧的事务
  3. 阻止 vacuum 的执行

查找 inactive slot

1
SELECT count(*) FROM pg_replication_slots WHERE NOT active;

参考

[1] Inactive Replication Slot: the Butterfly Effect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
lfo-prod=> select pg_drop_replication_slot('pgl_mydb_provider_hptest');
pg_drop_replication_slot
--------------------------

(1 行记录)

-- 如果删除失败
lfo-prod=> select pg_drop_replication_slot('pgl_lfo_backup_provider_lfoe330b6c_c59b23c1');
ERROR: replication slot "pgl_lfo_backup_provider_lfoe330b6c_c59b23c1" is active for PID 6693
-- 则先删除进程
lfo-prod=> select pg_terminate_backend(6693);
pg_terminate_backend
----------------------
t
(1 行记录)
-- 再删除就可以了
lfo-prod=> select pg_drop_replication_slot('pgl_lfo_backup_provider_lfoe330b6c_c59b23c1');
pg_drop_replication_slot
--------------------------

(1 行记录)

如果两个库表结构不一致会导致逻辑订阅失败,wal 日志堆积,此时需要删除 active 为 f 的复制槽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SELECT pglogical.drop_subscription('subscription1');

-- subcriber 中执行
mydb=# SELECT pglogical.drop_subscription('hptest'); -- 删除订阅
警告: could not drop slot "pgl_mydb_provider_hptest" on provider, you will probably have to drop it manually
drop_subscription
-------------------
1
(1 行记录)

-- provider 中执行
lfo-test=> select * from pg_replication_slots; -- 查看复制槽
lfo-test=> select pg_drop_replication_slot('pgl_mydb_provider_hptest');
pg_drop_replication_slot
--------------------------

(1 行记录)

序列数

同步

在 provider 上执行

1
2
3
4
5
6
7
8
9
10
11
-- 查看所有序列表的状态
select * from pglogical.sequence_state;
seqoid | cache_size | last_value
--------+------------+------------
29127 | 1000 | 3031

-- 同步所有序列表
select pglogical.synchronize_sequence( seqoid ) from pglogical.sequence_state;

-- 同步单个序列表
select pglogical.synchronize_sequence('public.test_id_seq');

序列数定期同步,测试发现实际规则是根据某个序列表 select * from staff_id_seq; 的 last_value 值大于等于 select * from pglogical.sequence_state; 中的 last_value 值时,自动触发同步,延迟 [apply_delay] 时间后,复制到订阅端。并且订阅端序列表的 last_value 值等于发布端该序列表的 last_value 值加1000 [select * from pglogical.sequence_state;cache_size 值]

参考

https://github.com/2ndQuadrant/pglogical/issues/136#issuecomment-608234535

设置

1
select setval('staff_id_seq',1);

DDL同步

replicate_ddl_command

  • 在 lfo-prod 执行 DDL 语句时需要通过 pglogical 提供的 replicate_ddl_command 命令执行,会同步到备份数据库。否则备份数据库表结构不一致会导致 逻辑复制 失败,wal 日志堆积,占用磁盘

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    select pglogical.replicate_ddl_command('create sequence public.customer_id_seq
    minvalue 1
    maxvalue 9223372036854775807
    start with 1
    increment by 1
    cache 1
    ;');

    select pglogical.replicate_ddl_command('create table public.customer (
    "ID" bigint NOT NULL PRIMARY KEY default nextval(''public.customer_id_seq''), -- 这里是双单引号
    "TYPE" character varying(50) NOT NULL
    );');
  • 执行完 DDL 之后,需要将表添加到复制集,根据表插入的时间,选择 default 或 midnight 复制集

    1
    2
    select * from pglogical.replication_set_add_sequence('midnight','"MESSAGE_ID_seq"');
    select pglogical.replication_set_add_table('midnight','"MESSAGE"');

其他示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
select pglogical.replicate_ddl_command('create sequence public.test_id_seq
minvalue 1
maxvalue 9223372036854775807
start with 1
increment by 1
cache 1
;');

select pglogical.replicate_ddl_command('create table public.test (
"ID" bigint NOT NULL PRIMARY KEY default nextval(''public.test_id_seq''),
"TYPE" character varying(50) NOT NULL
);');


select pglogical.replicate_ddl_command('create sequence public.customer_id_seq
minvalue 1
maxvalue 9223372036854775807
start with 1
increment by 1
cache 1
;');

select pglogical.replicate_ddl_command('create table public.customer (
"ID" bigint NOT NULL PRIMARY KEY default nextval(''public.customer_id_seq''),
"TYPE" character varying(50) NOT NULL
);');


select pglogical.replicate_ddl_command('create sequence public.monster_id_seq
minvalue 1
maxvalue 9223372036854775807
start with 1
increment by 1
cache 1
;','{my_set}');

select pglogical.replicate_ddl_command('create table public.monster (
"ID" bigint NOT NULL PRIMARY KEY default nextval(''public.monster_id_seq''),
"TYPE" character varying(50) NOT NULL
);','{my_set}');

共享库预加载

Postgresql共享库预加载(Shared Library Preloading) PostgreSQL支持通过动态库的方式扩展PG的功能,pg在使用这些功能时需要预加载相关的共享库。有几种设置可用于将共享库预加载到服务器中

local_preload_libraries (string)

  • 用户建立连接时加载,通常使用客户端上的 PGOPOPS 环境变量或使用 ALTER 角色 SET 设置此参数。
  • 任何用户都可以设置此选项,因此限定只能加载$libdir/plugins下面的so文件。可以显示的指定目录,如$libdir/plugins/passwordcheck;或者只指定库的名字,如passwordcheck。其会自动到$libdir/plugins/中搜索
1
2
3
4
postgres=> alter role test set local_preload_libraries=passwordcheck;
ALTER ROLE
postgres=> alter role test set local_preload_libraries='$libdir/plugins/passwordcheck';
ALTER ROLE

session_preload_libraries (string)

  • 用户建立连接时加载这个参数只允许超级用户修改
  • 能动态加载所有目录下面的so文件,如果未指定相对目录,自动到dynamic_library_path指定的目录中搜索so。

shared_preload_libraries (string)

  • 数据库启动时加载,配置shared_preload_libraries参数,必须重启数据库。
1
2
postgres=# alter system set shared_preload_libraries=pg_pathman, pg_stat_statements, passwordcheck;
ALTER SYSTEM
  • 在连接开始时加载一个或多个共享库,用逗号分隔列表。条目之间的空白会被忽略,如果要在名称中包含空格或逗号,库名需要加双引号。此参数只在服务器启动时生效。如果找不到指定的库,服务器无法启动。
  • 多个参数不要放在单引号中,如
1
alter system set shared_preload_libraries='pg_pathman,pg_stat_statements';
  • $libdir路径通过以下命令查看
1
2
3
[pg@pg ~]$ pg_config |grep LIBDIR
LIBDIR = /opt/postgres/lib
PKGLIBDIR = /opt/postgres/lib

版本

2.4.1

“snapshot still active” warnings

WARNING: snapshot 0x199ed48 still active