diff --git a/libs/utils/src/pageserver_feedback.rs b/libs/utils/src/pageserver_feedback.rs
index c9fbdde928..bc8fa7362e 100644
--- a/libs/utils/src/pageserver_feedback.rs
+++ b/libs/utils/src/pageserver_feedback.rs
@@ -123,6 +123,12 @@ impl PageserverFeedback {
                     rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
                 }
             }
+            b"shard_number" => {
+                let len = buf.get_i32();
+                // TODO: this will be implemented in the next update;
+                // for now, we just skip the value.
+                buf.advance(len as usize);
+            }
             _ => {
                 let len = buf.get_i32();
                 warn!(
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index f12e079632..e541527b6a 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -140,6 +140,13 @@ pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
     )
     .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
 });
+pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "safekeeper_received_ps_feedbacks_total",
+        "Number of pageserver feedbacks received"
+    )
+    .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
+});
 
 pub const LABEL_UNKNOWN: &str = "unknown";
 
@@ -301,7 +308,8 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
 #[derive(Clone)]
 pub struct FullTimelineInfo {
     pub ttid: TenantTimelineId,
-    pub ps_feedback: PageserverFeedback,
+    pub ps_feedback_count: u64,
+    pub last_ps_feedback: PageserverFeedback,
     pub wal_backup_active: bool,
     pub timeline_is_active: bool,
     pub num_computes: u32,
@@ -327,6 +335,7 @@ pub struct TimelineCollector {
     remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
+    ps_feedback_count: GenericGaugeVec<AtomicU64>,
     timeline_active: GenericGaugeVec<AtomicU64>,
     wal_backup_active: GenericGaugeVec<AtomicU64>,
     connected_computes: IntGaugeVec,
@@ -430,6 +439,15 @@ impl TimelineCollector {
         .unwrap();
         descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
 
+        let ps_feedback_count = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_ps_feedback_count_total",
+                "Number of feedbacks received from the pageserver",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+
         let timeline_active = GenericGaugeVec::new(
             Opts::new(
                 "safekeeper_timeline_active",
@@ -538,6 +556,7 @@ impl TimelineCollector {
             remote_consistent_lsn,
             ps_last_received_lsn,
             feedback_last_time_seconds,
+            ps_feedback_count,
             timeline_active,
             wal_backup_active,
             connected_computes,
@@ -570,6 +589,7 @@ impl Collector for TimelineCollector {
         self.remote_consistent_lsn.reset();
         self.ps_last_received_lsn.reset();
         self.feedback_last_time_seconds.reset();
+        self.ps_feedback_count.reset();
         self.timeline_active.reset();
         self.wal_backup_active.reset();
         self.connected_computes.reset();
@@ -646,9 +666,12 @@ impl Collector for TimelineCollector {
 
             self.ps_last_received_lsn
                 .with_label_values(labels)
-                .set(tli.ps_feedback.last_received_lsn.0);
+                .set(tli.last_ps_feedback.last_received_lsn.0);
+            self.ps_feedback_count
+                .with_label_values(labels)
+                .set(tli.ps_feedback_count);
             if let Ok(unix_time) = tli
-                .ps_feedback
+                .last_ps_feedback
                 .replytime
                 .duration_since(SystemTime::UNIX_EPOCH)
             {
@@ -679,6 +702,7 @@ impl Collector for TimelineCollector {
         mfs.extend(self.remote_consistent_lsn.collect());
         mfs.extend(self.ps_last_received_lsn.collect());
         mfs.extend(self.feedback_last_time_seconds.collect());
+        mfs.extend(self.ps_feedback_count.collect());
         mfs.extend(self.timeline_active.collect());
         mfs.extend(self.wal_backup_active.collect());
         mfs.extend(self.connected_computes.collect());
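A note on the pageserver_feedback.rs change above: the feedback message is a sequence of key/value pairs in which every value carries an i32 length prefix, so a reader can skip keys it does not understand, which is exactly what the new shard_number arm does for now. A minimal, self-contained sketch of that skip using the bytes crate (the buffer layout below is illustrative, not the exact feedback encoding):

use bytes::{Buf, BufMut, Bytes, BytesMut};

fn main() {
    // Build one length-prefixed value followed by a marker byte.
    let mut msg = BytesMut::new();
    msg.put_i32(4); // declared length of the value
    msg.put_u32(7); // the value itself, opaque to this reader
    msg.put_u8(b'X'); // the next field, which we must land on

    let mut buf: Bytes = msg.freeze();
    let len = buf.get_i32();
    buf.advance(len as usize); // skip the value without interpreting it
    assert_eq!(buf.get_u8(), b'X');
}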
diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs
index 9ce9b049ba..015b53bb2e 100644
--- a/safekeeper/src/receive_wal.rs
+++ b/safekeeper/src/receive_wal.rs
@@ -36,11 +36,15 @@ use tokio::time::Instant;
 use tracing::*;
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
+use utils::pageserver_feedback::PageserverFeedback;
+
+const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
 
 /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
 /// in Arc).
 pub struct WalReceivers {
     mutex: Mutex<WalReceiversShared>,
+    pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
 }
 
 /// Id under which walreceiver is registered in shmem.
@@ -48,8 +52,12 @@ type WalReceiverId = usize;
 
 impl WalReceivers {
     pub fn new() -> Arc<WalReceivers> {
+        let (pageserver_feedback_tx, _) =
+            tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
+
         Arc::new(WalReceivers {
             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
+            pageserver_feedback_tx,
         })
     }
 
@@ -116,6 +124,12 @@ impl WalReceivers {
         let mut shared = self.mutex.lock();
         shared.slots[id] = None;
     }
+
+    /// Broadcast pageserver feedback to connected walproposers.
+    pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
+        // Err only means there are no subscribers, which is fine.
+        let _ = self.pageserver_feedback_tx.send(feedback);
+    }
 }
 
 /// Only a few connections are expected (normally one), so store in Vec.
@@ -197,17 +211,28 @@ impl SafekeeperPostgresHandler {
         // sends, so this avoids deadlocks.
         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
         let peer_addr = *pgb.get_peer_addr();
-        let network_reader = NetworkReader {
+        let mut network_reader = NetworkReader {
             ttid: self.ttid,
             conn_id: self.conn_id,
             pgb_reader: &mut pgb_reader,
             peer_addr,
             acceptor_handle: &mut acceptor_handle,
         };
-        let res = tokio::select! {
-            // todo: add read|write .context to these errors
-            r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
-            r = network_write(pgb, reply_rx) => r,
+
+        // Read the first message and create the timeline if needed.
+        let res = network_reader.read_first_message().await;
+
+        let res = if let Ok((tli, next_msg)) = res {
+            let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
+                tli.get_walreceivers().pageserver_feedback_tx.subscribe();
+
+            tokio::select! {
+                // todo: add read|write .context to these errors
+                r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r,
+                r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
+            }
+        } else {
+            res.map(|_| ())
         };
 
         // Join pg backend back.
@@ -251,12 +276,9 @@ struct NetworkReader<'a, IO> {
 }
 
 impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
-    async fn run(
-        self,
-        msg_tx: Sender<ProposerAcceptorMessage>,
-        msg_rx: Receiver<ProposerAcceptorMessage>,
-        reply_tx: Sender<AcceptorProposerMessage>,
-    ) -> Result<(), CopyStreamHandlerEnd> {
+    async fn read_first_message(
+        &mut self,
+    ) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
         // Receive information about server to create timeline, if not yet.
         let next_msg = read_message(self.pgb_reader).await?;
         let tli = match next_msg {
@@ -278,9 +300,19 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
                 )))
             }
         };
+        Ok((tli, next_msg))
+    }
 
+    async fn run(
+        self,
+        msg_tx: Sender<ProposerAcceptorMessage>,
+        msg_rx: Receiver<ProposerAcceptorMessage>,
+        reply_tx: Sender<AcceptorProposerMessage>,
+        tli: Arc<Timeline>,
+        next_msg: ProposerAcceptorMessage,
+    ) -> Result<(), CopyStreamHandlerEnd> {
         *self.acceptor_handle = Some(WalAcceptor::spawn(
-            tli.clone(),
+            tli,
             msg_rx,
             reply_tx,
             Some(self.conn_id),
@@ -320,18 +352,46 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
 async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     pgb_writer: &mut PostgresBackend<IO>,
     mut reply_rx: Receiver<AcceptorProposerMessage>,
+    mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
 ) -> Result<(), CopyStreamHandlerEnd> {
     let mut buf = BytesMut::with_capacity(128);
 
+    // Store the last AppendResponse so PageserverFeedback can be injected into it.
+    let mut last_append_response = None;
+
     loop {
-        match reply_rx.recv().await {
-            Some(msg) => {
-                buf.clear();
-                msg.serialize(&mut buf)?;
-                pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
+        // Try to read either an AcceptorProposerMessage or a PageserverFeedback.
+        let msg = tokio::select! {
+            reply = reply_rx.recv() => {
+                if let Some(msg) = reply {
+                    if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
+                        last_append_response = Some(append_response.clone());
+                    }
+                    Some(msg)
+                } else {
+                    return Ok(()); // chan closed, WalAcceptor terminated
+                }
             }
-            None => return Ok(()), // chan closed, WalAcceptor terminated
-        }
+
+            feedback = pageserver_feedback_rx.recv() =>
+                match (feedback, &last_append_response) {
+                    (Ok(feedback), Some(append_response)) => {
+                        // Clone the AppendResponse and inject the PageserverFeedback into it.
+                        let mut append_response = append_response.clone();
+                        append_response.pageserver_feedback = Some(feedback);
+                        Some(AcceptorProposerMessage::AppendResponse(append_response))
+                    }
+                    _ => None,
+                }
+        };
+
+        let Some(msg) = msg else {
+            continue;
+        };
+
+        buf.clear();
+        msg.serialize(&mut buf)?;
+        pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     }
 }
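The new feedback path rides on a tokio broadcast channel with a small capacity (DEFAULT_FEEDBACK_CAPACITY = 8). If a walproposer connection stalls, older feedback is overwritten and its receiver sees a Lagged error on the next recv; network_write maps that case to None and simply continues, which is acceptable because only the freshest feedback matters. A standalone sketch of exactly that lagging behavior (capacity and payload values are arbitrary):

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Deliberately tiny capacity to force lagging.
    let (tx, mut rx) = broadcast::channel::<u64>(2);

    // Send more values than the channel retains before the receiver runs.
    for lsn in 1..=4u64 {
        // As in broadcast_pageserver_feedback, Err only means no subscribers.
        let _ = tx.send(lsn);
    }

    // The receiver first learns how many values it missed...
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(n)) => assert_eq!(n, 2),
        other => panic!("expected Lagged, got {other:?}"),
    }
    // ...and then sees only the retained, most recent values.
    assert_eq!(rx.recv().await.unwrap(), 3);
    assert_eq!(rx.recv().await.unwrap(), 4);
}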
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 84393d8dab..d7c8fa6955 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -321,7 +321,7 @@ pub struct AppendRequestHeader {
 }
 
 /// Report safekeeper state to proposer
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 pub struct AppendResponse {
     // Current term of the safekeeper; if it is higher than proposer's, the
     // compute is out of date.
@@ -334,7 +334,7 @@ pub struct AppendResponse {
     // a criterion for walproposer --sync mode exit
     pub commit_lsn: Lsn,
     pub hs_feedback: HotStandbyFeedback,
-    pub pageserver_feedback: PageserverFeedback,
+    pub pageserver_feedback: Option<PageserverFeedback>,
 }
 
 impl AppendResponse {
@@ -344,7 +344,7 @@ impl AppendResponse {
             flush_lsn: Lsn(0),
             commit_lsn: Lsn(0),
             hs_feedback: HotStandbyFeedback::empty(),
-            pageserver_feedback: PageserverFeedback::empty(),
+            pageserver_feedback: None,
         }
     }
 }
@@ -462,7 +462,11 @@ impl AcceptorProposerMessage {
                 buf.put_u64_le(msg.hs_feedback.xmin);
                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
 
-                msg.pageserver_feedback.serialize(buf);
+                // AsyncReadMessage in walproposer.c will not try to decode
+                // pageserver_feedback if it is not present.
+                if let Some(ref msg) = msg.pageserver_feedback {
+                    msg.serialize(buf);
+                }
             }
         }
 
@@ -681,7 +685,7 @@ where
             commit_lsn: self.state.commit_lsn,
             // will be filled by the upper code to avoid bothering safekeeper
             hs_feedback: HotStandbyFeedback::empty(),
-            pageserver_feedback: PageserverFeedback::empty(),
+            pageserver_feedback: None,
        };
         trace!("formed AppendResponse {:?}", ar);
         ar
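Turning pageserver_feedback into an Option changes the wire format only by omission: with None, the AppendResponse ends right after the hot-standby feedback, and the C side decodes the extra fields only when bytes remain. A simplified round trip of that convention, with u64 stand-ins instead of the real structs:

use bytes::{Buf, BufMut, Bytes, BytesMut};

// Write the fixed part, then the optional trailing part only if present.
fn serialize(fixed: u64, trailing: Option<u64>, buf: &mut BytesMut) {
    buf.put_u64_le(fixed);
    if let Some(t) = trailing {
        buf.put_u64_le(t);
    }
}

// The trailing part exists iff bytes remain after the fixed part.
fn deserialize(buf: &mut Bytes) -> (u64, Option<u64>) {
    let fixed = buf.get_u64_le();
    let trailing = if buf.remaining() >= 8 {
        Some(buf.get_u64_le())
    } else {
        None
    };
    (fixed, trailing)
}

fn main() {
    let mut a = BytesMut::new();
    serialize(1, Some(2), &mut a);
    assert_eq!(deserialize(&mut a.freeze()), (1, Some(2)));

    let mut b = BytesMut::new();
    serialize(1, None, &mut b);
    assert_eq!(deserialize(&mut b.freeze()), (1, None));
}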
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 4b887f36b7..7da5fd00b0 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -2,6 +2,8 @@
 //! with the "START_REPLICATION" message, and registry of walsenders.
 
 use crate::handler::SafekeeperPostgresHandler;
+use crate::metrics::RECEIVED_PS_FEEDBACKS;
+use crate::receive_wal::WalReceivers;
 use crate::safekeeper::{Term, TermLsn};
 use crate::timeline::Timeline;
 use crate::wal_service::ConnectionId;
@@ -21,7 +23,7 @@ use utils::failpoint_support;
 use utils::id::TenantTimelineId;
 use utils::pageserver_feedback::PageserverFeedback;
 
-use std::cmp::{max, min};
+use std::cmp::min;
 use std::net::SocketAddr;
 use std::str;
 use std::sync::Arc;
@@ -90,12 +92,14 @@ pub struct StandbyFeedback {
 /// WalSenders registry. Timeline holds it (wrapped in Arc).
 pub struct WalSenders {
     mutex: Mutex<WalSendersShared>,
+    walreceivers: Arc<WalReceivers>,
 }
 
 impl WalSenders {
-    pub fn new() -> Arc<WalSenders> {
+    pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
         Arc::new(WalSenders {
             mutex: Mutex::new(WalSendersShared::new()),
+            walreceivers,
         })
     }
 
@@ -151,22 +155,29 @@ impl WalSenders {
             .min()
     }
 
-    /// Get aggregated pageserver feedback.
-    pub fn get_ps_feedback(self: &Arc<Self>) -> PageserverFeedback {
-        self.mutex.lock().agg_ps_feedback
+    /// Returns the total count of pageserver feedbacks received and the last feedback.
+    pub fn get_ps_feedback_stats(self: &Arc<Self>) -> (u64, PageserverFeedback) {
+        let shared = self.mutex.lock();
+        (shared.ps_feedback_counter, shared.last_ps_feedback)
     }
 
-    /// Get aggregated pageserver and hot standby feedback (we send them to compute).
-    pub fn get_feedbacks(self: &Arc<Self>) -> (PageserverFeedback, HotStandbyFeedback) {
-        let shared = self.mutex.lock();
-        (shared.agg_ps_feedback, shared.agg_hs_feedback)
+    /// Get aggregated hot standby feedback (we send it to compute).
+    pub fn get_hotstandby(self: &Arc<Self>) -> HotStandbyFeedback {
+        self.mutex.lock().agg_hs_feedback
     }
 
     /// Record new pageserver feedback, update aggregated values.
     fn record_ps_feedback(self: &Arc<Self>, id: WalSenderId, feedback: &PageserverFeedback) {
         let mut shared = self.mutex.lock();
         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
-        shared.update_ps_feedback();
+        shared.last_ps_feedback = *feedback;
+        shared.ps_feedback_counter += 1;
+        drop(shared);
+
+        RECEIVED_PS_FEEDBACKS.inc();
+
+        // Send feedback to connected walproposers.
+        self.walreceivers.broadcast_pageserver_feedback(*feedback);
     }
 
     /// Record standby reply.
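Worth noting in record_ps_feedback above: the guard is dropped explicitly before the metric increment and the broadcast, so neither runs inside the walsenders critical section. The pattern in isolation, assuming a parking_lot-style synchronous Mutex as used throughout the safekeeper (Stats and notify are stand-ins, not real types):

use parking_lot::Mutex;

struct Stats {
    counter: u64,
}

fn record(stats: &Mutex<Stats>, notify: impl Fn(u64)) {
    let mut shared = stats.lock();
    shared.counter += 1;
    let snapshot = shared.counter;
    // Release the lock before calling out, so a slow notification
    // cannot block other threads that need the stats.
    drop(shared);
    notify(snapshot);
}

fn main() {
    let stats = Mutex::new(Stats { counter: 0 });
    record(&stats, |n| println!("feedback #{n}"));
    assert_eq!(stats.lock().counter, 1);
}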
@@ -222,8 +233,10 @@ impl WalSenders {
 struct WalSendersShared {
     // aggregated over all walsenders value
     agg_hs_feedback: HotStandbyFeedback,
-    // aggregated over all walsenders value
-    agg_ps_feedback: PageserverFeedback,
+    // last feedback ever received from any pageserver, empty if none
+    last_ps_feedback: PageserverFeedback,
+    // total counter of pageserver feedbacks received
+    ps_feedback_counter: u64,
     slots: Vec<Option<WalSenderState>>,
 }
 
@@ -231,7 +244,8 @@ impl WalSendersShared {
     fn new() -> Self {
         WalSendersShared {
             agg_hs_feedback: HotStandbyFeedback::empty(),
-            agg_ps_feedback: PageserverFeedback::empty(),
+            last_ps_feedback: PageserverFeedback::empty(),
+            ps_feedback_counter: 0,
             slots: Vec::new(),
         }
     }
@@ -276,37 +290,6 @@ impl WalSendersShared {
         }
         self.agg_hs_feedback = agg;
     }
-
-    /// Update aggregated pageserver feedback. LSNs (last_received,
-    /// disk_consistent, remote_consistent) and reply timestamp are just
-    /// maximized; timeline_size is taken from feedback with highest
-    /// last_received lsn. This is generally reasonable, but we might want to
-    /// implement other policies once multiple pageservers start to be actively
-    /// used.
-    fn update_ps_feedback(&mut self) {
-        let init = PageserverFeedback::empty();
-        let acc =
-            self.slots
-                .iter()
-                .flatten()
-                .fold(init, |mut acc, ws_state| match ws_state.feedback {
-                    ReplicationFeedback::Pageserver(feedback) => {
-                        if feedback.last_received_lsn > acc.last_received_lsn {
-                            acc.current_timeline_size = feedback.current_timeline_size;
-                        }
-                        acc.last_received_lsn =
-                            max(feedback.last_received_lsn, acc.last_received_lsn);
-                        acc.disk_consistent_lsn =
-                            max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
-                        acc.remote_consistent_lsn =
-                            max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
-                        acc.replytime = max(feedback.replytime, acc.replytime);
-                        acc
-                    }
-                    ReplicationFeedback::Standby(_) => acc,
-                });
-        self.agg_ps_feedback = acc;
-    }
 }
 
 // Serialized is used only for pretty printing in json.
@@ -443,7 +426,7 @@ impl SafekeeperPostgresHandler {
         };
         let mut reply_reader = ReplyReader {
             reader,
-            ws_guard,
+            ws_guard: ws_guard.clone(),
             tli,
         };
 
@@ -452,6 +435,18 @@ impl SafekeeperPostgresHandler {
             r = sender.run() => r,
             r = reply_reader.run() => r,
         };
+
+        let ws_state = ws_guard
+            .walsenders
+            .mutex
+            .lock()
+            .get_slot(ws_guard.id)
+            .clone();
+        info!(
+            "finished streaming to {}, feedback={:?}",
+            ws_state.addr, ws_state.feedback,
+        );
+
         // Join pg backend back.
         pgb.unsplit(reply_reader.reader)?;
 
@@ -733,7 +728,6 @@ async fn wait_for_lsn(
 
 #[cfg(test)]
 mod tests {
-    use postgres_protocol::PG_EPOCH;
     use utils::id::{TenantId, TimelineId};
 
     use super::*;
@@ -792,27 +786,4 @@ mod tests {
         wss.update_hs_feedback();
         assert_eq!(wss.agg_hs_feedback.xmin, 42);
     }
-
-    // form pageserver feedback with given last_record_lsn / tli size and the
-    // rest set to dummy values.
-    fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
-        ReplicationFeedback::Pageserver(PageserverFeedback {
-            current_timeline_size,
-            last_received_lsn,
-            disk_consistent_lsn: Lsn::INVALID,
-            remote_consistent_lsn: Lsn::INVALID,
-            replytime: *PG_EPOCH,
-        })
-    }
-
-    // test that ps aggregation works as expected
-    #[test]
-    fn test_ps_feedback() {
-        let mut wss = WalSendersShared::new();
-        push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
-        push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
-        wss.update_ps_feedback();
-        assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
-        assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
-    }
 }
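With update_ps_feedback removed, test_ps_feedback loses its subject; the replacement semantics are simply last-feedback-wins plus a running counter. If a regression test for that is wanted, a sketch in the style of the removed one could look like this (hypothetical test; it relies on the same in-module field access the old test used):

    // test that last-feedback-wins and the counter behave as expected
    #[test]
    fn test_ps_feedback_stats() {
        let mut wss = WalSendersShared::new();
        for (size, lsn) in [(8u64, Lsn(42)), (4u64, Lsn(84))] {
            let mut feedback = PageserverFeedback::empty();
            feedback.current_timeline_size = size;
            feedback.last_received_lsn = lsn;
            // mirror what record_ps_feedback now does under the lock
            wss.last_ps_feedback = feedback;
            wss.ps_feedback_counter += 1;
        }
        assert_eq!(wss.ps_feedback_counter, 2);
        assert_eq!(wss.last_ps_feedback.current_timeline_size, 4);
        assert_eq!(wss.last_ps_feedback.last_received_lsn, Lsn(84));
    }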
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 9b7ab14218..4901b86acf 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -402,6 +402,7 @@ impl Timeline {
         )));
         let (cancellation_tx, cancellation_rx) = watch::channel(false);
 
+        let walreceivers = WalReceivers::new();
         Ok(Timeline {
             ttid,
             wal_backup_launcher_tx,
@@ -410,8 +411,8 @@ impl Timeline {
             term_flush_lsn_watch_tx,
             term_flush_lsn_watch_rx,
             mutex: Mutex::new(shared_state),
-            walsenders: WalSenders::new(),
-            walreceivers: WalReceivers::new(),
+            walsenders: WalSenders::new(walreceivers.clone()),
+            walreceivers,
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
@@ -435,6 +436,7 @@ impl Timeline {
         let state =
             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
 
+        let walreceivers = WalReceivers::new();
         Ok(Timeline {
             ttid,
             wal_backup_launcher_tx,
@@ -443,8 +445,8 @@ impl Timeline {
             term_flush_lsn_watch_tx,
             term_flush_lsn_watch_rx,
             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
-            walsenders: WalSenders::new(),
-            walreceivers: WalReceivers::new(),
+            walsenders: WalSenders::new(walreceivers.clone()),
+            walreceivers,
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
@@ -656,12 +658,9 @@ impl Timeline {
             let mut shared_state = self.write_shared_state().await;
             rmsg = shared_state.sk.process_msg(msg).await?;
 
-            // if this is AppendResponse, fill in proper pageserver and hot
-            // standby feedback.
+            // if this is AppendResponse, fill in proper hot standby feedback.
             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
-                let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
-                resp.hs_feedback = hs_feedback;
-                resp.pageserver_feedback = ps_feedback;
+                resp.hs_feedback = self.walsenders.get_hotstandby();
             }
 
             commit_lsn = shared_state.sk.state.inmem.commit_lsn;
@@ -898,12 +897,13 @@ impl Timeline {
             return None;
         }
 
-        let ps_feedback = self.walsenders.get_ps_feedback();
+        let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
         let state = self.write_shared_state().await;
         if state.active {
             Some(FullTimelineInfo {
                 ttid: self.ttid,
-                ps_feedback,
+                ps_feedback_count,
+                last_ps_feedback,
                 wal_backup_active: state.wal_backup_active,
                 timeline_is_active: state.active,
                 num_computes: self.walreceivers.get_num() as u32,
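Finally, the timeline.rs wiring: WalSenders now holds an Arc<WalReceivers> so that record_ps_feedback can broadcast into the receivers' channel, which forces the construction order seen above, receivers first, then a clone of the Arc into the senders. The ownership shape in miniature (stand-in types, not the real structs):

use std::sync::Arc;

struct Receivers; // owns the broadcast sender in the real code

struct Senders {
    receivers: Arc<Receivers>,
}

struct Timeline {
    senders: Arc<Senders>,
    receivers: Arc<Receivers>,
}

fn main() {
    // Order matters: receivers must exist before senders can hold them.
    let receivers = Arc::new(Receivers);
    let timeline = Timeline {
        senders: Arc::new(Senders {
            receivers: receivers.clone(),
        }),
        receivers,
    };
    // Both handles point at the same registry.
    assert!(Arc::ptr_eq(&timeline.senders.receivers, &timeline.receivers));
}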