From a4eea5025c70d646f5a94481590177af3dfe7491 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 8 Aug 2024 20:01:55 +0300 Subject: [PATCH] Fix logical apply worker reporting of flush_lsn wrt sync replication. It should take syncrep flush_lsn into account because WAL before it on endpoint restart is lost, which makes replication miss some data if slot had already been advanced too far. This commit adds test reproducing the issue and bumps vendor/postgres to commit with the actual fix. --- control_plane/src/endpoint.rs | 11 +-- pgxn/neon/walsender_hooks.c | 8 ++ test_runner/fixtures/neon_fixtures.py | 2 +- .../regress/test_logical_replication.py | 89 +++++++++++++++++++ vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/revisions.json | 15 +++- 8 files changed, 119 insertions(+), 12 deletions(-) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index f9bb2da7e7..9f879c4b08 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -824,11 +824,12 @@ impl Endpoint { // cleanup work to do after postgres stops, like syncing safekeepers, // etc. // - // If destroying, send it SIGTERM before waiting. Sometimes we do *not* - // want this cleanup: tests intentionally do stop when majority of - // safekeepers is down, so sync-safekeepers would hang otherwise. This - // could be a separate flag though. - self.wait_for_compute_ctl_to_exit(destroy)?; + // If destroying or stop mode is immediate, send it SIGTERM before + // waiting. Sometimes we do *not* want this cleanup: tests intentionally + // do stop when majority of safekeepers is down, so sync-safekeepers + // would hang otherwise. This could be a separate flag though. + let send_sigterm = destroy || mode == "immediate"; + self.wait_for_compute_ctl_to_exit(send_sigterm)?; if destroy { println!( "Destroying postgres data directory '{}'", diff --git a/pgxn/neon/walsender_hooks.c b/pgxn/neon/walsender_hooks.c index 8f8d1dfc01..bd3856e9d9 100644 --- a/pgxn/neon/walsender_hooks.c +++ b/pgxn/neon/walsender_hooks.c @@ -20,6 +20,7 @@ #include "utils/guc.h" #include "postmaster/interrupt.h" +#include "neon.h" #include "neon_walreader.h" #include "walproposer.h" @@ -181,6 +182,13 @@ NeonWALReadSegmentClose(XLogReaderState *xlogreader) void NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr) { + /* + * If safekeepers are not configured, assume we don't need neon_walreader, + * i.e. running neon fork locally. + */ + if (wal_acceptors_list[0] == '\0') + return; + if (!wal_reader) { XLogRecPtr epochStartLsn = pg_atomic_read_u64(&GetWalpropShmemState()->propEpochStartLsn); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 844a23d327..4374e74a41 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4893,7 +4893,7 @@ def check_restored_datadir_content( assert (mismatch, error) == ([], []) -def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn: +def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn: """Wait logical replication subscriber to sync with publisher.""" publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) while True: diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py index 66afe9ddfd..5a5d369a11 100644 --- a/test_runner/regress/test_logical_replication.py +++ b/test_runner/regress/test_logical_replication.py @@ -4,11 +4,13 @@ from random import choice from string import ascii_lowercase import pytest +from fixtures.common_types import Lsn from fixtures.log_helper import log from fixtures.neon_fixtures import ( AuxFileStore, NeonEnv, NeonEnvBuilder, + PgProtocol, logical_replication_sync, wait_for_last_flush_lsn, ) @@ -524,3 +526,90 @@ def test_replication_shutdown(neon_simple_env: NeonEnv): assert [r[0] for r in res] == [10, 20, 30, 40] wait_until(10, 0.5, check_that_changes_propagated) + + +def logical_replication_wait_flush_lsn_sync(publisher: PgProtocol) -> Lsn: + """ + Wait for logical replication subscriber reported flush_lsn to reach + pg_current_wal_flush_lsn on publisher. Note that this is somewhat unreliable + because for some WAL records like vacuum subscriber won't get any data at + all. + """ + publisher_flush_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + + def check_caughtup(): + res = publisher.safe_psql( + """ +select sent_lsn, flush_lsn, pg_current_wal_flush_lsn() from pg_stat_replication sr, pg_replication_slots s + where s.active_pid = sr.pid and s.slot_type = 'logical'; + """ + )[0] + sent_lsn, flush_lsn, curr_publisher_flush_lsn = Lsn(res[0]), Lsn(res[1]), Lsn(res[2]) + log.info( + f"sent_lsn={sent_lsn}, flush_lsn={flush_lsn}, publisher_flush_lsn={curr_publisher_flush_lsn}, waiting flush_lsn to reach {publisher_flush_lsn}" + ) + assert flush_lsn >= publisher_flush_lsn + + wait_until(30, 0.5, check_caughtup) + return publisher_flush_lsn + + +# Test that subscriber takes into account quorum committed flush_lsn in +# flush_lsn reporting to publisher. Without this, it may ack too far, losing +# data on restart because publisher advances START_REPLICATION position to the +# confirmed_flush_lsn of the slot. +def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + # use vanilla as publisher to allow writes on it when safekeeper is down + vanilla_pg.configure( + [ + "wal_level = 'logical'", + # neon fork uses custom WAL records which won't work without extension installed with obscure + # ERROR: resource manager with ID 134 not registered + # error. + "shared_preload_libraries = 'neon'", + ] + ) + vanilla_pg.start() + vanilla_pg.safe_psql("create extension neon;") + + env.neon_cli.create_branch("subscriber") + sub = env.endpoints.create("subscriber") + sub.start() + + with vanilla_pg.cursor() as pcur: + with sub.cursor() as scur: + pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)") + pcur.execute("CREATE PUBLICATION pub FOR TABLE t") + scur.execute("CREATE TABLE t (pk integer primary key, sk integer)") + + pub_connstr = vanilla_pg.connstr().replace("'", "''") + log.info(f"pub connstr is {pub_connstr}, subscriber connstr {sub.connstr()}") + query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_connstr}' PUBLICATION pub with (synchronous_commit=off)" + scur.execute(query) + time.sleep(2) # let initial table sync complete + + # stop safekeeper so it won't get any data + for sk in env.safekeepers: + sk.stop() + # and insert to publisher + with vanilla_pg.cursor() as pcur: + for i in range(0, 1000): + pcur.execute("INSERT into t values (%s, random()*100000)", (i,)) + # wait until sub receives all data + logical_replication_sync(sub, vanilla_pg) + # Update confirmed_flush_lsn of the slot. If subscriber ack'ed recevied data + # as flushed we'll now lose it if subscriber restars. That's why + # logical_replication_wait_flush_lsn_sync is expected to hang while + # safekeeper is down. + vanilla_pg.safe_psql("checkpoint;") + assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000 + + # restart subscriber and ensure it can catch up lost tail again + sub.stop(mode="immediate") + for sk in env.safekeepers: + sk.start() + sub.start() + log.info("waiting for sync after restart") + logical_replication_wait_flush_lsn_sync(vanilla_pg) + assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000 diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 7bbe834c8c..ae07734e0f 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 7bbe834c8c2dc37802eca8484311599bc47341f6 +Subproject commit ae07734e0ff72759ab425fc8f625d4c1ecb15a50 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 9eba7dd382..47c8d462d1 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 9eba7dd382606ffca43aca865f337ec21bcdac73 +Subproject commit 47c8d462d169367c8979ce628a523be2d94b46be diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 5377f5ed72..6434b1499b 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3 +Subproject commit 6434b1499b11ed97dccea5618a055034b83b8e2f diff --git a/vendor/revisions.json b/vendor/revisions.json index 570dfc1550..ab8b3b3c4f 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,14 @@ { - "v16": ["16.3", "5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3"], - "v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"], - "v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"] + "v16": [ + "16.3", + "6434b1499b11ed97dccea5618a055034b83b8e2f" + ], + "v15": [ + "15.7", + "47c8d462d169367c8979ce628a523be2d94b46be" + ], + "v14": [ + "14.12", + "ae07734e0ff72759ab425fc8f625d4c1ecb15a50" + ] }