From 5ceccdc7def3d4bdd33b32d5b10a8f4e148ad9af Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Fri, 3 Nov 2023 18:40:27 +0200
Subject: [PATCH] Logical replication startup fixes (#5750)

## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1698226491736459

## Summary of changes

Update the affected WAL buffers when restoring WAL from the safekeeper.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above checklist

---------

Co-authored-by: Konstantin Knizhnik
Co-authored-by: Arseny Sher
---
 pgxn/neon/walproposer_pg.c                    | 19 +++-
 .../regress/test_logical_replication.py      | 89 +++++++++++++++++++
 vendor/postgres-v14                           |  2 +-
 vendor/postgres-v15                           |  2 +-
 vendor/postgres-v16                           |  2 +-
 vendor/revisions.json                         |  6 +-
 6 files changed, 111 insertions(+), 9 deletions(-)

diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 865f91165b..f83a08d407 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -88,7 +88,7 @@ static void
 StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
 static void WalSndLoop(WalProposer *wp);
 static void XLogBroadcastWalProposer(WalProposer *wp);
-static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalPropClose(XLogRecPtr recptr);
 
 static void
@@ -1241,7 +1241,7 @@ WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XL
 			rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
 
 			/* write WAL to disk */
-			XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
+			XLogWalPropWrite(sk->wp, &buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
 
 			ereport(DEBUG1,
 					(errmsg("Recover message %X/%X length %d",
@@ -1283,11 +1283,24 @@ static XLogSegNo walpropSegNo = 0;
  * Write XLOG data to disk.
  */
 static void
-XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 {
 	int			startoff;
 	int			byteswritten;
 
+	/*
+	 * Apart from walproposer, the basebackup LSN page is also written out by
+	 * postgres itself, which writes WAL only in pages, and in the basebackup it
+	 * is inherently dummy (only safekeepers have historic WAL). Update the WAL
+	 * buffers here to avoid the dummy page overwriting the correct one we
+	 * download here. Ugly, but the alternatives are about as ugly. We won't
+	 * need this once we switch to on-demand WAL download from safekeepers, without writing to disk.
+	 *
+	 * https://github.com/neondatabase/neon/issues/5749
+	 */
+	if (!wp->config->syncSafekeepers)
+		XLogUpdateWalBuffers(buf, recptr, nbytes);
+
 	while (nbytes > 0)
 	{
 		int			segbytes;
diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py
index 726e5e5def..d2d8d71e3f 100644
--- a/test_runner/regress/test_logical_replication.py
+++ b/test_runner/regress/test_logical_replication.py
@@ -1,11 +1,14 @@
 import time
 
+import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     logical_replication_sync,
     wait_for_last_flush_lsn,
 )
+from fixtures.types import Lsn
+from fixtures.utils import query_scalar
 
 
 def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
@@ -147,3 +150,89 @@ COMMIT;
     endpoint.start()
     # it must be gone (but walproposer slot still exists, hence 1)
     assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
+
+
+# Test compute start at an LSN whose WAL page starts with a contrecord
+# https://github.com/neondatabase/neon/issues/5749
+def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    env.neon_cli.create_branch("init")
+    endpoint = env.endpoints.create_start("init")
+    tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
+    timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
+
+    cur = endpoint.connect().cursor()
+    cur.execute("create table t(key int, value text)")
+    cur.execute("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
+    cur.execute("insert into replication_example values (1, 2)")
+    cur.execute("create publication pub1 for table replication_example")
+
+    # now start the subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key, value text)")
+    vanilla_pg.safe_psql("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
+
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    connstr = endpoint.connstr().replace("'", "''")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+    logical_replication_sync(vanilla_pg, endpoint)
+    vanilla_pg.stop()
+
+    with endpoint.cursor() as cur:
+        # Measure how much space a logical message takes. Sometimes the first
+        # attempt creates a huge message and then it stabilizes; no idea why.
+        for _ in range(3):
+            lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+            log.info(f"current_lsn={lsn_before}")
+            # A non-transactional logical message doesn't write WAL, it only XLogInsert's
+            # it, so use a transactional one. That is a bit problematic, as a transactional
+            # one necessitates a commit record. Alternatively we could do something like
+            # select neon_xlogflush(pg_current_wal_insert_lsn());
+            # but that isn't much better, and that particular call complains with 'xlog
+            # flush request 0/282C018 is not satisfied' because pg_current_wal_insert_lsn
+            # skips page headers.
+            payload = "blahblah"
+            cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
+            lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+            lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
+            logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
+            log.info(
+                f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
+            )
+
+        # and now write a logical message that spans the page boundary exactly as we want
+        lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        log.info(f"current_lsn={lsn_before}")
+        curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        offs = int(curr_lsn) % 8192
+        till_page = 8192 - offs
+        payload_len = (
+            till_page - logical_message_base - 8
+        )  # not sure why 8 is needed here, it was deduced from experiments
+        log.info(f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}")
+
+        # payload_len above would go exactly up to the page boundary; but we want a contrecord, so make it slightly longer
+        payload_len += 8
+
+        cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
+        supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
+        # The calculations to hit the page boundary are very fuzzy, so just
+        # skip the test if we fail to reach it.
+        if not (int(supposedly_contrecord_end) % 8192 == 32):
+            pytest.skip("missed page boundary, bad luck")
+
+        cur.execute("insert into replication_example values (2, 3)")
+
+    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+    endpoint.stop().start()
+
+    cur = endpoint.connect().cursor()
+    # this should flush the current WAL page
+    cur.execute("insert into replication_example values (3, 4)")
+    vanilla_pg.start()
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql(
+        "select sum(somedata) from replication_example"
+    ) == endpoint.safe_psql("select sum(somedata) from replication_example")
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index 6669a672ee..dd067cf656 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit 6669a672ee14ab2c09d44c4552f9a13fad3afc10
+Subproject commit dd067cf656f6810a25aca6025633d32d02c5085a
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index ab67ab9635..bc88f53931 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit ab67ab96355d61e9d0218630be4aa7db53bf83e7
+Subproject commit bc88f539312fcc4bb292ce94ae9db09ab6656e8a
diff --git a/vendor/postgres-v16 b/vendor/postgres-v16
index 550ffa6495..763000f1d0 160000
--- a/vendor/postgres-v16
+++ b/vendor/postgres-v16
@@ -1 +1 @@
-Subproject commit 550ffa6495a5dc62fccc3a8b449386633758680b
+Subproject commit 763000f1d0873b827829c41f2f6f799ffc0de55c
diff --git a/vendor/revisions.json b/vendor/revisions.json
index 012fb14035..377357e131 100644
--- a/vendor/revisions.json
+++ b/vendor/revisions.json
@@ -1,5 +1,5 @@
 {
-  "postgres-v16": "550ffa6495a5dc62fccc3a8b449386633758680b",
-  "postgres-v15": "ab67ab96355d61e9d0218630be4aa7db53bf83e7",
-  "postgres-v14": "6669a672ee14ab2c09d44c4552f9a13fad3afc10"
+  "postgres-v16": "763000f1d0873b827829c41f2f6f799ffc0de55c",
+  "postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
+  "postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
 }
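For readers following the LSN arithmetic in the new test, here is a standalone sketch of how test_wal_page_boundary_start sizes the logical message so that its WAL record spills over an 8192-byte WAL page boundary, i.e. so the next page starts with a contrecord. This is a minimal illustration and is not part of the patch: the function name payload_len_to_cross_page and the numeric values in the example are made up, while logical_message_base and the extra 8 bytes mirror the experimentally measured overheads used in the test.

```python
# Standalone sketch (not part of the patch) of the page-boundary arithmetic
# used in test_wal_page_boundary_start. Names here are illustrative only.

WAL_PAGE_SIZE = 8192  # XLOG_BLCKSZ, the WAL page size the test assumes


def payload_len_to_cross_page(curr_lsn: int, logical_message_base: int) -> int:
    """Pick a payload length so the pg_logical_emit_message() record ends just
    past the current WAL page boundary, making the next page start with a
    continuation record (contrecord)."""
    offs = curr_lsn % WAL_PAGE_SIZE    # offset of the insert position within its page
    till_page = WAL_PAGE_SIZE - offs   # bytes left until the page boundary
    # logical_message_base is the measured size of a logical message record with
    # an empty payload; the extra 8 bytes are the fudge factor deduced
    # experimentally in the test. This length would end the record exactly at
    # the page boundary.
    payload_len = till_page - logical_message_base - 8
    # Make it slightly longer so the record crosses onto the next page.
    return payload_len + 8


if __name__ == "__main__":
    # Hypothetical example: an insert position 0x1F90 bytes into its page and a
    # measured base overhead of 66 bytes give a 46-byte payload.
    print(payload_len_to_cross_page(curr_lsn=0x5001F90, logical_message_base=66))
```

The test then restarts the endpoint so its basebackup LSN lands on that contrecord page and verifies that logical replication still catches up, which is the scenario the XLogUpdateWalBuffers() call added to XLogWalPropWrite() above (together with the vendored postgres changes) is meant to fix.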