From 45f5dfc685d973020333a482bbcf25ba84a935e0 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 10 Jul 2025 12:29:15 +0000
Subject: [PATCH] extend test_hot_standby_gc to demonstrate that it doesn't work without hot_standby_feedback

The bug described in

- https://github.com/neondatabase/neon/issues/12168

is not covered by `test_hot_standby_gc[pause_apply=True]`.

The reason is that the first standby reply _does_ trigger an update of
the aggregate, but subsequent standby replies don't. Only
hot_standby_feedback messages do.

In the test, that first reply sets standby_horizon to a low value X,
thereby inhibiting GC and making the test pass. However, the aggregate
standby_horizon pushed by the safekeepers remains at that low value X
from here on.

Actually, uh oh: this means standby_horizon is _kept_ at X. And thus,
if the pageserver receives a re-push of the low value X before the next
GC, it will continue to limit GC to below X, until the standby lag
exceeds the hard-coded `MAX_ALLOWED_STANDBY_LAG`.

This is worse than I thought: it means that the current code _is_
preventing GC _even_ for standbys _without_ `hot_standby_feedback`, if
those standbys shut down before 10GiB of WAL has been written since
they were launched.

Turn some debug logging into info logging to make the case.

Sigh.

refs
- https://github.com/neondatabase/neon/issues/12168
---
 pageserver/src/tenant/timeline.rs        |  2 +-
 .../walreceiver/connection_manager.rs    |  2 +-
 safekeeper/src/send_wal.rs               |  7 +-
 test_runner/regress/test_hot_standby.py  | 70 +++++++++++++++++++
 4 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index a9bc0a060b..0953b7a31f 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -6633,7 +6633,7 @@ impl Timeline {
         const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
         if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
             new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
-            trace!("holding off GC for standby apply LSN {}", standby_horizon);
+            info!("holding off GC for standby apply LSN {}", standby_horizon);
         } else {
             warn!(
                 "standby is lagging for more than {}MB, not holding gc for it",
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index aba94244a3..a694a3940c 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -750,7 +750,7 @@ impl ConnectionManagerState {
 
         WALRECEIVER_BROKER_UPDATES.inc();
 
-        trace!(
+        info!(
             "safekeeper info update: standby_horizon(cutoff)={}",
             timeline_update.standby_horizon
         );
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 177e759db5..c456cf9a4e 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -220,7 +220,7 @@ impl WalSenders {
     fn record_standby_reply(self: &Arc<Self>, id: WalSenderId, reply: &StandbyReply) {
         let mut shared = self.mutex.lock();
         let slot = shared.get_slot_mut(id);
-        debug!(
+        info!(
             "Record standby reply: ts={} apply_lsn={}",
             reply.reply_ts, reply.apply_lsn
         );
@@ -400,7 +400,10 @@ impl WalSendersShared {
             }
         }
         self.agg_standby_feedback = StandbyFeedback {
-            reply: reply_agg,
+            reply: {
+                info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
+                reply_agg
+            },
             hs_feedback: agg,
         };
     }
diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py
index 1ff61ce8dc..6b120f386d 100644
--- a/test_runner/regress/test_hot_standby.py
+++ b/test_runner/regress/test_hot_standby.py
@@ -7,6 +7,7 @@ import time
 from functools import partial
 
 import pytest
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
@@ -163,6 +164,11 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
             res = s_cur.fetchone()
             assert res is not None
 
+            s_cur.execute("SHOW hot_standby_feedback")
+            res = s_cur.fetchone()
+            assert res is not None
+            assert res[0] == "off"
+
             s_cur.execute("SELECT COUNT(*) FROM test")
             res = s_cur.fetchone()
             assert res == (10000,)
@@ -198,6 +204,70 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
             res = s_cur.fetchone()
             assert res == (10000,)
 
+            if pause_apply:
+                s_cur.execute("SELECT pg_wal_replay_resume()")
+
+            wait_replica_caughtup(primary, secondary)
+
+            # Wait for standby horizon to catch up, then gc again.
+            # (When we switch to leases (LKB-88) we need to wait for leases to expire.)
+            shards = tenant_get_shards(env, tenant_id, None)
+            for tenant_shard_id, pageserver in shards:
+                client = pageserver.http_client()
+                while True:
+                    secondary_apply_lsn = Lsn(
+                        secondary.safe_psql_scalar(
+                            "SELECT pg_last_wal_replay_lsn()", log_query=False
+                        )
+                    )
+                    standby_horizon_metric = client.get_metrics().query_one(
+                        "pageserver_standby_horizon",
+                        {
+                            "tenant_id": str(tenant_shard_id.tenant_id),
+                            "shard_id": str(tenant_shard_id.shard_index),
+                            "timeline_id": str(timeline_id),
+                        },
+                    )
+                    standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
+                    log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
+                    if secondary_apply_lsn == standby_horizon_at_ps:
+                        break
+                    time.sleep(1)
+                client.timeline_checkpoint(tenant_shard_id, timeline_id)
+                client.timeline_compact(tenant_shard_id, timeline_id)
+                client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+            # Clear the cache in the standby, so that when we
+            # re-execute the query, it will make GetPage
+            # requests. This does not clear the last-written LSN cache
+            # so we still remember the LSNs of the pages.
+            secondary.clear_buffers(cursor=s_cur)
+
+            if pause_apply:
+                s_cur.execute("SELECT pg_wal_replay_pause()")
+
+            # Do other stuff on the primary, to advance the WAL
+            p_cur.execute("CREATE TABLE test3 AS SELECT generate_series(1, 1000000) AS g")
+
+            # Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
+            # very close to the primary's current insert LSN.
+            shards = tenant_get_shards(env, tenant_id, None)
+            for tenant_shard_id, pageserver in shards:
+                client = pageserver.http_client()
+                client.timeline_checkpoint(tenant_shard_id, timeline_id)
+                client.timeline_compact(tenant_shard_id, timeline_id)
+                client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+            # Re-execute the query. The GetPage requests that this
+            # generates use old not_modified_since LSNs, older than
+            # the GC cutoff, but new request LSNs. (In protocol
+            # version 1 there was only one LSN, and this failed.)
+            log_replica_lag(primary, secondary)
+            s_cur.execute("SELECT COUNT(*) FROM test")
+            log_replica_lag(primary, secondary)
+            res = s_cur.fetchone()
+            assert res == (10000,)
+
 
 def run_pgbench(connstr: str, pg_bin: PgBin):
     log.info(f"Start a pgbench workload on pg {connstr}")
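
For reference, the GC-holdoff rule that the timeline.rs hunk above now logs at info level
can be sketched in a few lines of standalone Rust. This is a simplified model, not the
pageserver's actual code path: the `Lsn` newtype, the `effective_gc_cutoff` helper, and
the `planned_cutoff` / `last_record_lsn` parameters are names invented for the sketch;
only the `MAX_ALLOWED_STANDBY_LAG` constant and the lag/min decision mirror the hunk. It
shows why a standby_horizon stuck at a low value X keeps pinning the GC cutoff until the
standby lag crosses 10 GiB.

// Simplified stand-in for the pageserver's Lsn newtype.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

// Same constant as in the timeline.rs hunk above.
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB

// Hypothetical helper modelling the decision in the hunk: clamp the planned GC cutoff
// to the standby horizon unless the standby lags by MAX_ALLOWED_STANDBY_LAG or more.
fn effective_gc_cutoff(standby_horizon: Lsn, planned_cutoff: Lsn, last_record_lsn: Lsn) -> Lsn {
    let standby_lag = last_record_lsn.0.saturating_sub(standby_horizon.0);
    if standby_lag < MAX_ALLOWED_STANDBY_LAG {
        // "holding off GC for standby apply LSN ..."
        std::cmp::min(standby_horizon, planned_cutoff)
    } else {
        // "standby is lagging for more than ...MB, not holding gc for it"
        planned_cutoff
    }
}

fn main() {
    // A horizon stuck at a low value X, the situation described in the commit message.
    let x = Lsn(16 << 20); // 16 MiB into the WAL

    // While the primary has written less than 10 GiB past X, GC stays pinned at X,
    // no matter how far the planned cutoff has advanced.
    let near = Lsn(5 << 30);
    assert_eq!(effective_gc_cutoff(x, near, near), x);

    // Only once the lag reaches MAX_ALLOWED_STANDBY_LAG does GC stop being held back.
    let far = Lsn((16 << 20) + (11u64 << 30));
    assert_eq!(effective_gc_cutoff(x, far, far), far);

    println!("GC is held at {x:?} until the standby lag reaches 10 GiB");
}

Under this rule, the only way GC gets unstuck without the horizon advancing is the 10 GiB
lag escape hatch, which is exactly the behavior the extended test and the info-level
logging above are meant to demonstrate.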