Logical replication startup fixes (#5750)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1698226491736459

## Summary of changes

Update the WAL buffers for the affected pages when restoring WAL from a safekeeper.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with the /release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat the commit message to not include the above checklist.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
commit 5ceccdc7de (parent cdcaa329bf), committed via GitHub
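The bug shows up when a compute starts at an LSN whose WAL page begins with a continuation record (contrecord): basebackup writes that page out as a dummy, while the correct bytes only exist on the safekeepers. As a rough, illustration-only sketch of the condition (not part of the commit; the 8192-byte page size and the 32-byte offset are simply the literals the new regression test below checks for), such a position can be recognized like this:

```python
# Illustration only (not from the commit): recognize an LSN that sits just past
# a WAL page header, i.e. a page whose first bytes belong to a contrecord.
# WAL_PAGE_SIZE and HEADER_OFFSET mirror the literals used in the new test below;
# they are assumptions for this sketch, not values read from Postgres headers.
WAL_PAGE_SIZE = 8192
HEADER_OFFSET = 32


def starts_just_past_page_header(lsn: int) -> bool:
    """Mirror of the test's check: int(lsn) % 8192 == 32."""
    return lsn % WAL_PAGE_SIZE == HEADER_OFFSET


if __name__ == "__main__":
    # The new test skips itself unless the emitted message ends exactly at such a point.
    print(starts_just_past_page_header(3 * WAL_PAGE_SIZE + HEADER_OFFSET))  # True
```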
@@ -88,7 +88,7 @@ static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
 static void WalSndLoop(WalProposer *wp);
 static void XLogBroadcastWalProposer(WalProposer *wp);
 
-static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalPropClose(XLogRecPtr recptr);
 
 static void
@@ -1241,7 +1241,7 @@ WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XL
             rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
 
             /* write WAL to disk */
-            XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
+            XLogWalPropWrite(sk->wp, &buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
 
             ereport(DEBUG1,
                     (errmsg("Recover message %X/%X length %d",
@@ -1283,11 +1283,24 @@ static XLogSegNo walpropSegNo = 0;
  * Write XLOG data to disk.
  */
 static void
-XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+XLogWalPropWrite(WalProposer *wp, char *buf, Size nbytes, XLogRecPtr recptr)
 {
     int         startoff;
     int         byteswritten;
 
+    /*
+     * Apart from walproposer, basebackup LSN page is also written out by
+     * postgres itself which writes WAL only in pages, and in basebackup it is
+     * inherently dummy (only safekeepers have historic WAL). Update WAL buffers
+     * here to avoid dummy page overwriting correct one we download here. Ugly,
+     * but alternatives are about the same ugly. We won't need that if we switch
+     * to on-demand WAL download from safekeepers, without writing to disk.
+     *
+     * https://github.com/neondatabase/neon/issues/5749
+     */
+    if (!wp->config->syncSafekeepers)
+        XLogUpdateWalBuffers(buf, recptr, nbytes);
+
     while (nbytes > 0)
     {
         int         segbytes;
@@ -1,11 +1,14 @@
 import time
 
+import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     logical_replication_sync,
     wait_for_last_flush_lsn,
 )
+from fixtures.types import Lsn
+from fixtures.utils import query_scalar
 
 
 def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
@@ -147,3 +150,89 @@ COMMIT;
     endpoint.start()
     # it must be gone (but walproposer slot still exists, hence 1)
     assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
+
+
+# Test compute start at LSN page of which starts with contrecord
+# https://github.com/neondatabase/neon/issues/5749
+def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    env.neon_cli.create_branch("init")
+    endpoint = env.endpoints.create_start("init")
+    tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
+    timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
+
+    cur = endpoint.connect().cursor()
+    cur.execute("create table t(key int, value text)")
+    cur.execute("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
+    cur.execute("insert into replication_example values (1, 2)")
+    cur.execute("create publication pub1 for table replication_example")
+
+    # now start subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key, value text)")
+    vanilla_pg.safe_psql("CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int);")
+
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    connstr = endpoint.connstr().replace("'", "''")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+    logical_replication_sync(vanilla_pg, endpoint)
+    vanilla_pg.stop()
+
+    with endpoint.cursor() as cur:
+        # measure how much space logical message takes. Sometimes first attempt
+        # creates huge message and then it stabilizes, have no idea why.
+        for _ in range(3):
+            lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+            log.info(f"current_lsn={lsn_before}")
+            # Non-transactional logical message doesn't write WAL, only XLogInsert's
+            # it, so use transactional. Which is a bit problematic as transactional
+            # necessitates commit record. Alternatively we can do smth like
+            # select neon_xlogflush(pg_current_wal_insert_lsn());
+            # but isn't much better + that particular call complains on 'xlog flush
+            # request 0/282C018 is not satisfied' as pg_current_wal_insert_lsn skips
+            # page headers.
+            payload = "blahblah"
+            cur.execute(f"select pg_logical_emit_message(true, 'pref', '{payload}')")
+            lsn_after_by_curr_wal_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+            lsn_diff = lsn_after_by_curr_wal_lsn - lsn_before
+            logical_message_base = lsn_after_by_curr_wal_lsn - lsn_before - len(payload)
+            log.info(
+                f"before {lsn_before}, after {lsn_after_by_curr_wal_lsn}, lsn diff is {lsn_diff}, base {logical_message_base}"
+            )
+
+        # and write logical message spanning exactly as we want
+        lsn_before = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        log.info(f"current_lsn={lsn_before}")
+        curr_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        offs = int(curr_lsn) % 8192
+        till_page = 8192 - offs
+        payload_len = (
+            till_page - logical_message_base - 8
+        )  # not sure why 8 is here, it is deduced from experiments
+        log.info(f"current_lsn={curr_lsn}, offs {offs}, till_page {till_page}")
+
+        # payload_len above would go exactly till the page boundary; but we want contrecord, so make it slightly longer
+        payload_len += 8
+
+        cur.execute(f"select pg_logical_emit_message(true, 'pref', 'f{'a' * payload_len}')")
+        supposedly_contrecord_end = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
+        log.info(f"supposedly_page_boundary={supposedly_contrecord_end}")
+        # The calculations to hit the page boundary are very fuzzy, so just
+        # ignore test if we fail to reach it.
+        if not (int(supposedly_contrecord_end) % 8192 == 32):
+            pytest.skip("missed page boundary, bad luck")
+
+        cur.execute("insert into replication_example values (2, 3)")
+
+    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+    endpoint.stop().start()
+
+    cur = endpoint.connect().cursor()
+    # this should flush current wal page
+    cur.execute("insert into replication_example values (3, 4)")
+    vanilla_pg.start()
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql(
+        "select sum(somedata) from replication_example"
+    ) == endpoint.safe_psql("select sum(somedata) from replication_example")
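The fiddliest part of the new test is sizing the logical message so the resulting WAL record ends just past a page boundary, forcing the next page to start with a contrecord. A standalone sketch of that arithmetic (assuming, as the test does, 8192-byte WAL pages, a `logical_message_base` overhead measured at runtime, and the experimentally derived 8-byte adjustment; the helper name is invented for illustration):

```python
# Sketch of the payload-length arithmetic from test_wal_page_boundary_start.
# Constants mirror the test: 8192-byte WAL pages, an overhead that must be
# measured at runtime (logical_message_base), and an 8-byte fudge factor the
# test derives from experiments. The helper name is not part of the commit.
WAL_PAGE_SIZE = 8192


def boundary_crossing_payload_len(curr_lsn: int, logical_message_base: int) -> int:
    """Payload length for pg_logical_emit_message() so the record ends slightly
    past the current WAL page boundary, producing a contrecord on the next page."""
    offs = curr_lsn % WAL_PAGE_SIZE  # byte offset inside the current page
    till_page = WAL_PAGE_SIZE - offs  # bytes left until the page boundary
    payload_len = till_page - logical_message_base - 8  # would end exactly on the boundary
    return payload_len + 8  # overshoot a little to force a contrecord


if __name__ == "__main__":
    # logical_message_base has to come from the probing loop in the test;
    # 56 here is only a placeholder to make the sketch runnable.
    print(boundary_crossing_payload_len(curr_lsn=0x282C018, logical_message_base=56))
```

Because the adjustment is empirical, the test skips itself (rather than failing) when the emitted message misses the expected boundary offset.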
Submodule vendor/postgres-v14 updated: 6669a672ee...dd067cf656
Submodule vendor/postgres-v15 updated: ab67ab9635...bc88f53931
Submodule vendor/postgres-v16 updated: 550ffa6495...763000f1d0

vendor/revisions.json (vendored)
@@ -1,5 +1,5 @@
 {
-  "postgres-v16": "550ffa6495a5dc62fccc3a8b449386633758680b",
-  "postgres-v15": "ab67ab96355d61e9d0218630be4aa7db53bf83e7",
-  "postgres-v14": "6669a672ee14ab2c09d44c4552f9a13fad3afc10"
+  "postgres-v16": "763000f1d0873b827829c41f2f6f799ffc0de55c",
+  "postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
+  "postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
 }