From 8cfc5ade57066dd729b14b3964a461b4e18e9394 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Mon, 15 Jan 2024 15:20:50 +0200
Subject: [PATCH] Propagate replica flush_lsn from SK to PS to prevent GC from
 collecting objects which may still be requested by a replica

---
 libs/safekeeper_api/src/models.rs             |  2 +
 pageserver/src/tenant/timeline.rs             |  9 ++-
 .../walreceiver/connection_manager.rs         |  6 +-
 safekeeper/src/http/routes.rs                 |  1 +
 safekeeper/src/send_wal.rs                    | 73 +++++++++++++++----
 safekeeper/src/state.rs                       |  2 +
 safekeeper/src/timeline.rs                    |  6 +-
 storage_broker/proto/broker.proto             |  1 +
 storage_broker/src/bin/storage_broker.rs      |  1 +
 9 files changed, 83 insertions(+), 18 deletions(-)

diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index ce5a1e411e..d7e8dc5f9e 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -50,6 +50,8 @@ pub struct SkTimelineInfo {
     pub safekeeper_connstr: Option<String>,
     #[serde(default)]
     pub http_connstr: Option<String>,
+    #[serde(default = "lsn_invalid")]
+    pub standby_flush_lsn: Lsn,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 735b8003b4..cc2ef9a5d0 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -215,6 +215,8 @@ pub struct Timeline {
     // Atomic would be more appropriate here.
     last_freeze_ts: RwLock<Instant>,
 
+    pub(crate) standby_flush_lsn: AtomicLsn,
+
     // WAL redo manager. `None` only for broken tenants.
     walredo_mgr: Option<Arc<super::WalRedoManager>>,
 
@@ -1540,6 +1542,7 @@ impl Timeline {
 
             compaction_lock: tokio::sync::Mutex::default(),
             gc_lock: tokio::sync::Mutex::default(),
+            standby_flush_lsn: AtomicLsn::new(0),
         };
         result.repartition_threshold =
             result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4198,7 +4201,11 @@ impl Timeline {
             (horizon_cutoff, pitr_cutoff, retain_lsns)
         };
 
-        let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
+        let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
+        let standby_flush_lsn = self.standby_flush_lsn.load();
+        if standby_flush_lsn != Lsn::INVALID && standby_flush_lsn < new_gc_cutoff {
+            new_gc_cutoff = standby_flush_lsn;
+        }
 
         let res = self
             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index cf6dee114f..b39aadf4e7 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -553,6 +553,10 @@ impl ConnectionManagerState {
     fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
         WALRECEIVER_BROKER_UPDATES.inc();
 
+        self.timeline
+            .standby_flush_lsn
+            .store(Lsn(timeline_update.standby_flush_lsn));
+
         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
         let old_entry = self.wal_stream_candidates.insert(
             new_safekeeper_id,
@@ -561,7 +565,6 @@ impl ConnectionManagerState {
                 latest_update: Utc::now().naive_utc(),
             },
         );
-
         if old_entry.is_none() {
             info!("New SK node was added: {new_safekeeper_id}");
             WALRECEIVER_CANDIDATES_ADDED.inc();
@@ -920,6 +923,7 @@ mod tests {
             remote_consistent_lsn: 0,
             peer_horizon_lsn: 0,
             local_start_lsn: 0,
+            standby_flush_lsn: 0,
             safekeeper_connstr: safekeeper_connstr.to_owned(),
             http_connstr: safekeeper_connstr.to_owned(),
             availability_zone: None,
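
The timeline.rs hunk above is the heart of the change: GC normally trims
everything below min(horizon_cutoff, pitr_cutoff), and the new code
additionally clamps that cutoff to the flush LSN reported by a standby, so
layers a lagging replica may still read are kept. A self-contained sketch of
that rule (not Neon code): Lsn here is a stand-in type and effective_gc_cutoff
is a hypothetical helper, with 0 playing the role of Lsn::INVALID, meaning
"no standby attached".

    #[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
    struct Lsn(u64); // stand-in for the real Lsn type

    impl Lsn {
        const INVALID: Lsn = Lsn(0);
    }

    // GC may not advance past the oldest LSN a standby still needs.
    fn effective_gc_cutoff(horizon: Lsn, pitr: Lsn, standby_flush_lsn: Lsn) -> Lsn {
        let mut cutoff = if horizon < pitr { horizon } else { pitr };
        if standby_flush_lsn != Lsn::INVALID && standby_flush_lsn < cutoff {
            cutoff = standby_flush_lsn;
        }
        cutoff
    }

    fn main() {
        // A replica that has only flushed up to LSN 500 holds GC back to 500.
        assert_eq!(effective_gc_cutoff(Lsn(1000), Lsn(800), Lsn(500)), Lsn(500));
        // With no standby attached, the usual min(horizon, pitr) applies.
        assert_eq!(effective_gc_cutoff(Lsn(1000), Lsn(800), Lsn::INVALID), Lsn(800));
    }
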
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index a0c0c7ca4c..b5e6727c07 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -350,6 +350,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError>
         backup_lsn: sk_info.backup_lsn.0,
         local_start_lsn: sk_info.local_start_lsn.0,
         availability_zone: None,
+        standby_flush_lsn: sk_info.standby_flush_lsn.0,
     };
 
     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -124,6 +124,15 @@ pub struct StandbyFeedback {
     reply: StandbyReply,
     hs_feedback: HotStandbyFeedback,
 }
 
+impl StandbyFeedback {
+    pub fn empty() -> Self {
+        StandbyFeedback {
+            reply: StandbyReply::empty(),
+            hs_feedback: HotStandbyFeedback::empty(),
+        }
+    }
+}
+
 /// WalSenders registry. Timeline holds it (wrapped in Arc).
 pub struct WalSenders {
     mutex: Mutex<WalSendersShared>,
@@ -142,9 +151,9 @@ impl WalSenders {
     }
 
     /// Get aggregated pageserver and hot standby feedback (we send them to compute).
-    pub fn get_feedbacks(self: &Arc<Self>) -> (PageserverFeedback, HotStandbyFeedback) {
+    pub fn get_feedbacks(self: &Arc<Self>) -> (PageserverFeedback, StandbyFeedback) {
         let shared = self.mutex.lock();
-        (shared.agg_ps_feedback, shared.agg_hs_feedback)
+        (shared.agg_ps_feedback, shared.agg_standby_feedback)
     }
 
     /// Record new pageserver feedback, update aggregated values.
@@ -182,7 +191,7 @@ impl WalSenders {
                 })
             }
         }
-        shared.update_hs_feedback();
+        shared.update_reply_feedback();
     }
 
     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
@@ -200,13 +209,13 @@ impl WalSenders {
     fn unregister(self: &Arc<Self>, id: WalSenderId) {
         let mut shared = self.mutex.lock();
         shared.slots[id] = None;
-        shared.update_hs_feedback();
+        shared.update_reply_feedback();
     }
 }
 
 struct WalSendersShared {
     // aggregated over all walsenders value
-    agg_hs_feedback: HotStandbyFeedback,
+    agg_standby_feedback: StandbyFeedback,
     // aggregated over all walsenders value
     agg_ps_feedback: PageserverFeedback,
     slots: Vec<Option<WalSenderState>>,
@@ -215,7 +224,7 @@ impl WalSendersShared {
     fn new() -> Self {
         WalSendersShared {
-            agg_hs_feedback: HotStandbyFeedback::empty(),
+            agg_standby_feedback: StandbyFeedback::empty(),
             agg_ps_feedback: PageserverFeedback::empty(),
             slots: Vec::new(),
         }
     }
@@ -233,8 +242,9 @@ impl WalSendersShared {
 
     /// Update aggregated hot standy feedback. We just take min of valid xmins
     /// and ts.
-    fn update_hs_feedback(&mut self) {
+    fn update_reply_feedback(&mut self) {
         let mut agg = HotStandbyFeedback::empty();
+        let mut reply_agg = StandbyReply::empty();
         for ws_state in self.slots.iter().flatten() {
             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
                 let hs_feedback = standby_feedback.hs_feedback;
@@ -257,9 +267,41 @@ impl WalSendersShared {
                     }
                     agg.ts = min(agg.ts, hs_feedback.ts);
                 }
+                let reply = standby_feedback.reply;
+                if reply.write_lsn != Lsn::INVALID {
+                    if reply_agg.write_lsn != Lsn::INVALID {
+                        reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
+                    } else {
+                        reply_agg.write_lsn = reply.write_lsn;
+                    }
+                }
+                if reply.flush_lsn != Lsn::INVALID {
+                    if reply_agg.flush_lsn != Lsn::INVALID {
+                        reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
+                    } else {
+                        reply_agg.flush_lsn = reply.flush_lsn;
+                    }
+                }
+                if reply.apply_lsn != Lsn::INVALID {
+                    if reply_agg.apply_lsn != Lsn::INVALID {
+                        reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
+                    } else {
+                        reply_agg.apply_lsn = reply.apply_lsn;
+                    }
+                }
+                if reply.reply_ts != 0 {
+                    if reply_agg.reply_ts != 0 {
+                        reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
+                    } else {
+                        reply_agg.reply_ts = reply.reply_ts;
+                    }
+                }
             }
         }
-        self.agg_hs_feedback = agg;
+        self.agg_standby_feedback = StandbyFeedback {
+            reply: reply_agg,
+            hs_feedback: agg,
+        };
     }
 
     /// Update aggregated pageserver feedback. LSNs (last_received,
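
The four if/else ladders added to update_reply_feedback() above all implement
the same folding rule per field: a sentinel (Lsn::INVALID, or 0 for reply_ts)
means "no report", and otherwise the minimum across all walsenders wins. A
sketch of that rule outside the patch, with a hypothetical min_valid helper
over plain u64 values:

    // Fold one walsender's report into the aggregate; `invalid` means "no report".
    fn min_valid(acc: u64, next: u64, invalid: u64) -> u64 {
        if next == invalid {
            acc // this walsender has nothing to report yet
        } else if acc == invalid {
            next // first valid report seen so far
        } else {
            acc.min(next)
        }
    }

    fn main() {
        const INVALID: u64 = 0; // plays the role of Lsn::INVALID
        // Three walsenders report flush LSNs; one has no standby reply yet.
        let flush_lsns = [0x4000_u64, INVALID, 0x2000];
        let agg = flush_lsns
            .iter()
            .fold(INVALID, |acc, &lsn| min_valid(acc, lsn, INVALID));
        assert_eq!(agg, 0x2000); // the slowest standby defines the aggregate
    }

Taking the minimum (rather than the maximum) is deliberate: an over-eager
cutoff could remove layers the slowest replica still needs, while an overly
conservative one merely delays GC.
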
@@ -764,8 +806,11 @@ mod tests {
     fn test_hs_feedback_no_valid() {
         let mut wss = WalSendersShared::new();
         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
-        wss.update_hs_feedback();
-        assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
+        wss.update_reply_feedback();
+        assert_eq!(
+            wss.agg_standby_feedback.hs_feedback.xmin,
+            INVALID_FULL_TRANSACTION_ID
+        );
     }
 
     #[test]
@@ -774,8 +819,8 @@ mod tests {
         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
         push_feedback(&mut wss, hs_feedback(1, 42));
         push_feedback(&mut wss, hs_feedback(1, 64));
-        wss.update_hs_feedback();
-        assert_eq!(wss.agg_hs_feedback.xmin, 42);
+        wss.update_reply_feedback();
+        assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
     }
 
     // form pageserver feedback with given last_record_lsn / tli size and the
diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs
index 82f7954051..749a815d80 100644
--- a/safekeeper/src/state.rs
+++ b/safekeeper/src/state.rs
@@ -124,6 +124,7 @@ pub struct TimelineMemState {
     pub remote_consistent_lsn: Lsn,
     #[serde(with = "hex")]
     pub proposer_uuid: PgUuid,
+    pub standby_flush_lsn: Lsn,
 }
 
 /// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
@@ -148,6 +149,7 @@ where
                 peer_horizon_lsn: state.peer_horizon_lsn,
                 remote_consistent_lsn: state.remote_consistent_lsn,
                 proposer_uuid: state.proposer_uuid,
+                standby_flush_lsn: Lsn::INVALID,
             },
             pers: state,
         }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 730a80a583..44f45fdc36 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -270,6 +270,7 @@ impl SharedState {
             backup_lsn: self.sk.state.inmem.backup_lsn.0,
             local_start_lsn: self.sk.state.local_start_lsn.0,
             availability_zone: conf.availability_zone.clone(),
+            standby_flush_lsn: self.sk.state.inmem.standby_flush_lsn.0,
         }
     }
 
@@ -628,9 +629,10 @@ impl Timeline {
         // if this is AppendResponse, fill in proper pageserver and hot
        // standby feedback.
         if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
-            let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
-            resp.hs_feedback = hs_feedback;
+            let (ps_feedback, standby_feedback) = self.walsenders.get_feedbacks();
+            resp.hs_feedback = standby_feedback.hs_feedback;
             resp.pageserver_feedback = ps_feedback;
+            shared_state.sk.state.inmem.standby_flush_lsn = standby_feedback.reply.flush_lsn;
         }
 
         commit_lsn = shared_state.sk.state.inmem.commit_lsn;
diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto
index 7d1b63d23f..74c94c31a8 100644
--- a/storage_broker/proto/broker.proto
+++ b/storage_broker/proto/broker.proto
@@ -42,6 +42,7 @@ message SafekeeperTimelineInfo {
     uint64 remote_consistent_lsn = 7;
     uint64 peer_horizon_lsn = 8;
     uint64 local_start_lsn = 9;
+    uint64 standby_flush_lsn = 14;
     // A connection string to use for WAL receiving.
     string safekeeper_connstr = 10;
     // HTTP endpoint connection string
diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs
index 4e5f8ed724..b54d9461e8 100644
--- a/storage_broker/src/bin/storage_broker.rs
+++ b/storage_broker/src/bin/storage_broker.rs
@@ -734,6 +734,7 @@ mod tests {
             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
             local_start_lsn: 0,
             availability_zone: None,
+            standby_flush_lsn: 0,
         })
     }
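
Putting the pieces together, the value travels: replica standby status update
-> walsender StandbyReply.flush_lsn -> aggregated into
TimelineMemState.standby_flush_lsn -> published as
SafekeeperTimelineInfo.standby_flush_lsn (broker.proto above) -> stored by the
pageserver's connection manager -> read by GC. A sketch of the pageserver end
(not the actual pageserver API): TimelineStub is a hypothetical stand-in for
Timeline, using AtomicU64 in place of AtomicLsn since the LSN is a plain u64
on the wire and 0 means "no standby attached".

    use std::sync::atomic::{AtomicU64, Ordering};

    struct TimelineStub {
        standby_flush_lsn: AtomicU64,
    }

    impl TimelineStub {
        // Mirrors register_timeline_update(): the last broker update wins.
        fn on_broker_update(&self, standby_flush_lsn: u64) {
            self.standby_flush_lsn.store(standby_flush_lsn, Ordering::Relaxed);
        }

        // The GC loop loads the value when computing new_gc_cutoff.
        fn standby_flush_lsn(&self) -> u64 {
            self.standby_flush_lsn.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let tli = TimelineStub { standby_flush_lsn: AtomicU64::new(0) };
        tli.on_broker_update(0x2000); // broker delivers a safekeeper update
        assert_eq!(tli.standby_flush_lsn(), 0x2000);
    }

A consequence of this last-write-wins scheme: once a standby disconnects, the
safekeepers aggregate Lsn::INVALID again, the pageserver stores it, and the GC
cutoff clamp is released.
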