From b2a3981eaded4a1c277068563b0f69c5d6f6f986 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Wed, 12 Apr 2023 13:26:07 +0400
Subject: [PATCH] Move tracking of walsenders out of Timeline.

Refactors walsenders out of timeline.rs into a separate WalSenders struct
with its own lock, making timeline.rs less convoluted while otherwise keeping
the same structure. Tracking of the in-memory remote_consistent_lsn is also
moved there, as it is mainly received from the pageserver. Walsender state
(feedback) is also restructured to be cleaner: it is now either
PageserverFeedback or StandbyFeedback(StandbyReply, HotStandbyFeedback), but
never both.
---
 libs/pq_proto/src/lib.rs                 |   5 +-
 safekeeper/src/broker.rs                 |   2 +-
 safekeeper/src/debug_dump.rs             |   4 +-
 safekeeper/src/http/routes.rs            |   4 +-
 safekeeper/src/metrics.rs                |  38 +-
 safekeeper/src/safekeeper.rs             |  22 +-
 safekeeper/src/send_wal.rs               | 472 ++++++++++++++++++++---
 safekeeper/src/timeline.rs               | 218 +++--------
 test_runner/fixtures/neon_fixtures.py    |   2 +
 test_runner/regress/test_wal_acceptor.py |  11 +-
 10 files changed, 514 insertions(+), 264 deletions(-)

diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs
index ed0239072a..1e7afa9bc0 100644
--- a/libs/pq_proto/src/lib.rs
+++ b/libs/pq_proto/src/lib.rs
@@ -947,9 +947,10 @@ impl<'a> BeMessage<'a> {
 pub struct PageserverFeedback {
     /// Last known size of the timeline. Used to enforce timeline size limit.
     pub current_timeline_size: u64,
-    /// LSN last received and ingested by the pageserver.
+    /// LSN last received and ingested by the pageserver. Controls backpressure.
     pub last_received_lsn: u64,
     /// LSN up to which data is persisted by the pageserver to its local disc.
+    /// Controls backpressure.
     pub disk_consistent_lsn: u64,
     /// LSN up to which data is persisted by the pageserver on s3; safekeepers
     /// consider WAL before it can be removed.
@@ -968,7 +969,7 @@ impl PageserverFeedback {
             last_received_lsn: 0,
             remote_consistent_lsn: 0,
             disk_consistent_lsn: 0,
-            replytime: SystemTime::now(),
+            replytime: *PG_EPOCH,
         }
     }
 
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index 92f35bf51f..6a98d8fd84 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -91,7 +91,7 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
             // connection to the broker.
 
             // note: there are blocking operations below, but it's considered fine for now
-            tli.record_safekeeper_info(&msg).await?
+            tli.record_safekeeper_info(msg).await?
         }
     }
     bail!("end of stream");
diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs
index 674cf9f6eb..954fbfc438 100644
--- a/safekeeper/src/debug_dump.rs
+++ b/safekeeper/src/debug_dump.rs
@@ -22,7 +22,7 @@ use crate::safekeeper::SafekeeperMemState;
 use crate::safekeeper::TermHistory;
 use crate::SafeKeeperConf;
 
-use crate::timeline::ReplicaState;
+use crate::send_wal::WalSenderState;
 use crate::GlobalTimelines;
 
 /// Various filters that influence the resulting JSON output.
@@ -87,7 +87,7 @@ pub struct Timeline {
 pub struct Memory {
     pub is_cancelled: bool,
     pub peers_info_len: usize,
-    pub replicas: Vec<Option<ReplicaState>>,
+    pub walsenders: Vec<WalSenderState>,
     pub wal_backup_active: bool,
     pub active: bool,
     pub num_computes: u32,
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index cdec45c148..ef691c5fe6 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -144,7 +144,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -232,7 +232,7 @@
 /// Metrics for a single timeline.
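+/// (With this patch a single aggregated `ps_feedback` snapshot replaces the
+/// per-replica vector; the aggregation itself lives in send_wal.rs.)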
 pub struct FullTimelineInfo {
     pub ttid: TenantTimelineId,
-    pub replicas: Vec<ReplicaState>,
+    pub ps_feedback: PageserverFeedback,
     pub wal_backup_active: bool,
     pub timeline_is_active: bool,
     pub num_computes: u32,
@@ -242,6 +242,7 @@ pub struct FullTimelineInfo {
     pub persisted_state: SafeKeeperState,
 
     pub flush_lsn: Lsn,
+    pub remote_consistent_lsn: Lsn,
 
     pub wal_storage: WalStorageMetrics,
 }
@@ -514,19 +515,6 @@ impl Collector for TimelineCollector {
         let timeline_id = tli.ttid.timeline_id.to_string();
         let labels = &[tenant_id.as_str(), timeline_id.as_str()];
 
-        let mut most_advanced: Option<PageserverFeedback> = None;
-        for replica in tli.replicas.iter() {
-            if let Some(replica_feedback) = replica.pageserver_feedback {
-                if let Some(current) = most_advanced {
-                    if current.last_received_lsn < replica_feedback.last_received_lsn {
-                        most_advanced = Some(replica_feedback);
-                    }
-                } else {
-                    most_advanced = Some(replica_feedback);
-                }
-            }
-        }
-
         self.commit_lsn
             .with_label_values(labels)
             .set(tli.mem_state.commit_lsn.into());
@@ -544,7 +532,7 @@ impl Collector for TimelineCollector {
             .set(tli.mem_state.peer_horizon_lsn.into());
         self.remote_consistent_lsn
             .with_label_values(labels)
-            .set(tli.mem_state.remote_consistent_lsn.into());
+            .set(tli.remote_consistent_lsn.into());
         self.timeline_active
             .with_label_values(labels)
             .set(tli.timeline_is_active as u64);
@@ -567,15 +555,17 @@ impl Collector for TimelineCollector {
             .with_label_values(labels)
             .set(tli.wal_storage.flush_wal_seconds);
 
-        if let Some(feedback) = most_advanced {
-            self.ps_last_received_lsn
+        self.ps_last_received_lsn
+            .with_label_values(labels)
+            .set(tli.ps_feedback.last_received_lsn);
+        if let Ok(unix_time) = tli
+            .ps_feedback
+            .replytime
+            .duration_since(SystemTime::UNIX_EPOCH)
+        {
+            self.feedback_last_time_seconds
                 .with_label_values(labels)
-                .set(feedback.last_received_lsn);
-            if let Ok(unix_time) = feedback.replytime.duration_since(SystemTime::UNIX_EPOCH) {
-                self.feedback_last_time_seconds
-                    .with_label_values(labels)
-                    .set(unix_time.as_secs());
-            }
+                .set(unix_time.as_secs());
         }
 
         if tli.last_removed_segno != 0 {
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 10b4842cbd..6864a9713d 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -212,7 +212,6 @@ pub struct SafekeeperMemState {
     pub commit_lsn: Lsn,
     pub backup_lsn: Lsn,
     pub peer_horizon_lsn: Lsn,
-    pub remote_consistent_lsn: Lsn,
     #[serde(with = "hex")]
     pub proposer_uuid: PgUuid,
 }
@@ -540,7 +539,6 @@ where
                 commit_lsn: state.commit_lsn,
                 backup_lsn: state.backup_lsn,
                 peer_horizon_lsn: state.peer_horizon_lsn,
-                remote_consistent_lsn: state.remote_consistent_lsn,
                 proposer_uuid: state.proposer_uuid,
             },
             state,
@@ -781,10 +779,6 @@ where
         // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
         self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
-        // Initializing remote_consistent_lsn sets that we have nothing to
-        // stream to pageserver(s) immediately after creation.
-        self.inmem.remote_consistent_lsn =
-            max(self.inmem.remote_consistent_lsn, state.timeline_start_lsn);
 
         state.acceptor_state.term_history = msg.term_history.clone();
         self.persist_control_file(state)?;
@@ -837,7 +831,6 @@ where
         state.commit_lsn = self.inmem.commit_lsn;
         state.backup_lsn = self.inmem.backup_lsn;
         state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
-        state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
         state.proposer_uuid = self.inmem.proposer_uuid;
         self.state.persist(&state)
     }
@@ -940,14 +933,12 @@ where
             self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
         self.inmem.backup_lsn = new_backup_lsn;
 
-        let new_remote_consistent_lsn = max(
-            Lsn(sk_info.remote_consistent_lsn),
-            self.inmem.remote_consistent_lsn,
-        );
+        // The value in sk_info has already been maximized over our local
+        // in-memory value, so just take it.
+        let new_remote_consistent_lsn = Lsn(sk_info.remote_consistent_lsn);
+        assert!(self.state.remote_consistent_lsn <= new_remote_consistent_lsn);
         sync_control_file |= self.state.remote_consistent_lsn
             + (self.state.server.wal_seg_size as u64)
             < new_remote_consistent_lsn;
-        self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
 
         let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
         sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
             < new_peer_horizon_lsn;
         self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
 
         if sync_control_file {
-            self.persist_control_file(self.state.clone())?;
+            let mut state = self.state.clone();
+            // Note: we do not persist remote_consistent_lsn on other control
+            // file save paths -- currently that is not much needed. We could
+            // do that by storing an Arc to the walsenders in Safekeeper.
+            state.remote_consistent_lsn = new_remote_consistent_lsn;
+            self.persist_control_file(state)?;
         }
         Ok(())
     }
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index a6ca89efa4..abd213deff 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -1,12 +1,14 @@
 //! This module implements the streaming side of replication protocol, starting
-//! with the "START_REPLICATION" message.
+//! with the "START_REPLICATION" message, and the registry of walsenders.
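+//!
+//! It also hosts the WalSenders registry: each streaming connection takes a
+//! slot there, feedback received from the peer (pageserver or standby) is
+//! recorded in that slot, and aggregated values are served to the rest of
+//! the safekeeper.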
 
 use crate::handler::SafekeeperPostgresHandler;
-use crate::timeline::{ReplicaState, Timeline};
+use crate::timeline::Timeline;
+use crate::wal_service::ConnectionId;
 use crate::wal_storage::WalReader;
 use crate::GlobalTimelines;
 use anyhow::Context as AnyhowContext;
 use bytes::Bytes;
+use parking_lot::Mutex;
 use postgres_backend::PostgresBackend;
 use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
 use postgres_ffi::get_current_timestamp;
@@ -14,8 +16,12 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
 use pq_proto::{BeMessage, PageserverFeedback, WalSndKeepAlive, XLogDataBody};
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncWrite};
+use utils::http::json::display_serialize;
+use utils::id::TenantTimelineId;
+use utils::lsn::AtomicLsn;
 
-use std::cmp::min;
+use std::cmp::{max, min};
+use std::net::SocketAddr;
 use std::str;
 use std::sync::Arc;
 use std::time::Duration;
@@ -40,6 +46,8 @@ pub struct HotStandbyFeedback {
     pub catalog_xmin: FullTransactionId,
 }
 
+const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
+
 impl HotStandbyFeedback {
     pub fn empty() -> HotStandbyFeedback {
         HotStandbyFeedback {
@@ -51,24 +59,293 @@ impl HotStandbyFeedback {
 }
 
 /// Standby status update
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct StandbyReply {
-    pub write_lsn: Lsn, // last lsn received by pageserver
-    pub flush_lsn: Lsn, // pageserver's disk consistent lSN
-    pub apply_lsn: Lsn, // pageserver's remote consistent lSN
-    pub reply_ts: TimestampTz,
+    pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
+    pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
+    pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
+    pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
     pub reply_requested: bool,
 }
 
-/// Scope guard to unregister replication connection from timeline
-struct ReplicationConnGuard {
-    replica: usize, // replica internal ID assigned by timeline
-    timeline: Arc<Timeline>,
+impl StandbyReply {
+    fn empty() -> Self {
+        StandbyReply {
+            write_lsn: Lsn::INVALID,
+            flush_lsn: Lsn::INVALID,
+            apply_lsn: Lsn::INVALID,
+            reply_ts: 0,
+            reply_requested: false,
+        }
+    }
 }
 
-impl Drop for ReplicationConnGuard {
+#[derive(Debug, Clone, Copy, Serialize)]
+pub struct StandbyFeedback {
+    reply: StandbyReply,
+    hs_feedback: HotStandbyFeedback,
+}
+
+/// WalSenders registry. Timeline holds it (wrapped in an Arc).
+pub struct WalSenders {
+    /// Lsn maximized over all walsenders *and* peer data, so it might be
+    /// higher than what we receive from replicas.
+    remote_consistent_lsn: AtomicLsn,
+    mutex: Mutex<WalSendersShared>,
+}
+
+impl WalSenders {
+    pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
+        Arc::new(WalSenders {
+            remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
+            mutex: Mutex::new(WalSendersShared::new()),
+        })
+    }
+
+    /// Register a new walsender. The returned guard provides access to the
+    /// slot and automatically deregisters it in Drop.
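+    ///
+    /// A sketch of the intended use (hypothetical values; the real call site
+    /// is in the START_REPLICATION handler below):
+    /// `let guard = tli.get_walsenders().register(ttid, addr, conn_id, appname);`
+    /// Dropping the guard frees the slot and re-aggregates hot standby feedback.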
+    fn register(
+        self: &Arc<Self>,
+        ttid: TenantTimelineId,
+        addr: SocketAddr,
+        conn_id: ConnectionId,
+        appname: Option<String>,
+    ) -> WalSenderGuard {
+        let slots = &mut self.mutex.lock().slots;
+        let walsender_state = WalSenderState {
+            ttid,
+            addr,
+            conn_id,
+            appname,
+            feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
+        };
+        // find an empty slot or create a new one
+        let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
+            slots[pos] = Some(walsender_state);
+            pos
+        } else {
+            let pos = slots.len();
+            slots.push(Some(walsender_state));
+            pos
+        };
+        WalSenderGuard {
+            id: pos,
+            walsenders: self.clone(),
+        }
+    }
+
+    /// Get state of all walsenders.
+    pub fn get_all(self: &Arc<Self>) -> Vec<WalSenderState> {
+        self.mutex.lock().slots.iter().flatten().cloned().collect()
+    }
+
+    /// Get aggregated pageserver feedback.
+    pub fn get_ps_feedback(self: &Arc<Self>) -> PageserverFeedback {
+        self.mutex.lock().agg_ps_feedback
+    }
+
+    /// Get aggregated pageserver and hot standby feedback (we send them to compute).
+    pub fn get_feedbacks(self: &Arc<Self>) -> (PageserverFeedback, HotStandbyFeedback) {
+        let shared = self.mutex.lock();
+        (shared.agg_ps_feedback, shared.agg_hs_feedback)
+    }
+
+    /// Record new pageserver feedback, update aggregated values.
+    fn record_ps_feedback(self: &Arc<Self>, id: WalSenderId, feedback: &PageserverFeedback) {
+        let mut shared = self.mutex.lock();
+        shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
+        shared.update_ps_feedback();
+        self.update_remote_consistent_lsn(Lsn(shared.agg_ps_feedback.remote_consistent_lsn));
+    }
+
+    /// Record standby reply.
+    fn record_standby_reply(self: &Arc<Self>, id: WalSenderId, reply: &StandbyReply) {
+        let mut shared = self.mutex.lock();
+        let slot = shared.get_slot_mut(id);
+        match &mut slot.feedback {
+            ReplicationFeedback::Standby(sf) => sf.reply = *reply,
+            ReplicationFeedback::Pageserver(_) => {
+                slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
+                    reply: *reply,
+                    hs_feedback: HotStandbyFeedback::empty(),
+                })
+            }
+        }
+    }
+
+    /// Record hot standby feedback, update the aggregated value.
+    fn record_hs_feedback(self: &Arc<Self>, id: WalSenderId, feedback: &HotStandbyFeedback) {
+        let mut shared = self.mutex.lock();
+        let slot = shared.get_slot_mut(id);
+        match &mut slot.feedback {
+            ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
+            ReplicationFeedback::Pageserver(_) => {
+                slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
+                    reply: StandbyReply::empty(),
+                    hs_feedback: *feedback,
+                })
+            }
+        }
+        shared.update_hs_feedback();
+    }
+
+    /// Get remote_consistent_lsn reported by the pageserver. Returns None if
+    /// the client is not a pageserver.
+    fn get_ws_remote_consistent_lsn(self: &Arc<Self>, id: WalSenderId) -> Option<Lsn> {
+        let shared = self.mutex.lock();
+        let slot = shared.get_slot(id);
+        match slot.feedback {
+            ReplicationFeedback::Pageserver(feedback) => Some(Lsn(feedback.remote_consistent_lsn)),
+            _ => None,
+        }
+    }
+
+    /// Get remote_consistent_lsn maximized across all walsenders and peers.
+    pub fn get_remote_consistent_lsn(self: &Arc<Self>) -> Lsn {
+        self.remote_consistent_lsn.load()
+    }
+
+    /// Update the maximized remote_consistent_lsn; returns the new (possibly
+    /// unchanged) value.
+    pub fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) -> Lsn {
+        self.remote_consistent_lsn
+            .fetch_max(candidate)
+            .max(candidate)
+    }
+
+    /// Unregister a walsender.
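+    /// Called from WalSenderGuard::drop; re-aggregates hot standby feedback
+    /// so that a departed standby stops holding xmin back.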
+    fn unregister(self: &Arc<Self>, id: WalSenderId) {
+        let mut shared = self.mutex.lock();
+        shared.slots[id] = None;
+        shared.update_hs_feedback();
+    }
+}
+
+struct WalSendersShared {
+    // value aggregated over all walsenders
+    agg_hs_feedback: HotStandbyFeedback,
+    // value aggregated over all walsenders
+    agg_ps_feedback: PageserverFeedback,
+    slots: Vec<Option<WalSenderState>>,
+}
+
+impl WalSendersShared {
+    fn new() -> Self {
+        WalSendersShared {
+            agg_hs_feedback: HotStandbyFeedback::empty(),
+            agg_ps_feedback: PageserverFeedback::empty(),
+            slots: Vec::new(),
+        }
+    }
+
+    /// Get content of the slot with the given id; it must exist.
+    fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
+        self.slots[id].as_ref().expect("walsender doesn't exist")
+    }
+
+    /// Get mutable content of the slot with the given id; it must exist.
+    fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
+        self.slots[id].as_mut().expect("walsender doesn't exist")
+    }
+
+    /// Update the aggregated hot standby feedback. We just take the min of
+    /// valid xmins and ts.
+    fn update_hs_feedback(&mut self) {
+        let mut agg = HotStandbyFeedback::empty();
+        for ws_state in self.slots.iter().flatten() {
+            if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
+                let hs_feedback = standby_feedback.hs_feedback;
+                // doing Option math like op1.iter().chain(op2.iter()).min()
+                // would be nicer, but we serialize/deserialize this struct
+                // directly, so leave it as is for now
+                if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
+                    if agg.xmin != INVALID_FULL_TRANSACTION_ID {
+                        agg.xmin = min(agg.xmin, hs_feedback.xmin);
+                    } else {
+                        agg.xmin = hs_feedback.xmin;
+                    }
+                    agg.ts = min(agg.ts, hs_feedback.ts);
+                }
+                if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
+                    if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
+                        agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
+                    } else {
+                        agg.catalog_xmin = hs_feedback.catalog_xmin;
+                    }
+                    agg.ts = min(agg.ts, hs_feedback.ts);
+                }
+            }
+        }
+        self.agg_hs_feedback = agg;
+    }
+
+    /// Update the aggregated pageserver feedback. LSNs (last_received,
+    /// disk_consistent, remote_consistent) and the reply timestamp are just
+    /// maximized; timeline_size is taken from the feedback with the highest
+    /// last_received lsn. This is generally reasonable, but we might want to
+    /// implement other policies once multiple pageservers start to be actively
+    /// used.
+    fn update_ps_feedback(&mut self) {
+        let init = PageserverFeedback::empty();
+        let acc =
+            self.slots
+                .iter()
+                .flatten()
+                .fold(init, |mut acc, ws_state| match ws_state.feedback {
+                    ReplicationFeedback::Pageserver(feedback) => {
+                        if feedback.last_received_lsn > acc.last_received_lsn {
+                            acc.current_timeline_size = feedback.current_timeline_size;
+                        }
+                        acc.last_received_lsn =
+                            max(feedback.last_received_lsn, acc.last_received_lsn);
+                        acc.disk_consistent_lsn =
+                            max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
+                        acc.remote_consistent_lsn =
+                            max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
+                        acc.replytime = max(feedback.replytime, acc.replytime);
+                        acc
+                    }
+                    ReplicationFeedback::Standby(_) => acc,
+                });
+        self.agg_ps_feedback = acc;
+    }
+}
+
+// Serialization is used only for pretty-printing in json.
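+// A pageserver walsender could render roughly like this (illustrative values):
+//   {"ttid": "<tenant>/<timeline>", "addr": "127.0.0.1:12345", "conn_id": 1,
+//    "appname": "pageserver", "feedback": {"Pageserver": {...}}}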
+#[derive(Debug, Clone, Serialize)]
+pub struct WalSenderState {
+    #[serde(serialize_with = "display_serialize")]
+    ttid: TenantTimelineId,
+    addr: SocketAddr,
+    conn_id: ConnectionId,
+    // postgres application_name
+    appname: Option<String>,
+    feedback: ReplicationFeedback,
+}
+
+// The receiver is either a pageserver or a regular standby; they send
+// different kinds of feedback.
+#[derive(Debug, Clone, Copy, Serialize)]
+enum ReplicationFeedback {
+    Pageserver(PageserverFeedback),
+    Standby(StandbyFeedback),
+}
+
+// Id of an occupied slot in WalSenders, used to access it (and saved in
+// WalSenderGuard). We could hand out an Arc pointing directly to the slot,
+// but there is not much sense in that: the values aggregation performed on
+// each feedback receipt iterates over all walsenders anyway.
+pub type WalSenderId = usize;
+
+/// Scope guard to access a slot in the WalSenders registry and unregister
+/// from it on Drop.
+pub struct WalSenderGuard {
+    id: WalSenderId,
+    walsenders: Arc<WalSenders>,
+}
+
+impl Drop for WalSenderGuard {
     fn drop(&mut self) {
-        self.timeline.remove_replica(self.replica);
+        self.walsenders.unregister(self.id);
     }
 }
 
@@ -97,16 +374,13 @@ impl SafekeeperPostgresHandler {
         let tli =
             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
 
-        let state = ReplicaState::new();
-        // This replica_id is used below to check if it's time to stop replication.
-        let replica_id = tli.add_replica(state);
-
-        // Use a guard object to remove our entry from the timeline, when the background
-        // thread and us have both finished using it.
-        let _guard = Arc::new(ReplicationConnGuard {
-            replica: replica_id,
-            timeline: tli.clone(),
-        });
+        // Use a guard object to remove our entry from the timeline when we are done.
+        let ws_guard = Arc::new(tli.get_walsenders().register(
+            self.ttid,
+            *pgb.get_peer_addr(),
+            self.conn_id,
+            self.appname.clone(),
+        ));
 
         // Walproposer gets special handling: safekeeper must give proposer all
         // local WAL till the end, whether committed or not (walproposer will
@@ -154,16 +428,11 @@ impl SafekeeperPostgresHandler {
             end_pos,
             stop_pos,
             commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
-            replica_id,
+            ws_guard: ws_guard.clone(),
             wal_reader,
             send_buf: [0; MAX_SEND_SIZE],
         };
-        let mut reply_reader = ReplyReader {
-            reader,
-            tli,
-            replica_id,
-            feedback: ReplicaState::new(),
-        };
+        let mut reply_reader = ReplyReader { reader, ws_guard };
 
         let res = tokio::select! {
             // todo: add read|write .context to these errors
@@ -190,7 +459,7 @@ struct WalSender<'a, IO> {
     // in recovery.
     stop_pos: Option<Lsn>,
     commit_lsn_watch_rx: Receiver<Lsn>,
-    replica_id: usize,
+    ws_guard: Arc<WalSenderGuard>,
     wal_reader: WalReader,
     // buffer for readling WAL into to send it
     send_buf: [u8; MAX_SEND_SIZE],
 }
@@ -264,14 +533,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
                 return Ok(());
             }
             // Timed out waiting for WAL, check for termination and send KA
-            if self.tli.should_walsender_stop(self.replica_id) {
-                // Terminate if there is nothing more to send.
-                // TODO close the stream properly
-                return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
-                    "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
-                    self.appname, self.start_pos,
-                )));
+            if let Some(remote_consistent_lsn) = self
+                .ws_guard
+                .walsenders
+                .get_ws_remote_consistent_lsn(self.ws_guard.id)
+            {
+                if self.tli.should_walsender_stop(remote_consistent_lsn) {
+                    // Terminate if there is nothing more to send.
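+                    // (No compute is attached and the pageserver's reported
+                    // remote_consistent_lsn shows it has caught up; the actual
+                    // check lives in Timeline::should_walsender_stop.)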
+                    return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
+                        "ending streaming to {:?} at {}, receiver is caught up and there are no computes",
+                        self.appname, self.start_pos,
+                    )));
+                }
             }
+
             self.pgb
                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
                     sent_ptr: self.end_pos.0,
@@ -286,9 +561,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
 /// A half driving receiving replies.
 struct ReplyReader<IO> {
     reader: PostgresBackendReader<IO>,
-    tli: Arc<Timeline>,
-    replica_id: usize,
-    feedback: ReplicaState,
+    ws_guard: Arc<WalSenderGuard>,
 }
 
 impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -303,29 +576,32 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
         match msg.first().cloned() {
             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
                 // Note: deserializing is on m[1..] because we skip the tag byte.
-                self.feedback.hs_feedback = HotStandbyFeedback::des(&msg[1..])
+                let hs_feedback = HotStandbyFeedback::des(&msg[1..])
                     .context("failed to deserialize HotStandbyFeedback")?;
-                self.tli
-                    .update_replica_state(self.replica_id, self.feedback);
+                self.ws_guard
+                    .walsenders
+                    .record_hs_feedback(self.ws_guard.id, &hs_feedback);
             }
             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
-                let _reply =
+                let reply =
                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
-                // This must be a regular postgres replica,
-                // because pageserver doesn't send this type of messages to safekeeper.
-                // Currently we just ignore this, tracking progress for them is not supported.
+                self.ws_guard
+                    .walsenders
+                    .record_standby_reply(self.ws_guard.id, &reply);
             }
             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
                 // pageserver sends this.
                 // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
                 let buf = Bytes::copy_from_slice(&msg[9..]);
-                let reply = PageserverFeedback::parse(buf);
+                let ps_feedback = PageserverFeedback::parse(buf);
 
-                trace!("PageserverFeedback is {:?}", reply);
-                self.feedback.pageserver_feedback = Some(reply);
-
-                self.tli
-                    .update_replica_state(self.replica_id, self.feedback);
+                trace!("PageserverFeedback is {:?}", ps_feedback);
+                self.ws_guard
+                    .walsenders
+                    .record_ps_feedback(self.ws_guard.id, &ps_feedback);
+                // In principle, a new remote_consistent_lsn could allow us to
+                // deactivate the timeline, but we check that regularly through
+                // broker updates; no need to do it here.
             }
             _ => warn!("unexpected message {:?}", msg),
         }
@@ -368,3 +644,89 @@ async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> anyhow::Result