This commit is contained in:
Christian Schwarz
2025-06-30 21:16:03 +02:00
parent 18cd307461
commit 66ab1047ef
3 changed files with 43 additions and 123 deletions

View File

@@ -96,50 +96,13 @@ impl HotStandbyFeedback {
}
}
/// Standby status update
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
pub reply_requested: bool,
}
impl StandbyReply {
pub fn empty() -> Self {
StandbyReply {
write_lsn: Lsn::INVALID,
flush_lsn: Lsn::INVALID,
apply_lsn: Lsn::INVALID,
reply_ts: 0,
reply_requested: false,
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}
impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Receiver is either pageserver or regular standby, which have different
/// feedbacks.
/// Used as both model and internally.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ReplicationFeedback {
Pageserver(PageserverFeedback),
Standby(StandbyFeedback),
Standby(HotStandbyFeedback),
}
/// Uniquely identifies a WAL service connection. Logged in spans for

View File

@@ -16,8 +16,7 @@ use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, TimestampTz, get_current_times
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use safekeeper_api::Term;
use safekeeper_api::models::{
HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, StandbyFeedback,
StandbyReply,
HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::watch::Receiver;
@@ -198,8 +197,8 @@ impl WalSenders {
}
/// Get aggregated hot standby feedback (we send it to compute).
pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
self.mutex.lock().agg_standby_feedback
pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
self.mutex.lock().agg_hs_feedback
}
/// Record new pageserver feedback, update aggregated values.
@@ -216,36 +215,14 @@ impl WalSenders {
self.walreceivers.broadcast_pageserver_feedback(*feedback);
}
/// Record standby reply.
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
match &mut slot.get_mut_feedback() {
ReplicationFeedback::Standby(sf) => sf.reply = *reply,
ReplicationFeedback::Pageserver(_) => {
*slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
reply: *reply,
hs_feedback: HotStandbyFeedback::empty(),
})
}
}
}
/// Record hot standby feedback, update aggregated value.
fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
match &mut slot.get_mut_feedback() {
ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
ReplicationFeedback::Standby(sf) => *sf = *feedback,
ReplicationFeedback::Pageserver(_) => {
*slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: *feedback,
})
*slot.get_mut_feedback() = ReplicationFeedback::Standby(*feedback);
}
}
shared.update_reply_feedback();
@@ -272,7 +249,7 @@ impl WalSenders {
struct WalSendersShared {
// aggregated over all walsenders value
agg_standby_feedback: StandbyFeedback,
agg_hs_feedback: HotStandbyFeedback,
// last feedback ever received from any pageserver, empty if none
last_ps_feedback: PageserverFeedback,
// total counter of pageserver feedbacks received
@@ -324,7 +301,7 @@ impl WalSenderState {
impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
agg_standby_feedback: StandbyFeedback::empty(),
agg_hs_feedback: HotStandbyFeedback::empty(),
last_ps_feedback: PageserverFeedback::empty(),
ps_feedback_counter: 0,
slots: Vec::new(),
@@ -341,14 +318,12 @@ impl WalSendersShared {
self.slots[id].as_mut().expect("walsender doesn't exist")
}
/// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins
/// Update aggregated hot standy feedbacks. We just take min of valid xmins
/// and ts.
fn update_reply_feedback(&mut self) {
let mut agg = HotStandbyFeedback::empty();
let mut reply_agg = StandbyReply::empty();
for ws_state in self.slots.iter().flatten() {
if let ReplicationFeedback::Standby(standby_feedback) = ws_state.get_feedback() {
let hs_feedback = standby_feedback.hs_feedback;
if let ReplicationFeedback::Standby(hs_feedback) = ws_state.get_feedback() {
// doing Option math like op1.iter().chain(op2.iter()).min()
// would be nicer, but we serialize/deserialize this struct
// directly, so leave as is for now
@@ -368,41 +343,9 @@ impl WalSendersShared {
}
agg.ts = max(agg.ts, hs_feedback.ts);
}
let reply = standby_feedback.reply;
if reply.write_lsn != Lsn::INVALID {
if reply_agg.write_lsn != Lsn::INVALID {
reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
} else {
reply_agg.write_lsn = reply.write_lsn;
}
}
if reply.flush_lsn != Lsn::INVALID {
if reply_agg.flush_lsn != Lsn::INVALID {
reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
} else {
reply_agg.flush_lsn = reply.flush_lsn;
}
}
if reply.apply_lsn != Lsn::INVALID {
if reply_agg.apply_lsn != Lsn::INVALID {
reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
} else {
reply_agg.apply_lsn = reply.apply_lsn;
}
}
if reply.reply_ts != 0 {
if reply_agg.reply_ts != 0 {
reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
} else {
reply_agg.reply_ts = reply.reply_ts;
}
}
}
}
self.agg_standby_feedback = StandbyFeedback {
reply: reply_agg,
hs_feedback: agg,
};
self.agg_hs_feedback = agg;
}
}
@@ -1006,11 +949,30 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.record_hs_feedback(self.ws_guard.id, &hs_feedback);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply =
StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
self.ws_guard
.walsenders
.record_standby_reply(self.ws_guard.id, &reply);
// Earlier version sof the software did things with the standby reply.
// Current code is only interested in the hot standby data.
// For posterity, and potential future use, still maintain the code to parse it.
if cfg!(feature = "testing") {
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
struct StandbyReply {
write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
reply_requested: bool,
}
match StandbyReply::des(&msg[1..]) {
Ok(reply) => {
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
}
Err(e) => {
debug!("error deserializing standby reply: {e:?}");
}
}
}
}
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
// pageserver sends this.
@@ -1037,6 +999,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
#[cfg(test)]
mod tests {
use postgres_ffi::TimestampTz;
use safekeeper_api::models::FullTransactionId;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
@@ -1068,13 +1031,10 @@ mod tests {
// form standby feedback with given hot standby feedback ts/xmin and the
// rest set to dummy values.
fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
ReplicationFeedback::Standby(StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback {
ts,
xmin,
catalog_xmin: 0,
},
ReplicationFeedback::Standby(HotStandbyFeedback {
ts,
xmin,
catalog_xmin: 0,
})
}
@@ -1084,10 +1044,7 @@ mod tests {
let mut wss = WalSendersShared::new();
push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
wss.update_reply_feedback();
assert_eq!(
wss.agg_standby_feedback.hs_feedback.xmin,
INVALID_FULL_TRANSACTION_ID
);
assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
}
#[test]
@@ -1097,6 +1054,6 @@ mod tests {
push_feedback(&mut wss, hs_feedback(1, 42));
push_feedback(&mut wss, hs_feedback(1, 64));
wss.update_reply_feedback();
assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
assert_eq!(wss.agg_hs_feedback.xmin, 42);
}
}

View File

@@ -1056,7 +1056,7 @@ impl WalResidentTimeline {
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
resp.hs_feedback = self.walsenders.get_hotstandby();
}
}
Ok(rmsg)