From 66ab1047ef1a79db2b8385c94a67b47640c0e18d Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 30 Jun 2025 21:16:03 +0200 Subject: [PATCH] undo some more of the changes from https://github.com/neondatabase/neon/pull/7368/files --- libs/safekeeper_api/src/models.rs | 39 +--------- safekeeper/src/send_wal.rs | 125 ++++++++++-------------------- safekeeper/src/timeline.rs | 2 +- 3 files changed, 43 insertions(+), 123 deletions(-) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 75760301bf..0c5a5c945f 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -96,50 +96,13 @@ impl HotStandbyFeedback { } } -/// Standby status update -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyReply { - pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. - pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. - pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. - pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. - pub reply_requested: bool, -} - -impl StandbyReply { - pub fn empty() -> Self { - StandbyReply { - write_lsn: Lsn::INVALID, - flush_lsn: Lsn::INVALID, - apply_lsn: Lsn::INVALID, - reply_ts: 0, - reply_requested: false, - } - } -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyFeedback { - pub reply: StandbyReply, - pub hs_feedback: HotStandbyFeedback, -} - -impl StandbyFeedback { - pub fn empty() -> Self { - StandbyFeedback { - reply: StandbyReply::empty(), - hs_feedback: HotStandbyFeedback::empty(), - } - } -} - /// Receiver is either pageserver or regular standby, which have different /// feedbacks. /// Used as both model and internally. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum ReplicationFeedback { Pageserver(PageserverFeedback), - Standby(StandbyFeedback), + Standby(HotStandbyFeedback), } /// Uniquely identifies a WAL service connection. Logged in spans for diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 177e759db5..28eabeeae8 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -16,8 +16,7 @@ use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_times use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use safekeeper_api::Term; use safekeeper_api::models::{ - HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, StandbyFeedback, - StandbyReply, + HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, }; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch::Receiver; @@ -198,8 +197,8 @@ impl WalSenders { } /// Get aggregated hot standby feedback (we send it to compute). - pub fn get_hotstandby(self: &Arc) -> StandbyFeedback { - self.mutex.lock().agg_standby_feedback + pub fn get_hotstandby(self: &Arc) -> HotStandbyFeedback { + self.mutex.lock().agg_hs_feedback } /// Record new pageserver feedback, update aggregated values. @@ -216,36 +215,14 @@ impl WalSenders { self.walreceivers.broadcast_pageserver_feedback(*feedback); } - /// Record standby reply. - fn record_standby_reply(self: &Arc, id: WalSenderId, reply: &StandbyReply) { - let mut shared = self.mutex.lock(); - let slot = shared.get_slot_mut(id); - debug!( - "Record standby reply: ts={} apply_lsn={}", - reply.reply_ts, reply.apply_lsn - ); - match &mut slot.get_mut_feedback() { - ReplicationFeedback::Standby(sf) => sf.reply = *reply, - ReplicationFeedback::Pageserver(_) => { - *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback { - reply: *reply, - hs_feedback: HotStandbyFeedback::empty(), - }) - } - } - } - /// Record hot standby feedback, update aggregated value. fn record_hs_feedback(self: &Arc, id: WalSenderId, feedback: &HotStandbyFeedback) { let mut shared = self.mutex.lock(); let slot = shared.get_slot_mut(id); match &mut slot.get_mut_feedback() { - ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback, + ReplicationFeedback::Standby(sf) => *sf = *feedback, ReplicationFeedback::Pageserver(_) => { - *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback { - reply: StandbyReply::empty(), - hs_feedback: *feedback, - }) + *slot.get_mut_feedback() = ReplicationFeedback::Standby(*feedback); } } shared.update_reply_feedback(); @@ -272,7 +249,7 @@ impl WalSenders { struct WalSendersShared { // aggregated over all walsenders value - agg_standby_feedback: StandbyFeedback, + agg_hs_feedback: HotStandbyFeedback, // last feedback ever received from any pageserver, empty if none last_ps_feedback: PageserverFeedback, // total counter of pageserver feedbacks received @@ -324,7 +301,7 @@ impl WalSenderState { impl WalSendersShared { fn new() -> Self { WalSendersShared { - agg_standby_feedback: StandbyFeedback::empty(), + agg_hs_feedback: HotStandbyFeedback::empty(), last_ps_feedback: PageserverFeedback::empty(), ps_feedback_counter: 0, slots: Vec::new(), @@ -341,14 +318,12 @@ impl WalSendersShared { self.slots[id].as_mut().expect("walsender doesn't exist") } - /// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins + /// Update aggregated hot standy feedbacks. We just take min of valid xmins /// and ts. fn update_reply_feedback(&mut self) { let mut agg = HotStandbyFeedback::empty(); - let mut reply_agg = StandbyReply::empty(); for ws_state in self.slots.iter().flatten() { - if let ReplicationFeedback::Standby(standby_feedback) = ws_state.get_feedback() { - let hs_feedback = standby_feedback.hs_feedback; + if let ReplicationFeedback::Standby(hs_feedback) = ws_state.get_feedback() { // doing Option math like op1.iter().chain(op2.iter()).min() // would be nicer, but we serialize/deserialize this struct // directly, so leave as is for now @@ -368,41 +343,9 @@ impl WalSendersShared { } agg.ts = max(agg.ts, hs_feedback.ts); } - let reply = standby_feedback.reply; - if reply.write_lsn != Lsn::INVALID { - if reply_agg.write_lsn != Lsn::INVALID { - reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn); - } else { - reply_agg.write_lsn = reply.write_lsn; - } - } - if reply.flush_lsn != Lsn::INVALID { - if reply_agg.flush_lsn != Lsn::INVALID { - reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn); - } else { - reply_agg.flush_lsn = reply.flush_lsn; - } - } - if reply.apply_lsn != Lsn::INVALID { - if reply_agg.apply_lsn != Lsn::INVALID { - reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn); - } else { - reply_agg.apply_lsn = reply.apply_lsn; - } - } - if reply.reply_ts != 0 { - if reply_agg.reply_ts != 0 { - reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts); - } else { - reply_agg.reply_ts = reply.reply_ts; - } - } } } - self.agg_standby_feedback = StandbyFeedback { - reply: reply_agg, - hs_feedback: agg, - }; + self.agg_hs_feedback = agg; } } @@ -1006,11 +949,30 @@ impl ReplyReader { .record_hs_feedback(self.ws_guard.id, &hs_feedback); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { - let reply = - StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?; - self.ws_guard - .walsenders - .record_standby_reply(self.ws_guard.id, &reply); + // Earlier version sof the software did things with the standby reply. + // Current code is only interested in the hot standby data. + // For posterity, and potential future use, still maintain the code to parse it. + if cfg!(feature = "testing") { + #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] + struct StandbyReply { + write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. + flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. + apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. + reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + reply_requested: bool, + } + match StandbyReply::des(&msg[1..]) { + Ok(reply) => { + debug!( + "Record standby reply: ts={} apply_lsn={}", + reply.reply_ts, reply.apply_lsn + ); + } + Err(e) => { + debug!("error deserializing standby reply: {e:?}"); + } + } + } } Some(NEON_STATUS_UPDATE_TAG_BYTE) => { // pageserver sends this. @@ -1037,6 +999,7 @@ impl ReplyReader { #[cfg(test)] mod tests { + use postgres_ffi::TimestampTz; use safekeeper_api::models::FullTransactionId; use utils::id::{TenantId, TenantTimelineId, TimelineId}; @@ -1068,13 +1031,10 @@ mod tests { // form standby feedback with given hot standby feedback ts/xmin and the // rest set to dummy values. fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback { - ReplicationFeedback::Standby(StandbyFeedback { - reply: StandbyReply::empty(), - hs_feedback: HotStandbyFeedback { - ts, - xmin, - catalog_xmin: 0, - }, + ReplicationFeedback::Standby(HotStandbyFeedback { + ts, + xmin, + catalog_xmin: 0, }) } @@ -1084,10 +1044,7 @@ mod tests { let mut wss = WalSendersShared::new(); push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID)); wss.update_reply_feedback(); - assert_eq!( - wss.agg_standby_feedback.hs_feedback.xmin, - INVALID_FULL_TRANSACTION_ID - ); + assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID); } #[test] @@ -1097,6 +1054,6 @@ mod tests { push_feedback(&mut wss, hs_feedback(1, 42)); push_feedback(&mut wss, hs_feedback(1, 64)); wss.update_reply_feedback(); - assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42); + assert_eq!(wss.agg_hs_feedback.xmin, 42); } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 989b47d185..37035c8759 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -1056,7 +1056,7 @@ impl WalResidentTimeline { // if this is AppendResponse, fill in proper hot standby feedback. if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { - resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback; + resp.hs_feedback = self.walsenders.get_hotstandby(); } } Ok(rmsg)