Fix logical apply worker reporting of flush_lsn wrt sync replication.

It should take syncrep flush_lsn into account because WAL before it on endpoint
restart is lost, which makes replication miss some data if slot had already been
advanced too far. This commit adds test reproducing the issue and bumps
vendor/postgres to commit with the actual fix.
This commit is contained in:
Arseny Sher
2024-08-08 20:01:55 +03:00
committed by Arseny Sher
parent 4476caf670
commit a4eea5025c
8 changed files with 119 additions and 12 deletions

View File

@@ -824,11 +824,12 @@ impl Endpoint {
// cleanup work to do after postgres stops, like syncing safekeepers,
// etc.
//
// If destroying, send it SIGTERM before waiting. Sometimes we do *not*
// want this cleanup: tests intentionally do stop when majority of
// safekeepers is down, so sync-safekeepers would hang otherwise. This
// could be a separate flag though.
self.wait_for_compute_ctl_to_exit(destroy)?;
// If destroying or stop mode is immediate, send it SIGTERM before
// waiting. Sometimes we do *not* want this cleanup: tests intentionally
// do stop when majority of safekeepers is down, so sync-safekeepers
// would hang otherwise. This could be a separate flag though.
let send_sigterm = destroy || mode == "immediate";
self.wait_for_compute_ctl_to_exit(send_sigterm)?;
if destroy {
println!(
"Destroying postgres data directory '{}'",

View File

@@ -20,6 +20,7 @@
#include "utils/guc.h"
#include "postmaster/interrupt.h"
#include "neon.h"
#include "neon_walreader.h"
#include "walproposer.h"
@@ -181,6 +182,13 @@ NeonWALReadSegmentClose(XLogReaderState *xlogreader)
void
NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
/*
* If safekeepers are not configured, assume we don't need neon_walreader,
* i.e. running neon fork locally.
*/
if (wal_acceptors_list[0] == '\0')
return;
if (!wal_reader)
{
XLogRecPtr epochStartLsn = pg_atomic_read_u64(&GetWalpropShmemState()->propEpochStartLsn);

View File

@@ -4893,7 +4893,7 @@ def check_restored_datadir_content(
assert (mismatch, error) == ([], [])
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
def logical_replication_sync(subscriber: PgProtocol, publisher: PgProtocol) -> Lsn:
"""Wait logical replication subscriber to sync with publisher."""
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
while True:

View File

@@ -4,11 +4,13 @@ from random import choice
from string import ascii_lowercase
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
NeonEnv,
NeonEnvBuilder,
PgProtocol,
logical_replication_sync,
wait_for_last_flush_lsn,
)
@@ -524,3 +526,90 @@ def test_replication_shutdown(neon_simple_env: NeonEnv):
assert [r[0] for r in res] == [10, 20, 30, 40]
wait_until(10, 0.5, check_that_changes_propagated)
def logical_replication_wait_flush_lsn_sync(publisher: PgProtocol) -> Lsn:
"""
Wait for logical replication subscriber reported flush_lsn to reach
pg_current_wal_flush_lsn on publisher. Note that this is somewhat unreliable
because for some WAL records like vacuum subscriber won't get any data at
all.
"""
publisher_flush_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
def check_caughtup():
res = publisher.safe_psql(
"""
select sent_lsn, flush_lsn, pg_current_wal_flush_lsn() from pg_stat_replication sr, pg_replication_slots s
where s.active_pid = sr.pid and s.slot_type = 'logical';
"""
)[0]
sent_lsn, flush_lsn, curr_publisher_flush_lsn = Lsn(res[0]), Lsn(res[1]), Lsn(res[2])
log.info(
f"sent_lsn={sent_lsn}, flush_lsn={flush_lsn}, publisher_flush_lsn={curr_publisher_flush_lsn}, waiting flush_lsn to reach {publisher_flush_lsn}"
)
assert flush_lsn >= publisher_flush_lsn
wait_until(30, 0.5, check_caughtup)
return publisher_flush_lsn
# Test that subscriber takes into account quorum committed flush_lsn in
# flush_lsn reporting to publisher. Without this, it may ack too far, losing
# data on restart because publisher advances START_REPLICATION position to the
# confirmed_flush_lsn of the slot.
def test_subscriber_synchronous_commit(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
# use vanilla as publisher to allow writes on it when safekeeper is down
vanilla_pg.configure(
[
"wal_level = 'logical'",
# neon fork uses custom WAL records which won't work without extension installed with obscure
# ERROR: resource manager with ID 134 not registered
# error.
"shared_preload_libraries = 'neon'",
]
)
vanilla_pg.start()
vanilla_pg.safe_psql("create extension neon;")
env.neon_cli.create_branch("subscriber")
sub = env.endpoints.create("subscriber")
sub.start()
with vanilla_pg.cursor() as pcur:
with sub.cursor() as scur:
pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
pcur.execute("CREATE PUBLICATION pub FOR TABLE t")
scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
pub_connstr = vanilla_pg.connstr().replace("'", "''")
log.info(f"pub connstr is {pub_connstr}, subscriber connstr {sub.connstr()}")
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_connstr}' PUBLICATION pub with (synchronous_commit=off)"
scur.execute(query)
time.sleep(2) # let initial table sync complete
# stop safekeeper so it won't get any data
for sk in env.safekeepers:
sk.stop()
# and insert to publisher
with vanilla_pg.cursor() as pcur:
for i in range(0, 1000):
pcur.execute("INSERT into t values (%s, random()*100000)", (i,))
# wait until sub receives all data
logical_replication_sync(sub, vanilla_pg)
# Update confirmed_flush_lsn of the slot. If subscriber ack'ed recevied data
# as flushed we'll now lose it if subscriber restars. That's why
# logical_replication_wait_flush_lsn_sync is expected to hang while
# safekeeper is down.
vanilla_pg.safe_psql("checkpoint;")
assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000
# restart subscriber and ensure it can catch up lost tail again
sub.stop(mode="immediate")
for sk in env.safekeepers:
sk.start()
sub.start()
log.info("waiting for sync after restart")
logical_replication_wait_flush_lsn_sync(vanilla_pg)
assert sub.safe_psql_scalar("SELECT count(*) FROM t") == 1000

15
vendor/revisions.json vendored
View File

@@ -1,5 +1,14 @@
{
"v16": ["16.3", "5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3"],
"v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"],
"v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"]
"v16": [
"16.3",
"6434b1499b11ed97dccea5618a055034b83b8e2f"
],
"v15": [
"15.7",
"47c8d462d169367c8979ce628a523be2d94b46be"
],
"v14": [
"14.12",
"ae07734e0ff72759ab425fc8f625d4c1ecb15a50"
]
}