From 729ac38ea8cb4317b844676585ca8f42e245ec1e Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Mon, 10 Jan 2022 23:48:34 +0300
Subject: [PATCH] Centralize suspending/resuming timeline activity on safekeepers.

Timeline is active whenever there is at least 1 connection from compute or
pageserver is not caught up. Currently 'active' means callmemaybes are being
sent.

Fixes race: now suspend condition checking and callmemaybe unsubscribe happen
under the same lock.
---
 walkeeper/src/receive_wal.rs |  53 ++-----
 walkeeper/src/send_wal.rs    |  35 ++---
 walkeeper/src/timeline.rs    | 269 ++++++++++++++++++++++++-----------
 3 files changed, 213 insertions(+), 144 deletions(-)

diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index 863aaafef1..b9420714fc 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -5,6 +5,7 @@
 use anyhow::{bail, Context, Result};
 use bytes::Bytes;
 use bytes::BytesMut;
+use tokio::sync::mpsc::UnboundedSender;
 use tracing::*;
 
 use crate::timeline::Timeline;
@@ -18,10 +19,8 @@ use crate::handler::SafekeeperPostgresHandler;
 use crate::timeline::TimelineTools;
 use zenith_utils::postgres_backend::PostgresBackend;
 use zenith_utils::pq_proto::{BeMessage, FeMessage};
-use zenith_utils::zid::ZTenantId;
 
 use crate::callmemaybe::CallmeEvent;
-use crate::callmemaybe::SubscriptionStateKey;
 
 pub struct ReceiveWalConn<'pg> {
     /// Postgres connection
@@ -82,50 +81,23 @@ impl<'pg> ReceiveWalConn<'pg> {
         let mut msg = self
             .read_msg()
             .context("failed to receive proposer greeting")?;
-        let tenant_id: ZTenantId;
         match msg {
             ProposerAcceptorMessage::Greeting(ref greeting) => {
                 info!(
                     "start handshake with wal proposer {} sysid {} timeline {}",
                     self.peer_addr, greeting.system_id, greeting.tli,
                 );
-                tenant_id = greeting.tenant_id;
             }
             _ => bail!("unexpected message {:?} instead of greeting", msg),
         }
 
-        // Incoming WAL stream resumed, so reset information about the timeline pause.
-        spg.timeline.get().continue_streaming();
-
-        // if requested, ask pageserver to fetch wal from us
-        // as long as this wal_stream is alive, callmemaybe thread
-        // will send requests to pageserver
-        let _guard = match self.pageserver_connstr {
-            Some(ref pageserver_connstr) => {
-                // Need to establish replication channel with page server.
-                // Add far as replication in postgres is initiated by receiver
-                // we should use callmemaybe mechanism.
-                let timeline_id = spg.timeline.get().zttid.timeline_id;
-                let subscription_key = SubscriptionStateKey::new(
-                    tenant_id,
-                    timeline_id,
-                    pageserver_connstr.to_owned(),
-                );
-                spg.tx
-                    .send(CallmeEvent::Subscribe(subscription_key))
-                    .unwrap_or_else(|e| {
-                        error!(
-                            "failed to send Subscribe request to callmemaybe thread {}",
-                            e
-                        );
-                    });
-
-                // create a guard to unsubscribe callback, when this wal_stream will exit
-                Some(SendWalHandlerGuard {
-                    timeline: Arc::clone(spg.timeline.get()),
-                })
-            }
-            None => None,
+        // Register the connection and defer unregister.
+        spg.timeline
+            .get()
+            .on_compute_connect(self.pageserver_connstr.as_ref(), &spg.tx)?;
+        let _guard = ComputeConnectionGuard {
+            timeline: Arc::clone(spg.timeline.get()),
+            callmemaybe_tx: spg.tx.clone(),
         };
 
         loop {
@@ -142,12 +114,15 @@ impl<'pg> ReceiveWalConn<'pg> {
     }
 }
 
-struct SendWalHandlerGuard {
+struct ComputeConnectionGuard {
     timeline: Arc<Timeline>,
+    callmemaybe_tx: UnboundedSender<CallmeEvent>,
 }
 
-impl Drop for SendWalHandlerGuard {
+impl Drop for ComputeConnectionGuard {
     fn drop(&mut self) {
-        self.timeline.stop_streaming();
+        self.timeline
+            .on_compute_disconnect(&self.callmemaybe_tx)
+            .unwrap();
     }
 }
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index d96cdf5171..b219455c69 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -283,12 +283,14 @@ impl ReplicationConn {
            if spg.appname == Some("wal_proposer_recovery".to_string()) {
                None
            } else {
-                let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
-                let tenant_id = spg.ztenantid.unwrap();
-                let timeline_id = spg.timeline.get().zttid.timeline_id;
+                let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
+                let zttid = spg.timeline.get().zttid;
                let tx_clone = spg.tx.clone();
-                let subscription_key =
-                    SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone());
+                let subscription_key = SubscriptionStateKey::new(
+                    zttid.tenant_id,
+                    zttid.timeline_id,
+                    pageserver_connstr.clone(),
+                );
                spg.tx
                    .send(CallmeEvent::Pause(subscription_key))
                    .unwrap_or_else(|e| {
@@ -298,8 +300,8 @@ impl ReplicationConn {
                // create a guard to subscribe callback again, when this connection will exit
                Some(ReplicationStreamGuard {
                    tx: tx_clone,
-                    tenant_id,
-                    timeline_id,
+                    tenant_id: zttid.tenant_id,
+                    timeline_id: zttid.timeline_id,
                    pageserver_connstr,
                })
            }
@@ -324,21 +326,10 @@ impl ReplicationConn {
                if let Some(lsn) = lsn {
                    end_pos = lsn;
                } else {
-                    // Is it time to end streaming to this replica?
-                    if spg.timeline.get().check_stop_streaming(replica_id) {
-                        // this expect should never fail because in wal_proposer_recovery mode stop_pos is set
-                        // and this code is not reachable
-                        let pageserver_connstr = pageserver_connstr
-                            .expect("there should be a pageserver connection string");
-                        let tenant_id = spg.ztenantid.unwrap();
-                        let timeline_id = spg.timeline.get().zttid.timeline_id;
-                        let subscription_key =
-                            SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr);
-                        spg.tx
-                            .send(CallmeEvent::Unsubscribe(subscription_key))
-                            .unwrap_or_else(|e| {
-                                error!("failed to send Pause request to callmemaybe thread {}", e);
-                            });
+                    // TODO: also check once in a while whether we are the walsender
+                    // to the right pageserver.
+                    if spg.timeline.get().check_deactivate(replica_id, &spg.tx)? {
+                        // Shut down, timeline is suspended.
                        // TODO create proper error type for this
                        bail!("end streaming to {:?}", spg.appname);
                    }
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index 14ef7166f3..37b0ec22ac 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -12,12 +12,14 @@ use std::io::{Read, Seek, SeekFrom, Write};
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::Duration;
+use tokio::sync::mpsc::UnboundedSender;
 use tracing::*;
 use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
 use zenith_utils::bin_ser::LeSer;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::zid::ZTenantTimelineId;
 
+use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
 use crate::safekeeper::{
     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
     Storage, SK_FORMAT_VERSION, SK_MAGIC,
@@ -70,18 +72,25 @@ impl ReplicaState {
     }
 }
 
-/// Shared state associated with database instance (tenant)
+/// Shared state associated with database instance
 struct SharedState {
     /// Safekeeper object
     sk: SafeKeeper<FileStorage>,
     /// For receiving-sending wal cooperation
     /// quorum commit LSN we've notified walsenders about
     notified_commit_lsn: Lsn,
-    // Set stop_lsn to inform WAL senders that it's time to stop sending WAL,
-    // so that it send all wal up stop_lsn and can safely exit streaming connections.
-    stop_lsn: Option<Lsn>,
     /// State of replicas
     replicas: Vec<Option<ReplicaState>>,
+    /// Inactive clusters shouldn't occupy any resources, so timeline is
+    /// activated whenever there is a compute connection or pageserver is not
+    /// caught up (it must have latest WAL for new compute start) and suspended
+    /// otherwise.
+    ///
+    /// TODO: it might be better to remove tli completely from GlobalTimelines
+    /// when tli is inactive instead of having this flag.
+    active: bool,
+    num_computes: u32,
+    pageserver_connstr: Option<String>,
 }
 
 // A named boolean.
@@ -102,6 +111,118 @@ lazy_static! {
 }
 
 impl SharedState {
+    /// Restore SharedState from control file.
+    /// If create=false and file doesn't exist, bails out.
+    fn create_restore(
+        conf: &SafeKeeperConf,
+        zttid: &ZTenantTimelineId,
+        create: CreateControlFile,
+    ) -> Result<Self> {
+        let state = FileStorage::load_control_file_conf(conf, zttid, create)
+            .context("failed to load from control file")?;
+        let file_storage = FileStorage::new(zttid, conf);
+        let flush_lsn = if state.server.wal_seg_size != 0 {
+            let wal_dir = conf.timeline_dir(zttid);
+            find_end_of_wal(
+                &wal_dir,
+                state.server.wal_seg_size as usize,
+                true,
+                state.wal_start_lsn,
+            )?
+            .0
+        } else {
+            0
+        };
+
+        Ok(Self {
+            notified_commit_lsn: Lsn(0),
+            sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
+            replicas: Vec::new(),
+            active: false,
+            num_computes: 0,
+            pageserver_connstr: None,
+        })
+    }
+
+    /// Activate the timeline: start/change walsender (via callmemaybe).
+    fn activate(
+        &mut self,
+        zttid: &ZTenantTimelineId,
+        pageserver_connstr: Option<&String>,
+        callmemaybe_tx: &UnboundedSender<CallmeEvent>,
+    ) -> Result<()> {
+        if let Some(ref pageserver_connstr) = self.pageserver_connstr {
+            // unsub old sub. xxx: callmemaybe is going out
+            let old_subscription_key = SubscriptionStateKey::new(
+                zttid.tenant_id,
+                zttid.timeline_id,
+                pageserver_connstr.to_owned(),
+            );
+            callmemaybe_tx
+                .send(CallmeEvent::Unsubscribe(old_subscription_key))
+                .unwrap_or_else(|e| {
+                    error!("failed to send Unsubscribe request to callmemaybe thread {}", e);
+                });
+        }
+        if let Some(pageserver_connstr) = pageserver_connstr {
+            let subscription_key = SubscriptionStateKey::new(
+                zttid.tenant_id,
+                zttid.timeline_id,
+                pageserver_connstr.to_owned(),
+            );
+            // xx: sending to channel under lock is not very cool, but
+            // shouldn't be a problem here. If it is, we can grab a counter
+            // here and later augment channel messages with it.
+            callmemaybe_tx
+                .send(CallmeEvent::Subscribe(subscription_key))
+                .unwrap_or_else(|e| {
+                    error!(
+                        "failed to send Subscribe request to callmemaybe thread {}",
+                        e
+                    );
+                });
+            info!(
+                "timeline {} is subscribed to callmemaybe to {}",
+                zttid.timeline_id, pageserver_connstr
+            );
+        }
+        self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
+        self.active = true;
+        Ok(())
+    }
+
+    /// Deactivate the timeline: stop callmemaybe.
+    fn deactivate(
+        &mut self,
+        zttid: &ZTenantTimelineId,
+        callmemaybe_tx: &UnboundedSender<CallmeEvent>,
+    ) -> Result<()> {
+        if self.active {
+            if let Some(ref pageserver_connstr) = self.pageserver_connstr {
+                let subscription_key = SubscriptionStateKey::new(
+                    zttid.tenant_id,
+                    zttid.timeline_id,
+                    pageserver_connstr.to_owned(),
+                );
+                callmemaybe_tx
+                    .send(CallmeEvent::Unsubscribe(subscription_key))
+                    .unwrap_or_else(|e| {
+                        error!(
+                            "failed to send Unsubscribe request to callmemaybe thread {}",
+                            e
+                        );
+                    });
+                info!(
+                    "timeline {} is unsubscribed from callmemaybe to {}",
+                    zttid.timeline_id,
+                    self.pageserver_connstr.as_ref().unwrap()
+                );
+            }
+            self.active = false;
+        }
+        Ok(())
+    }
+
     /// Get combined state of all alive replicas
     pub fn get_replicas_state(&self) -> ReplicaState {
         let mut acc = ReplicaState::new();
@@ -159,37 +280,6 @@ impl SharedState {
         self.replicas.push(Some(state));
         pos
     }
-
-    /// Restore SharedState from control file.
-    /// If create=false and file doesn't exist, bails out.
-    fn create_restore(
-        conf: &SafeKeeperConf,
-        zttid: &ZTenantTimelineId,
-        create: CreateControlFile,
-    ) -> Result<Self> {
-        let state = FileStorage::load_control_file_conf(conf, zttid, create)
-            .context("failed to load from control file")?;
-        let file_storage = FileStorage::new(zttid, conf);
-        let flush_lsn = if state.server.wal_seg_size != 0 {
-            let wal_dir = conf.timeline_dir(zttid);
-            find_end_of_wal(
-                &wal_dir,
-                state.server.wal_seg_size as usize,
-                true,
-                state.wal_start_lsn,
-            )?
-            .0
-        } else {
-            0
-        };
-
-        Ok(Self {
-            notified_commit_lsn: Lsn(0),
-            stop_lsn: None,
-            sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
-            replicas: Vec::new(),
-        })
-    }
 }
 
 /// Database instance (tenant)
@@ -209,6 +299,67 @@ impl Timeline {
         }
     }
 
+    /// Register compute connection, starting timeline-related activity if it is
+    /// not running yet.
+    /// Can fail only if channel to a static thread got closed, which is not normal at all.
+    pub fn on_compute_connect(
+        &self,
+        pageserver_connstr: Option<&String>,
+        callmemaybe_tx: &UnboundedSender<CallmeEvent>,
+    ) -> Result<()> {
+        let mut shared_state = self.mutex.lock().unwrap();
+        shared_state.num_computes += 1;
+        // FIXME: currently we always adopt latest pageserver connstr, but we
+        // should have kind of generations assigned by compute to distinguish
+        // the latest one or even pass it through consensus to reliably deliver
+        // to all safekeepers.
+        shared_state.activate(&self.zttid, pageserver_connstr, callmemaybe_tx)?;
+        Ok(())
+    }
+
+    /// De-register compute connection, shutting down timeline activity if
+    /// pageserver doesn't need catchup.
+    /// Can fail only if channel to a static thread got closed, which is not normal at all.
+    pub fn on_compute_disconnect(
+        &self,
+        callmemaybe_tx: &UnboundedSender<CallmeEvent>,
+    ) -> Result<()> {
+        let mut shared_state = self.mutex.lock().unwrap();
+        shared_state.num_computes -= 1;
+        // If there is no pageserver, can suspend right away; otherwise let
+        // walsender do that.
+        if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() {
+            shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
+        }
+        Ok(())
+    }
+
+    /// Deactivate the tenant if there are no computes and the pageserver is
+    /// caught up, assuming the pageserver status is in replica_id.
+    /// Returns true if deactivated.
+    pub fn check_deactivate(
+        &self,
+        replica_id: usize,
+        callmemaybe_tx: &UnboundedSender<CallmeEvent>,
+    ) -> Result<bool> {
+        let mut shared_state = self.mutex.lock().unwrap();
+        if !shared_state.active {
+            // already suspended
+            return Ok(true);
+        }
+        if shared_state.num_computes == 0 {
+            let replica_state = shared_state.replicas[replica_id].unwrap();
+            let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
+                (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
+                 replica_state.last_received_lsn >= shared_state.sk.commit_lsn);
+            if deactivate {
+                shared_state.deactivate(&self.zttid, callmemaybe_tx)?;
+                return Ok(true);
+            }
+        }
+        Ok(false)
+    }
+
     /// Timed wait for an LSN to be committed.
     ///
     /// Returns the last committed LSN, which will be at least
@@ -242,54 +393,6 @@ impl Timeline {
         }
     }
 
-    // Notify WAL senders that it's time to stop sending WAL
-    pub fn stop_streaming(&self) {
-        let mut shared_state = self.mutex.lock().unwrap();
-        // Ensure that safekeeper sends WAL up to the last known committed LSN.
-        // It guarantees that pageserver will receive all the latest data
-        // before walservice disconnects.
-        shared_state.stop_lsn = Some(shared_state.notified_commit_lsn);
-        trace!(
-            "Stopping WAL senders. stop_lsn: {}",
-            shared_state.notified_commit_lsn
-        );
-    }
-
-    // Reset stop_lsn notification,
-    // so that WAL senders will continue sending WAL
-    pub fn continue_streaming(&self) {
-        let mut shared_state = self.mutex.lock().unwrap();
-        shared_state.stop_lsn = None;
-    }
-
-    // Check if it's time to stop streaming to the given replica.
-    //
-    // Do not stop streaming until replica is caught up with the stop_lsn.
-    // This is not necessary for correctness, just an optimization to
-    // be able to remove WAL from safekeeper and decrease amount of work
-    // on the next start.
-    pub fn check_stop_streaming(&self, replica_id: usize) -> bool {
-        let shared_state = self.mutex.lock().unwrap();
-
-        // If stop_lsn is set, it's time to shutdown streaming.
-        if let Some(stop_lsn_request) = shared_state.stop_lsn {
-            let replica_state = shared_state.replicas[replica_id].unwrap();
-            // There is no data to stream, so other clauses don't matter.
-            if shared_state.notified_commit_lsn == Lsn(0) {
-                return true;
-            }
-            // Lsn::MAX means that we don't know the latest LSN yet.
-            // That may be a new replica, give it a chance to catch up.
-            if replica_state.last_received_lsn != Lsn::MAX
-                // If replica is fully caught up, disconnect it.
-                && stop_lsn_request <= replica_state.last_received_lsn
-            {
-                return true;
-            }
-        }
-        false
-    }
-
     /// Pass arrived message to the safekeeper.
     pub fn process_msg(
         &self,
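
The following is a minimal, self-contained sketch (not code from this patch) of the activation state machine the patch introduces. Method names mirror the patch, but `CallmeEvent` is reduced to two variants, a plain std::sync::mpsc channel stands in for the tokio callmemaybe channel, and a `caught_up` flag stands in for the per-replica LSN feedback that check_deactivate() inspects. It illustrates the point of the commit message: the suspend-condition check and the callmemaybe unsubscribe happen under the same mutex, so a compute connection arriving concurrently cannot slip in between the check and the unsubscribe.

// Sketch only: simplified stand-in types, not the walkeeper implementation.
use std::sync::mpsc::Sender;
use std::sync::Mutex;

#[derive(Debug)]
enum CallmeEvent {
    Subscribe(String), // pageserver connection string
    Unsubscribe(String),
}

#[derive(Default)]
struct SharedState {
    active: bool,
    num_computes: u32,
    pageserver_connstr: Option<String>,
    caught_up: bool, // stand-in for "pageserver received all committed WAL"
}

impl SharedState {
    fn deactivate(&mut self, tx: &Sender<CallmeEvent>) {
        if self.active {
            if let Some(connstr) = self.pageserver_connstr.clone() {
                tx.send(CallmeEvent::Unsubscribe(connstr)).unwrap();
            }
            self.active = false;
        }
    }
}

struct Timeline {
    mutex: Mutex<SharedState>,
}

impl Timeline {
    // Compute connected: bump the counter and subscribe callmemaybe.
    // (The real patch also unsubscribes a previously known connstr first.)
    fn on_compute_connect(&self, connstr: Option<&str>, tx: &Sender<CallmeEvent>) {
        let mut state = self.mutex.lock().unwrap();
        state.num_computes += 1;
        if let Some(c) = connstr {
            tx.send(CallmeEvent::Subscribe(c.to_owned())).unwrap();
            state.pageserver_connstr = Some(c.to_owned());
        }
        state.active = true;
    }

    // Compute disconnected: suspend right away only if there is no pageserver
    // to catch up; otherwise the walsender calls check_deactivate() later.
    fn on_compute_disconnect(&self, tx: &Sender<CallmeEvent>) {
        let mut state = self.mutex.lock().unwrap();
        state.num_computes -= 1;
        if state.num_computes == 0 && state.pageserver_connstr.is_none() {
            state.deactivate(tx);
        }
    }

    // Called by the walsender: the condition check and the unsubscribe happen
    // under one lock, so a concurrent compute connection cannot interleave.
    fn check_deactivate(&self, tx: &Sender<CallmeEvent>) -> bool {
        let mut state = self.mutex.lock().unwrap();
        if state.num_computes == 0 && state.caught_up {
            state.deactivate(tx);
            return true;
        }
        false
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let tli = Timeline {
        mutex: Mutex::new(SharedState::default()),
    };
    tli.on_compute_connect(Some("host=pageserver"), &tx);
    tli.on_compute_disconnect(&tx); // pageserver still catching up: stays active
    tli.mutex.lock().unwrap().caught_up = true;
    assert!(tli.check_deactivate(&tx)); // now suspended
    println!("{:?}, {:?}", rx.try_recv(), rx.try_recv()); // Subscribe, then Unsubscribe
}

Under these assumptions, the drop guard in receive_wal.rs corresponds to calling on_compute_disconnect() when a compute connection closes, and the walsender loop in send_wal.rs corresponds to polling check_deactivate() once the pageserver has confirmed receipt of all committed WAL.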