March 5, 2023

pglogical replication between PostgreSQL RDS/Aurora Postgres databases

Replication setup between Amazon Aurora Postgres/PostgreSQL RDS instances using pglogical


pglogical extension provides additional capabilities that aren’t available in native PostgreSQL.
Publish/subscribe model to migrate critical workloads.
Used to migrate data between PostgreSQL versions and replicate data between two or more PostgreSQL instances regardless of where those instances are located.
Used to migrate whole clusters, specific tables, and specific rows/columns.

Limitations of pglogical:
  • Needs superuser privileges
  • UNLOGGED/TEMPORARY tables/views can't be replicated
  • Primary key or unique key is mandatory
  • Sequence data is not replicated
  • Automatic DDL replication is not supported (other than TRUNCATE)
  • It is not recommended to add additional UNIQUE constraints other than the PRIMARY KEY
  • Unique constraints must not be deferrable
  • Large objects (LOBs) are not replicated
  • Foreign key constraints aren’t enforced during the replication process

Nodes - PostgreSQL instances
Providers and Subscribers - roles taken by Nodes
Replication Set - a collection of tables; used to control which tables in the database are replicated and which actions on those tables are replicated.

Step #1: Parameter changes, on both source/target databases

Cluster parameter group   -> some parameters are static, needs reboot of cluster
rds.logical_replication = 1
shared_preload_libraries = ***, pglogical
wal_level = 'logical'
max_worker_processes = 10    # one per database needed on provider node and one per node needed on subscriber node
max_replication_slots = 10  # one per node needed on provider node
max_wal_senders = 10        # one per node needed on provider node
track_commit_timestamp = on # needed for last/first update wins conflict resolution

Instance parameter group  -> parameter is static, needs reboot of instance
shared_preload_libraries = ***, pglogical

If debugging/logging more information required, please set below parameters.
log_min_messages=debug1
log_min_error_statement=debug1
log_error_verbosity=verbose
log_statement = 'all'

Step #2: Create user & extension, on both source/target databases

create user pguser with password 'pguser123';
grant rds_superuser to pguser;
 
create extension pglogical;

Step #3: Create Provider node (source database)

