diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index a9bc0a060b..0953b7a31f 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -6633,7 +6633,7 @@ impl Timeline {
                 const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
                 if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
                     new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
-                    trace!("holding off GC for standby apply LSN {}", standby_horizon);
+                    info!("holding off GC for standby apply LSN {}", standby_horizon);
                 } else {
                     warn!(
                         "standby is lagging for more than {}MB, not holding gc for it",
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index aba94244a3..a694a3940c 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -750,7 +750,7 @@ impl ConnectionManagerState {
 
         WALRECEIVER_BROKER_UPDATES.inc();
 
-        trace!(
+        info!(
             "safekeeper info update: standby_horizon(cutoff)={}",
             timeline_update.standby_horizon
         );
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 177e759db5..c456cf9a4e 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -220,7 +220,7 @@ impl WalSenders {
     fn record_standby_reply(self: &Arc<Self>, id: WalSenderId, reply: &StandbyReply) {
         let mut shared = self.mutex.lock();
         let slot = shared.get_slot_mut(id);
-        debug!(
+        info!(
             "Record standby reply: ts={} apply_lsn={}",
             reply.reply_ts, reply.apply_lsn
         );
@@ -400,7 +400,10 @@ impl WalSendersShared {
             }
         }
         self.agg_standby_feedback = StandbyFeedback {
-            reply: reply_agg,
+            reply: {
+                info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
+                reply_agg
+            },
             hs_feedback: agg,
         };
     }
diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py
index 1ff61ce8dc..6b120f386d 100644
--- a/test_runner/regress/test_hot_standby.py
+++ b/test_runner/regress/test_hot_standby.py
@@ -7,6 +7,7 @@ import time
 from functools import partial
 
 import pytest
+from fixtures.common_types import Lsn
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
@@ -163,6 +164,11 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
                 res = s_cur.fetchone()
                 assert res is not None
 
+                s_cur.execute("SHOW hot_standby_feedback")
+                res = s_cur.fetchone()
+                assert res is not None
+                assert res[0] == "off"
+
                 s_cur.execute("SELECT COUNT(*) FROM test")
                 res = s_cur.fetchone()
                 assert res == (10000,)
@@ -198,6 +204,70 @@
                 res = s_cur.fetchone()
                 assert res == (10000,)
+                if pause_apply:
+                    s_cur.execute("SELECT pg_wal_replay_resume()")
+
+                wait_replica_caughtup(primary, secondary)
+
+                # Wait for standby horizon to catch up, then gc again.
+                # (When we switch to leases (LKB-88) we need to wait for leases to expire.)
+                shards = tenant_get_shards(env, tenant_id, None)
+                for tenant_shard_id, pageserver in shards:
+                    client = pageserver.http_client()
+                    while True:
+                        secondary_apply_lsn = Lsn(
+                            secondary.safe_psql_scalar(
+                                "SELECT pg_last_wal_replay_lsn()", log_query=False
+                            )
+                        )
+                        standby_horizon_metric = client.get_metrics().query_one(
+                            "pageserver_standby_horizon",
+                            {
+                                "tenant_id": str(tenant_shard_id.tenant_id),
+                                "shard_id": str(tenant_shard_id.shard_index),
+                                "timeline_id": str(timeline_id),
+                            },
+                        )
+                        standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
+                        log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
+                        if secondary_apply_lsn == standby_horizon_at_ps:
+                            break
+                        time.sleep(1)
+                    client.timeline_checkpoint(tenant_shard_id, timeline_id)
+                    client.timeline_compact(tenant_shard_id, timeline_id)
+                    client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+                # Clear the cache in the standby, so that when we
+                # re-execute the query, it will make GetPage
+                # requests. This does not clear the last-written LSN cache
+                # so we still remember the LSNs of the pages.
+                secondary.clear_buffers(cursor=s_cur)
+
+                if pause_apply:
+                    s_cur.execute("SELECT pg_wal_replay_pause()")
+
+                # Do other stuff on the primary, to advance the WAL
+                p_cur.execute("CREATE TABLE test3 AS SELECT generate_series(1, 1000000) AS g")
+
+                # Run GC. The PITR interval is very small, so this advances the GC cutoff LSN
+                # very close to the primary's current insert LSN.
+                shards = tenant_get_shards(env, tenant_id, None)
+                for tenant_shard_id, pageserver in shards:
+                    client = pageserver.http_client()
+                    client.timeline_checkpoint(tenant_shard_id, timeline_id)
+                    client.timeline_compact(tenant_shard_id, timeline_id)
+                    client.timeline_gc(tenant_shard_id, timeline_id, 0)
+
+                # Re-execute the query. The GetPage requests that this
+                # generates use old not_modified_since LSNs, older than
+                # the GC cutoff, but new request LSNs. (In protocol
+                # version 1 there was only one LSN, and this failed.)
+                log_replica_lag(primary, secondary)
+                s_cur.execute("SELECT COUNT(*) FROM test")
+                log_replica_lag(primary, secondary)
+                res = s_cur.fetchone()
+                assert res == (10000,)
+
 
 
 def run_pgbench(connstr: str, pg_bin: PgBin):
     log.info(f"Start a pgbench workload on pg {connstr}")
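The standby-horizon wait loop added to test_hot_standby_gc can also be read on its own as the sketch below. The helper name and signature are illustrative only (not part of the change); the calls it makes are the ones that appear in the diff above: Lsn, safe_psql_scalar, get_metrics().query_one, and the pageserver_standby_horizon metric.

import time

from fixtures.common_types import Lsn
from fixtures.log_helper import log


def wait_for_standby_horizon(pageserver_http, tenant_shard_id, timeline_id, secondary):
    """Poll until the pageserver's standby_horizon equals the replica's replay LSN.

    Illustrative helper; a sketch of the loop in the test above, not part of the change.
    """
    while True:
        # LSN the hot standby has replayed up to.
        secondary_apply_lsn = Lsn(
            secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
        )
        # standby_horizon as currently known to the pageserver, exported as a metric.
        metric = pageserver_http.get_metrics().query_one(
            "pageserver_standby_horizon",
            {
                "tenant_id": str(tenant_shard_id.tenant_id),
                "shard_id": str(tenant_shard_id.shard_index),
                "timeline_id": str(timeline_id),
            },
        )
        standby_horizon_at_ps = Lsn(int(metric.value))
        log.info(f"{standby_horizon_at_ps=} {secondary_apply_lsn=}")
        if secondary_apply_lsn == standby_horizon_at_ps:
            return
        time.sleep(1)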