From d597e6d42b0082335f6c3aa42fd6267c1a270265 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 11 Aug 2023 17:02:16 +0300 Subject: [PATCH] Track list of walreceivers and their voting/streaming state in shmem. Also add both walsenders and walreceivers to TimelineStatus (available under v1/tenant/xxx/timeline/yyy). Prepares for https://github.com/neondatabase/neon/pull/4875 --- safekeeper/src/http/routes.rs | 6 ++ safekeeper/src/receive_wal.rs | 123 ++++++++++++++++++++++++++++++++-- safekeeper/src/timeline.rs | 83 +++++++++++------------ 3 files changed, 163 insertions(+), 49 deletions(-) diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 411bfa295c..b5a8a14558 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -15,8 +15,10 @@ use tokio::fs::File; use tokio::io::AsyncReadExt; use utils::http::endpoint::request_span; +use crate::receive_wal::WalReceiverState; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; +use crate::send_wal::WalSenderState; use crate::{debug_dump, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; @@ -99,6 +101,8 @@ pub struct TimelineStatus { pub peer_horizon_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, + pub walsenders: Vec, + pub walreceivers: Vec, } fn check_permission(request: &Request, tenant_id: Option) -> Result<(), ApiError> { @@ -149,6 +153,8 @@ async fn timeline_status_handler(request: Request) -> Result, +} + +/// Id under which walreceiver is registered in shmem. +type WalReceiverId = usize; + +impl WalReceivers { + pub fn new() -> Arc { + Arc::new(WalReceivers { + mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }), + }) + } + + /// Register new walreceiver. Returned guard provides access to the slot and + /// automatically deregisters in Drop. + pub fn register(self: &Arc) -> WalReceiverGuard { + let slots = &mut self.mutex.lock().slots; + let walreceiver = WalReceiverState::Voting; + // find empty slot or create new one + let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) { + slots[pos] = Some(walreceiver); + pos + } else { + let pos = slots.len(); + slots.push(Some(walreceiver)); + pos + }; + WalReceiverGuard { + id: pos, + walreceivers: self.clone(), + } + } + + /// Get reference to locked slot contents. Slot must exist (registered + /// earlier). + fn get_slot<'a>( + self: &'a Arc, + id: WalReceiverId, + ) -> MappedMutexGuard<'a, WalReceiverState> { + MutexGuard::map(self.mutex.lock(), |locked| { + locked.slots[id] + .as_mut() + .expect("walreceiver doesn't exist") + }) + } + + /// Get number of walreceivers (compute connections). + pub fn get_num(self: &Arc) -> usize { + self.mutex.lock().slots.iter().flatten().count() + } + + /// Get state of all walreceivers. + pub fn get_all(self: &Arc) -> Vec { + self.mutex.lock().slots.iter().flatten().cloned().collect() + } + + /// Unregister walsender. + fn unregister(self: &Arc, id: WalReceiverId) { + let mut shared = self.mutex.lock(); + shared.slots[id] = None; + } +} + +/// Only a few connections are expected (normally one), so store in Vec. +struct WalReceiversShared { + slots: Vec>, +} + +/// Walreceiver status. Currently only whether it passed voting stage and +/// started receiving the stream, but it is easy to add more if needed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WalReceiverState { + Voting, + Streaming, +} + +/// Scope guard to access slot in WalSenders registry and unregister from it in +/// Drop. +pub struct WalReceiverGuard { + id: WalReceiverId, + walreceivers: Arc, +} + +impl WalReceiverGuard { + /// Get reference to locked shared state contents. + fn get(&self) -> MappedMutexGuard { + self.walreceivers.get_slot(self.id) + } +} + +impl Drop for WalReceiverGuard { + fn drop(&mut self) { + self.walreceivers.unregister(self.id); + } +} + const MSG_QUEUE_SIZE: usize = 256; const REPLY_QUEUE_SIZE: usize = 16; @@ -246,10 +350,13 @@ impl WalAcceptor { /// it must mean that network thread terminated. async fn run(&mut self) -> anyhow::Result<()> { // Register the connection and defer unregister. - self.tli.on_compute_connect().await?; - let _guard = ComputeConnectionGuard { + // Order of the next two lines is important: we want first to remove our entry and then + // update status which depends on registered connections. + let _compute_conn_guard = ComputeConnectionGuard { timeline: Arc::clone(&self.tli), }; + let walreceiver_guard = self.tli.get_walreceivers().register(); + self.tli.update_status_notify().await?; // After this timestamp we will stop processing AppendRequests and send a response // to the walproposer. walproposer sends at least one AppendRequest per second, @@ -263,6 +370,11 @@ impl WalAcceptor { } let mut next_msg = opt_msg.unwrap(); + // Update walreceiver state in shmem for reporting. + if let ProposerAcceptorMessage::Elected(_) = &next_msg { + *walreceiver_guard.get() = WalReceiverState::Streaming; + } + let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) { // loop through AppendRequest's while it's readily available to // write as many WAL as possible without fsyncing @@ -311,6 +423,7 @@ impl WalAcceptor { } } +/// Calls update_status_notify in drop to update timeline status. struct ComputeConnectionGuard { timeline: Arc, } @@ -318,11 +431,9 @@ struct ComputeConnectionGuard { impl Drop for ComputeConnectionGuard { fn drop(&mut self) { let tli = self.timeline.clone(); - // tokio forbids to call blocking_send inside the runtime, and see - // comments in on_compute_disconnect why we call blocking_send. tokio::spawn(async move { - if let Err(e) = tli.on_compute_disconnect().await { - error!("failed to unregister compute connection: {}", e); + if let Err(e) = tli.update_status_notify().await { + error!("failed to update timeline status: {}", e); } }); } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index df51c45aff..f0edd53966 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -23,6 +23,7 @@ use utils::{ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use crate::receive_wal::WalReceivers; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, ServerInfo, Term, @@ -164,8 +165,8 @@ impl SharedState { }) } - fn is_active(&self, remote_consistent_lsn: Lsn) -> bool { - self.is_wal_backup_required() + fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool { + self.is_wal_backup_required(num_computes) // FIXME: add tracking of relevant pageservers and check them here individually, // otherwise migration won't work (we suspend too early). || remote_consistent_lsn < self.sk.inmem.commit_lsn @@ -173,29 +174,34 @@ impl SharedState { /// Mark timeline active/inactive and return whether s3 offloading requires /// start/stop action. - fn update_status(&mut self, remote_consistent_lsn: Lsn, ttid: TenantTimelineId) -> bool { - let is_active = self.is_active(remote_consistent_lsn); + fn update_status( + &mut self, + num_computes: usize, + remote_consistent_lsn: Lsn, + ttid: TenantTimelineId, + ) -> bool { + let is_active = self.is_active(num_computes, remote_consistent_lsn); if self.active != is_active { info!("timeline {} active={} now", ttid, is_active); } self.active = is_active; - self.is_wal_backup_action_pending() + self.is_wal_backup_action_pending(num_computes) } /// Should we run s3 offloading in current state? - fn is_wal_backup_required(&self) -> bool { + fn is_wal_backup_required(&self, num_computes: usize) -> bool { let seg_size = self.get_wal_seg_size(); - self.num_computes > 0 || + num_computes > 0 || // Currently only the whole segment is offloaded, so compare segment numbers. - (self.sk.inmem.commit_lsn.segment_number(seg_size) > - self.sk.inmem.backup_lsn.segment_number(seg_size)) + (self.sk.inmem.commit_lsn.segment_number(seg_size) > + self.sk.inmem.backup_lsn.segment_number(seg_size)) } /// Is current state of s3 offloading is not what it ought to be? - fn is_wal_backup_action_pending(&self) -> bool { - let res = self.wal_backup_active != self.is_wal_backup_required(); + fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool { + let res = self.wal_backup_active != self.is_wal_backup_required(num_computes); if res { - let action_pending = if self.is_wal_backup_required() { + let action_pending = if self.is_wal_backup_required(num_computes) { "start" } else { "stop" @@ -210,8 +216,8 @@ impl SharedState { /// Returns whether s3 offloading is required and sets current status as /// matching. - fn wal_backup_attend(&mut self) -> bool { - self.wal_backup_active = self.is_wal_backup_required(); + fn wal_backup_attend(&mut self, num_computes: usize) -> bool { + self.wal_backup_active = self.is_wal_backup_required(num_computes); self.wal_backup_active } @@ -295,6 +301,7 @@ pub struct Timeline { /// while holding it, ensuring that consensus checks are in order. mutex: Mutex, walsenders: Arc, + walreceivers: Arc, /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal. cancellation_tx: watch::Sender, @@ -329,6 +336,7 @@ impl Timeline { commit_lsn_watch_rx, mutex: Mutex::new(shared_state), walsenders: WalSenders::new(rcl), + walreceivers: WalReceivers::new(), cancellation_rx, cancellation_tx, timeline_dir: conf.timeline_dir(&ttid), @@ -355,6 +363,7 @@ impl Timeline { commit_lsn_watch_rx, mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?), walsenders: WalSenders::new(Lsn(0)), + walreceivers: WalReceivers::new(), cancellation_rx, cancellation_tx, timeline_dir: conf.timeline_dir(&ttid), @@ -441,40 +450,22 @@ impl Timeline { } fn update_status(&self, shared_state: &mut SharedState) -> bool { - shared_state.update_status(self.get_walsenders().get_remote_consistent_lsn(), self.ttid) + shared_state.update_status( + self.walreceivers.get_num(), + self.get_walsenders().get_remote_consistent_lsn(), + self.ttid, + ) } - /// Register compute connection, starting timeline-related activity if it is - /// not running yet. - pub async fn on_compute_connect(&self) -> Result<()> { + /// Update timeline status and kick wal backup launcher to stop/start offloading if needed. + pub async fn update_status_notify(&self) -> Result<()> { if self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } - - let is_wal_backup_action_pending: bool; - { + let is_wal_backup_action_pending: bool = { let mut shared_state = self.write_shared_state().await; - shared_state.num_computes += 1; - is_wal_backup_action_pending = self.update_status(&mut shared_state); - } - // Wake up wal backup launcher, if offloading not started yet. - if is_wal_backup_action_pending { - // Can fail only if channel to a static thread got closed, which is not normal at all. - self.wal_backup_launcher_tx.send(self.ttid).await?; - } - Ok(()) - } - - /// De-register compute connection, shutting down timeline activity if - /// pageserver doesn't need catchup. - pub async fn on_compute_disconnect(&self) -> Result<()> { - let is_wal_backup_action_pending: bool; - { - let mut shared_state = self.write_shared_state().await; - shared_state.num_computes -= 1; - is_wal_backup_action_pending = self.update_status(&mut shared_state); - } - // Wake up wal backup launcher, if it is time to stop the offloading. + self.update_status(&mut shared_state) + }; if is_wal_backup_action_pending { // Can fail only if channel to a static thread got closed, which is not normal at all. self.wal_backup_launcher_tx.send(self.ttid).await?; @@ -519,7 +510,9 @@ impl Timeline { return false; } - self.write_shared_state().await.wal_backup_attend() + self.write_shared_state() + .await + .wal_backup_attend(self.walreceivers.get_num()) } /// Returns commit_lsn watch channel. @@ -650,6 +643,10 @@ impl Timeline { &self.walsenders } + pub fn get_walreceivers(&self) -> &Arc { + &self.walreceivers + } + /// Returns flush_lsn. pub async fn get_flush_lsn(&self) -> Lsn { self.write_shared_state().await.sk.wal_store.flush_lsn()