select pglogical.create_node(node_name := 'provider1', dsn := 'host=providerhost port=5432 dbname=db');
select pglogical.create_node(node_name :='node1', dsn := 'host=<db-endpoint> port=5432 dbname=<dbname> user=pguser password=pguser123);
select pglogical.create_node(node_name := 'node1', dsn := 'host= lab-pg14.xxxxxx.us-east-2.rds.amazonaws.com port=5432 sslmode=require dbname=cluster1 user=pguser password=xxxx);

select pglogical.drop_node(node_name := 'node1234');         # if needed to recreate node

Step #4: Create replication set and add tables to replication set, on source database

There are 3 preexisting replication sets named "default", "default_insert_only" and "ddl_sql".

select pglogical.create_replication_set(set_name name, replicate_insert bool, replicate_update bool, replicate_delete bool, replicate_truncate bool)
select pglogical.create_replication_set('shared_repl', true, true, true, true );
select pglogical.create_replication_set('test_repl_set');
select pglogical.create_replication_set(set_name := 'dml-replication-set', replicate_insert := TRUE, replicate_update := TRUE, replicate_delete := TRUE, replicate_truncate := FALSE);
select pglogical.alter_replication_set(set_name name, replicate_inserts bool, replicate_updates bool, replicate_deletes bool, replicate_truncate bool)

select pglogical.replication_set_add_all_tables('default', ARRAY['public']);      # adds all tables from schema 'public' to 'default' replication set
select pglogical.replication_set_add_all_tables('replication_set', ARRAY['catalog2'], true);
select pglogical.replication_set_add_all_tables('test_repl_set', ARRAY['schematest']);

select pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text[], row_filter text)
select pglogical.replication_set_add_table('default_insert_only', 'public.pgbench_history');
select pglogical.replication_set_add_table(set_name := 'replication_set', relation := 'test', synchronize_data := true );
select pglogical.replication_set_add_table(set_name := 'test_repl_set', relation := 'schematest.employees');

select pglogical.replication_set_add_all_sequences(set_name name, schema_names text[], synchronize_data boolean)
select pglogical.replication_set_add_all_sequences(set_name := 'default', schema_names := '{public}'::text[], synchronize_data := true)
select pglogical.replication_set_add_sequence(set_name name, relation regclass, synchronize_data boolean)
select pglogical.synchronize_sequence( seqoid ) from pglogical.sequence_state;
select seqoid::regclass from pglogical.sequence_state"

select pglogical.drop_replication_set('default');     # if needed to recreate replication set
select pglogical.replication_set_remove_table(set_name name, relation regclass)
select pglogical.replication_set_remove_table('default', 'listing');

Step #5: Create subscription, on target database

select pglogical.create_subscription(subscription_name := 'subscription1', provider_dsn := 'host=providerhost port=5432 dbname=db');
select pglogical.create_subscription(
    subscription_name := 'west_sub',
    provider_dsn := 'host=testdb.cluster-****.us-west-2.rds.amazonaws.com port=5433 dbname=postgres user=pguser password=pguser123',
    replication_sets := ARRAY['default'],
    synchronize_data := false,
    forward_origins := '{}' );
select pglogical.create_subscription(subscription_name := 'cluster2_sub', provider_dsn := 'host=lab-pg14.xxxxxx.us-east-1.rds.amazonaws.com port=5434 sslmode=require dbname=cluster1 user=pguser password=xxxx', replication_sets := ARRAY['default'], synchronize_data := true, forward_origins := '{}' );

select pglogical.alter_subscription_disable('subscription_name');
select pglogical.alter_subscription_disable(subscription_name) from pglogical.subscriptions where writer = 'name_of_writer';
select pglogical.alter_subscription_enable('west_to_east2_sub');
select pglogical.alter_subscription_enable('west_to_east2_sub', true);
select pglogical.drop_subscription('subscription1');

select pglogical.alter_subscription_skip_changes_upto('subscription_name','the_target_lsn');
select pglogical.wait_for_subscription_sync_complete('subscription1');

Step #6: Commands to check pglogical replication status

select pglogical.show_subscription_status('west_sub');
select pglogical.show_subscription_table(subscription_name name, relation regclass)
select pglogical.show_subscription_table('west_sub','listing');

select * from pglogical.subscription;
select subscription_name, status from pglogical.show_subscription_status() order by subscription_name;
select * from pglogical.show_subscription_status();

select * from pg_replication_slots;
select slot_name, slot_type, active from pg_replication_slots;
select slot_name from pg_replication_slots where active='f'; # inactive replications slots
select redo_lsn, slot_name, restart_lsn, round((redo_lsn-restart_lsn)/1024/1024/1024, 2) AS GB_behind from pg_control_checkpoint(), pg_replication_slots;

To remove replication slot, if required:
select pg_terminate_backend(23593);
select pg_drop_replication_slot('pgl_postgres_node_east_east_to_west_sub');

select * from pg_stat_replication_slots;
select slot_name, spill_txns, spill_count, spill_bytes from pg_stat_replication_slots;
select stream_txns, stream_count, stream_bytes from pg_stat_replication_slots;

select * from pg_stat_replication;
select pid, usename, application_name, client_addr, state, sync_state,
       pg_wal_lsn_diff(sent_lsn, write_lsn) as write_lag, pg_wal_lsn_diff(sent_lsn, flush_lsn) as flush_lag, 
       pg_wal_lsn_diff(sent_lsn, replay_lsn) as replay_lag, pg_wal_lsn_diff(sent_lsn, replay_lsn) as total_lag 
from pg_stat_replication;

select * from pglogical.depend;
select * from pglogical.node order by node_name;
select * from pglogical.local_node;
select * from pglogical.pglogical_node_info();

select * from pglogical.local_sync_status;
SELECT * FROM pglogical.local_sync_status WHERE NOT sync_status = 'r';
SELECT sync_status FROM pglogical.local_sync_status WHERE sync_nspname = 'public' AND sync_relname = 'example';
sync_kind
i initial sync
d data sync
f full sync
s structure sync

sync_status
\0: No sync
i: initial - Ask for sync
r: sync done
d: data sync
c: constraints sync
s: structure sync
u: Catching up
w: Table sync is waiting (to get OK from main thread)
y: sync finished at LSN

select * from pglogical.replication_set order by set_name;
select * from pglogical.replication_set_table;
select * from pglogical.replication_set_table where set_reloid::text like '%listing%';
select * from pglogical.queue;

select * from pg_publication;
select * from pg_publication_tables;
select * from pg_stat_subscription;

select pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass)
select pglogical.alter_subscription_resynchronize_table('east2_to_west2_sub', 'venue');


Related PostgreSQL Articles: 30 Best PostgreSQL DBA Interview Questions  pg_restore - PostgreSQL database/backup restore tool


No comments:

Post a Comment