diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index ce5a1e411e..2fbc333075 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -50,6 +50,9 @@ pub struct SkTimelineInfo { pub safekeeper_connstr: Option, #[serde(default)] pub http_connstr: Option, + // Minimum of all active RO replicas flush LSN + #[serde(default = "lsn_invalid")] + pub standby_horizon: Lsn, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7f2a41d90c..2c43c26359 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -269,6 +269,8 @@ pub struct Timeline { // Atomic would be more appropriate here. last_freeze_ts: RwLock, + pub(crate) standby_horizon: AtomicLsn, + // WAL redo manager. `None` only for broken tenants. walredo_mgr: Option>, @@ -2279,6 +2281,8 @@ impl Timeline { compaction_lock: tokio::sync::Mutex::default(), gc_lock: tokio::sync::Mutex::default(), + standby_horizon: AtomicLsn::new(0), + timeline_get_throttle: resources.timeline_get_throttle, aux_files: tokio::sync::Mutex::new(AuxFilesState { @@ -4844,7 +4848,29 @@ impl Timeline { (horizon_cutoff, pitr_cutoff, retain_lsns) }; - let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); + let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); + let standby_horizon = self.standby_horizon.load(); + // Hold GC for the standby, but as a safety guard do it only within some + // reasonable lag. + if standby_horizon != Lsn::INVALID { + if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) { + const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB + if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG { + new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff); + trace!("holding off GC for standby apply LSN {}", standby_horizon); + } else { + warn!( + "standby is lagging for more than {}MB, not holding gc for it", + MAX_ALLOWED_STANDBY_LAG / 1024 / 1024 + ) + } + } + } + + // Reset standby horizon to ignore it if it is not updated till next GC. + // It is an easy way to unset it when standby disappears without adding + // more conf options. + self.standby_horizon.store(Lsn::INVALID); let res = self .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 991e4ac045..a3c7adae44 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -705,6 +705,7 @@ impl ConnectionManagerState { commit_lsn: info.commit_lsn, safekeeper_connstr: info.safekeeper_connstr, availability_zone: info.availability_zone, + standby_horizon: info.standby_horizon, } } MessageType::SafekeeperDiscoveryResponse => { @@ -725,6 +726,17 @@ impl ConnectionManagerState { WALRECEIVER_BROKER_UPDATES.inc(); + trace!( + "safekeeper info update: standby_horizon(cutoff)={}", + timeline_update.standby_horizon + ); + if timeline_update.standby_horizon != 0 { + // ignore reports from safekeepers not connected to replicas + self.timeline + .standby_horizon + .store(Lsn(timeline_update.standby_horizon)); + } + let new_safekeeper_id = NodeId(timeline_update.safekeeper_id); let old_entry = self.wal_stream_candidates.insert( new_safekeeper_id, @@ -1094,6 +1106,7 @@ mod tests { commit_lsn, safekeeper_connstr: safekeeper_connstr.to_owned(), availability_zone: None, + standby_horizon: 0, }, latest_update, } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index ea16ce450f..46a51438ea 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -186,6 +186,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc) -> Result< commit_lsn: sk_info.commit_lsn, safekeeper_connstr: sk_info.safekeeper_connstr, availability_zone: sk_info.availability_zone, + standby_horizon: 0, }; // note this is a blocking call diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 30d0081a47..808bb1e490 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -350,6 +350,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result Self { + StandbyFeedback { + reply: StandbyReply::empty(), + hs_feedback: HotStandbyFeedback::empty(), + } + } } /// WalSenders registry. Timeline holds it (wrapped in Arc). @@ -162,8 +171,8 @@ impl WalSenders { } /// Get aggregated hot standby feedback (we send it to compute). - pub fn get_hotstandby(self: &Arc) -> HotStandbyFeedback { - self.mutex.lock().agg_hs_feedback + pub fn get_hotstandby(self: &Arc) -> StandbyFeedback { + self.mutex.lock().agg_standby_feedback } /// Record new pageserver feedback, update aggregated values. @@ -184,6 +193,10 @@ impl WalSenders { fn record_standby_reply(self: &Arc, id: WalSenderId, reply: &StandbyReply) { let mut shared = self.mutex.lock(); let slot = shared.get_slot_mut(id); + debug!( + "Record standby reply: ts={} apply_lsn={}", + reply.reply_ts, reply.apply_lsn + ); match &mut slot.feedback { ReplicationFeedback::Standby(sf) => sf.reply = *reply, ReplicationFeedback::Pageserver(_) => { @@ -208,7 +221,7 @@ impl WalSenders { }) } } - shared.update_hs_feedback(); + shared.update_reply_feedback(); } /// Get remote_consistent_lsn reported by the pageserver. Returns None if @@ -226,13 +239,13 @@ impl WalSenders { fn unregister(self: &Arc, id: WalSenderId) { let mut shared = self.mutex.lock(); shared.slots[id] = None; - shared.update_hs_feedback(); + shared.update_reply_feedback(); } } struct WalSendersShared { // aggregated over all walsenders value - agg_hs_feedback: HotStandbyFeedback, + agg_standby_feedback: StandbyFeedback, // last feedback ever received from any pageserver, empty if none last_ps_feedback: PageserverFeedback, // total counter of pageserver feedbacks received @@ -243,7 +256,7 @@ struct WalSendersShared { impl WalSendersShared { fn new() -> Self { WalSendersShared { - agg_hs_feedback: HotStandbyFeedback::empty(), + agg_standby_feedback: StandbyFeedback::empty(), last_ps_feedback: PageserverFeedback::empty(), ps_feedback_counter: 0, slots: Vec::new(), @@ -260,10 +273,11 @@ impl WalSendersShared { self.slots[id].as_mut().expect("walsender doesn't exist") } - /// Update aggregated hot standy feedback. We just take min of valid xmins + /// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins /// and ts. - fn update_hs_feedback(&mut self) { + fn update_reply_feedback(&mut self) { let mut agg = HotStandbyFeedback::empty(); + let mut reply_agg = StandbyReply::empty(); for ws_state in self.slots.iter().flatten() { if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback { let hs_feedback = standby_feedback.hs_feedback; @@ -276,7 +290,7 @@ impl WalSendersShared { } else { agg.xmin = hs_feedback.xmin; } - agg.ts = min(agg.ts, hs_feedback.ts); + agg.ts = max(agg.ts, hs_feedback.ts); } if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID { if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID { @@ -284,11 +298,43 @@ impl WalSendersShared { } else { agg.catalog_xmin = hs_feedback.catalog_xmin; } - agg.ts = min(agg.ts, hs_feedback.ts); + agg.ts = max(agg.ts, hs_feedback.ts); + } + let reply = standby_feedback.reply; + if reply.write_lsn != Lsn::INVALID { + if reply_agg.write_lsn != Lsn::INVALID { + reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn); + } else { + reply_agg.write_lsn = reply.write_lsn; + } + } + if reply.flush_lsn != Lsn::INVALID { + if reply_agg.flush_lsn != Lsn::INVALID { + reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn); + } else { + reply_agg.flush_lsn = reply.flush_lsn; + } + } + if reply.apply_lsn != Lsn::INVALID { + if reply_agg.apply_lsn != Lsn::INVALID { + reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn); + } else { + reply_agg.apply_lsn = reply.apply_lsn; + } + } + if reply.reply_ts != 0 { + if reply_agg.reply_ts != 0 { + reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts); + } else { + reply_agg.reply_ts = reply.reply_ts; + } } } } - self.agg_hs_feedback = agg; + self.agg_standby_feedback = StandbyFeedback { + reply: reply_agg, + hs_feedback: agg, + }; } } @@ -793,8 +839,11 @@ mod tests { fn test_hs_feedback_no_valid() { let mut wss = WalSendersShared::new(); push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID)); - wss.update_hs_feedback(); - assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID); + wss.update_reply_feedback(); + assert_eq!( + wss.agg_standby_feedback.hs_feedback.xmin, + INVALID_FULL_TRANSACTION_ID + ); } #[test] @@ -803,7 +852,7 @@ mod tests { push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID)); push_feedback(&mut wss, hs_feedback(1, 42)); push_feedback(&mut wss, hs_feedback(1, 64)); - wss.update_hs_feedback(); - assert_eq!(wss.agg_hs_feedback.xmin, 42); + wss.update_reply_feedback(); + assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42); } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 64f764f191..e97247dc7c 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -248,6 +248,7 @@ impl SharedState { &self, ttid: &TenantTimelineId, conf: &SafeKeeperConf, + standby_apply_lsn: Lsn, ) -> SafekeeperTimelineInfo { SafekeeperTimelineInfo { safekeeper_id: conf.my_id.0, @@ -270,6 +271,7 @@ impl SharedState { backup_lsn: self.sk.state.inmem.backup_lsn.0, local_start_lsn: self.sk.state.local_start_lsn.0, availability_zone: conf.availability_zone.clone(), + standby_horizon: standby_apply_lsn.0, } } @@ -663,7 +665,7 @@ impl Timeline { // if this is AppendResponse, fill in proper hot standby feedback. if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { - resp.hs_feedback = self.walsenders.get_hotstandby(); + resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback; } commit_lsn = shared_state.sk.state.inmem.commit_lsn; @@ -716,7 +718,8 @@ impl Timeline { /// Get safekeeper info for broadcasting to broker and other peers. pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { let shared_state = self.write_shared_state().await; - shared_state.get_safekeeper_info(&self.ttid, conf) + let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn; + shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn) } /// Update timeline state with peer safekeeper data. diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs index d66cbefa45..1a6fb7fedf 100644 --- a/storage_broker/benches/rps.rs +++ b/storage_broker/benches/rps.rs @@ -147,6 +147,7 @@ async fn publish(client: Option, n_keys: u64) { http_connstr: "zenith-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, + standby_horizon: 0, }; counter += 1; yield info; diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index 7d1b63d23f..a420fd9c66 100644 --- a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -42,6 +42,7 @@ message SafekeeperTimelineInfo { uint64 remote_consistent_lsn = 7; uint64 peer_horizon_lsn = 8; uint64 local_start_lsn = 9; + uint64 standby_horizon = 14; // A connection string to use for WAL receiving. string safekeeper_connstr = 10; // HTTP endpoint connection string @@ -105,4 +106,6 @@ message SafekeeperDiscoveryResponse { string safekeeper_connstr = 4; // Availability zone of a safekeeper. optional string availability_zone = 5; + // Replica apply LSN + uint64 standby_horizon = 6; } diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index 8c88b61abc..0a4af543ab 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -736,6 +736,7 @@ mod tests { http_connstr: "neon-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, + standby_horizon: 0, }) } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 23f30804b4..41377e2db2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4288,6 +4288,17 @@ def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint): time.sleep(1) +def log_replica_lag(primary: Endpoint, secondary: Endpoint): + last_replay_lsn = Lsn( + secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False) + ) + primary_lsn = Lsn( + primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False) + ) + lag = primary_lsn - last_replay_lsn + log.info(f"primary_lsn={primary_lsn}, replay_lsn={last_replay_lsn}, lag={lag}") + + def wait_for_last_flush_lsn( env: NeonEnv, endpoint: Endpoint, diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 179cc273ec..31f436cb4c 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -1,9 +1,20 @@ import os import re +import threading import time +from functools import partial +import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, tenant_get_shards, wait_replica_caughtup +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, + PgBin, + log_replica_lag, + tenant_get_shards, + wait_replica_caughtup, +) +from fixtures.utils import wait_until # Check for corrupted WAL messages which might otherwise go unnoticed if @@ -104,19 +115,28 @@ def test_2_replicas_start(neon_simple_env: NeonEnv): wait_replica_caughtup(primary, secondary2) -# We had an issue that a standby server made GetPage requests with an -# old LSN, based on the last-written LSN cache, to avoid waits in the -# pageserver. However, requesting a page with a very old LSN, such -# that the GC horizon has already advanced past it, results in an -# error from the pageserver: -# "Bad request: tried to request a page version that was garbage collected" +# Test two different scenarios related to gc of data needed by hot standby. # -# To avoid that, the compute<-> pageserver protocol was updated so -# that that the standby now sends two LSNs, the old last-written LSN -# and the current replay LSN. +# When pause_apply is False, standby is mostly caught up with the primary. +# However, in compute <-> pageserver protocol version 1 only one LSN had been +# sent to the pageserver in page request, and to avoid waits in the pageserver +# it was last-written LSN cache value. If page hasn't been updated for a long +# time that resulted in an error from the pageserver: "Bad request: tried to +# request a page version that was garbage collected". For primary this wasn't a +# problem because pageserver always bumped LSN to the newest one; for standy +# that would be incorrect since we might get page fresher then apply LSN. Hence, +# in protocol version v2 two LSNs were introduced: main request_lsn (apply LSN +# in case of standby) and not_modified_since which could be used as an +# optimization to avoid waiting. # # https://github.com/neondatabase/neon/issues/6211 -def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder): +# +# When pause_apply is True we model standby lagging behind primary (e.g. due to +# high max_standby_streaming_delay). To prevent pageserver from removing data +# still needed by the standby apply LSN is propagated in standby -> safekeepers +# -> broker -> pageserver flow so that pageserver could hold off gc for it. +@pytest.mark.parametrize("pause_apply", [False, True]) +def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool): tenant_conf = { # set PITR interval to be small, so we can do GC "pitr_interval": "0 s", @@ -160,6 +180,9 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder): # so we still remember the LSNs of the pages. s_cur.execute("SELECT clear_buffer_cache()") + if pause_apply: + s_cur.execute("SELECT pg_wal_replay_pause()") + # Do other stuff on the primary, to advance the WAL p_cur.execute("CREATE TABLE test2 AS SELECT generate_series(1, 1000000) AS g") @@ -176,6 +199,8 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder): # generates use old not_modified_since LSNs, older than # the GC cutoff, but new request LSNs. (In protocol # version 1 there was only one LSN, and this failed.) + log_replica_lag(primary, secondary) s_cur.execute("SELECT COUNT(*) FROM test") + log_replica_lag(primary, secondary) res = s_cur.fetchone() assert res[0] == 10000