From 07b85e7cfcf7d69c12e528ddde42d51444bbed27 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Thu, 12 May 2022 19:55:01 +0300 Subject: [PATCH] Safekeeper refactor: move callmemaybe_tx from SafekeeperPostgresBackend to Timeline --- safekeeper/src/bin/safekeeper.rs | 8 +-- safekeeper/src/handler.rs | 8 +-- safekeeper/src/receive_wal.rs | 11 +--- safekeeper/src/send_wal.rs | 6 +-- safekeeper/src/timeline.rs | 90 ++++++++++++++++++-------------- safekeeper/src/wal_service.rs | 19 ++----- 6 files changed, 66 insertions(+), 76 deletions(-) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 65e71fcc74..6955d2aa5c 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -17,6 +17,7 @@ use url::{ParseError, Url}; use safekeeper::control_file::{self}; use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use safekeeper::remove_wal; +use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; use safekeeper::{broker, callmemaybe}; @@ -251,6 +252,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; + let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel(); + GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx); let conf_ = conf.clone(); threads.push( @@ -279,13 +282,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b ); } - let (tx, rx) = mpsc::unbounded_channel(); let conf_cloned = conf.clone(); let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) .spawn(|| { // thread code - let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx); + let thread_result = wal_service::thread_main(conf_cloned, pg_listener); if let Err(e) = thread_result { info!("safekeeper thread terminated: {}", e); } @@ -299,7 +301,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b .name("callmemaybe thread".into()) .spawn(|| { // thread code - let thread_result = callmemaybe::thread_main(conf_cloned, rx); + let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx); if let Err(e) = thread_result { error!("callmemaybe thread terminated: {}", e); } diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 7d86523b0e..9af78661f9 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -21,9 +21,6 @@ use utils::{ zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, }; -use crate::callmemaybe::CallmeEvent; -use tokio::sync::mpsc::UnboundedSender; - /// Safekeeper handler of postgres commands pub struct SafekeeperPostgresHandler { pub conf: SafeKeeperConf, @@ -33,8 +30,6 @@ pub struct SafekeeperPostgresHandler { pub ztimelineid: Option, pub timeline: Option>, pageserver_connstr: Option, - //sender to communicate with callmemaybe thread - pub tx: UnboundedSender, } /// Parsed Postgres command. @@ -140,7 +135,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { } impl SafekeeperPostgresHandler { - pub fn new(conf: SafeKeeperConf, tx: UnboundedSender) -> Self { + pub fn new(conf: SafeKeeperConf) -> Self { SafekeeperPostgresHandler { conf, appname: None, @@ -148,7 +143,6 @@ impl SafekeeperPostgresHandler { ztimelineid: None, timeline: None, pageserver_connstr: None, - tx, } } diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 3ad99ab0df..0ef335c9ed 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -5,7 +5,6 @@ use anyhow::{anyhow, bail, Result}; use bytes::BytesMut; -use tokio::sync::mpsc::UnboundedSender; use tracing::*; use crate::timeline::Timeline; @@ -28,8 +27,6 @@ use utils::{ sock_split::ReadStream, }; -use crate::callmemaybe::CallmeEvent; - pub struct ReceiveWalConn<'pg> { /// Postgres connection pg_backend: &'pg mut PostgresBackend, @@ -91,10 +88,9 @@ impl<'pg> ReceiveWalConn<'pg> { // Register the connection and defer unregister. spg.timeline .get() - .on_compute_connect(self.pageserver_connstr.as_ref(), &spg.tx)?; + .on_compute_connect(self.pageserver_connstr.as_ref())?; let _guard = ComputeConnectionGuard { timeline: Arc::clone(spg.timeline.get()), - callmemaybe_tx: spg.tx.clone(), }; let mut next_msg = Some(next_msg); @@ -194,13 +190,10 @@ impl ProposerPollStream { struct ComputeConnectionGuard { timeline: Arc, - callmemaybe_tx: UnboundedSender, } impl Drop for ComputeConnectionGuard { fn drop(&mut self) { - self.timeline - .on_compute_disconnect(&self.callmemaybe_tx) - .unwrap(); + self.timeline.on_compute_disconnect().unwrap(); } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 960f70d154..d52dd6ea57 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -264,13 +264,13 @@ impl ReplicationConn { } else { let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); let zttid = spg.timeline.get().zttid; - let tx_clone = spg.tx.clone(); + let tx_clone = spg.timeline.get().callmemaybe_tx.clone(); let subscription_key = SubscriptionStateKey::new( zttid.tenant_id, zttid.timeline_id, pageserver_connstr.clone(), ); - spg.tx + tx_clone .send(CallmeEvent::Pause(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Pause request to callmemaybe thread {}", e); @@ -315,7 +315,7 @@ impl ReplicationConn { } else { // TODO: also check once in a while whether we are walsender // to right pageserver. - if spg.timeline.get().check_deactivate(replica_id, &spg.tx)? { + if spg.timeline.get().check_deactivate(replica_id)? { // Shut down, timeline is suspended. // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index a12f628e06..c73d6af4ac 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -275,15 +275,21 @@ impl SharedState { /// Database instance (tenant) pub struct Timeline { pub zttid: ZTenantTimelineId, + pub callmemaybe_tx: UnboundedSender, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, } impl Timeline { - fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline { + fn new( + zttid: ZTenantTimelineId, + callmemaybe_tx: UnboundedSender, + shared_state: SharedState, + ) -> Timeline { Timeline { zttid, + callmemaybe_tx, mutex: Mutex::new(shared_state), cond: Condvar::new(), } @@ -292,34 +298,27 @@ impl Timeline { /// Register compute connection, starting timeline-related activity if it is /// not running yet. /// Can fail only if channel to a static thread got closed, which is not normal at all. - pub fn on_compute_connect( - &self, - pageserver_connstr: Option<&String>, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { + pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes += 1; // FIXME: currently we always adopt latest pageserver connstr, but we // should have kind of generations assigned by compute to distinguish // the latest one or even pass it through consensus to reliably deliver // to all safekeepers. - shared_state.activate(&self.zttid, pageserver_connstr, callmemaybe_tx)?; + shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; Ok(()) } /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. /// Can fail only if channel to a static thread got closed, which is not normal at all. - pub fn on_compute_disconnect( - &self, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { + pub fn on_compute_disconnect(&self) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes -= 1; // If there is no pageserver, can suspend right away; otherwise let // walsender do that. if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() { - shared_state.deactivate(&self.zttid, callmemaybe_tx)?; + shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; } Ok(()) } @@ -327,11 +326,7 @@ impl Timeline { /// Deactivate tenant if there is no computes and pageserver is caughtup, /// assuming the pageserver status is in replica_id. /// Returns true if deactivated. - pub fn check_deactivate( - &self, - replica_id: usize, - callmemaybe_tx: &UnboundedSender, - ) -> Result { + pub fn check_deactivate(&self, replica_id: usize) -> Result { let mut shared_state = self.mutex.lock().unwrap(); if !shared_state.active { // already suspended @@ -343,7 +338,7 @@ impl Timeline { (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn); if deactivate { - shared_state.deactivate(&self.zttid, callmemaybe_tx)?; + shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; return Ok(true); } } @@ -508,22 +503,35 @@ impl TimelineTools for Option> { } } +struct GlobalTimelinesState { + timelines: HashMap>, + callmemaybe_tx: Option>, +} + lazy_static! { - pub static ref TIMELINES: Mutex>> = - Mutex::new(HashMap::new()); + static ref TIMELINES_STATE: Mutex = Mutex::new(GlobalTimelinesState { + timelines: HashMap::new(), + callmemaybe_tx: None + }); } /// A zero-sized struct used to manage access to the global timelines map. pub struct GlobalTimelines; impl GlobalTimelines { + pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender) { + let mut state = TIMELINES_STATE.lock().unwrap(); + assert!(state.callmemaybe_tx.is_none()); + state.callmemaybe_tx = Some(callmemaybe_tx); + } + fn create_internal( - mut timelines: MutexGuard>>, + mut state: MutexGuard, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, peer_ids: Vec, ) -> Result> { - match timelines.get(&zttid) { + match state.timelines.get(&zttid) { Some(_) => bail!("timeline {} already exists", zttid), None => { // TODO: check directory existence @@ -532,8 +540,12 @@ impl GlobalTimelines { let shared_state = SharedState::create(conf, &zttid, peer_ids) .context("failed to create shared state")?; - let new_tli = Arc::new(Timeline::new(zttid, shared_state)); - timelines.insert(zttid, Arc::clone(&new_tli)); + let new_tli = Arc::new(Timeline::new( + zttid, + state.callmemaybe_tx.as_ref().unwrap().clone(), + shared_state, + )); + state.timelines.insert(zttid, Arc::clone(&new_tli)); Ok(new_tli) } } @@ -544,20 +556,20 @@ impl GlobalTimelines { zttid: ZTenantTimelineId, peer_ids: Vec, ) -> Result> { - let timelines = TIMELINES.lock().unwrap(); - GlobalTimelines::create_internal(timelines, conf, zttid, peer_ids) + let state = TIMELINES_STATE.lock().unwrap(); + GlobalTimelines::create_internal(state, conf, zttid, peer_ids) } - /// Get a timeline with control file loaded from the global TIMELINES map. + /// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map. /// If control file doesn't exist and create=false, bails out. pub fn get( conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool, ) -> Result> { - let mut timelines = TIMELINES.lock().unwrap(); + let mut state = TIMELINES_STATE.lock().unwrap(); - match timelines.get(&zttid) { + match state.timelines.get(&zttid) { Some(result) => Ok(Arc::clone(result)), None => { let shared_state = @@ -573,20 +585,19 @@ impl GlobalTimelines { .contains("No such file or directory") && create { - return GlobalTimelines::create_internal( - timelines, - conf, - zttid, - vec![], - ); + return GlobalTimelines::create_internal(state, conf, zttid, vec![]); } else { return Err(error); } } }; - let new_tli = Arc::new(Timeline::new(zttid, shared_state)); - timelines.insert(zttid, Arc::clone(&new_tli)); + let new_tli = Arc::new(Timeline::new( + zttid, + state.callmemaybe_tx.as_ref().unwrap().clone(), + shared_state, + )); + state.timelines.insert(zttid, Arc::clone(&new_tli)); Ok(new_tli) } } @@ -594,8 +605,9 @@ impl GlobalTimelines { /// Get ZTenantTimelineIDs of all active timelines. pub fn get_active_timelines() -> Vec { - let timelines = TIMELINES.lock().unwrap(); - timelines + let state = TIMELINES_STATE.lock().unwrap(); + state + .timelines .iter() .filter(|&(_, tli)| tli.is_active()) .map(|(zttid, _)| *zttid) diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 468ac28526..5980160788 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -8,29 +8,22 @@ use std::net::{TcpListener, TcpStream}; use std::thread; use tracing::*; -use crate::callmemaybe::CallmeEvent; use crate::handler::SafekeeperPostgresHandler; use crate::SafeKeeperConf; -use tokio::sync::mpsc::UnboundedSender; use utils::postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. -pub fn thread_main( - conf: SafeKeeperConf, - listener: TcpListener, - tx: UnboundedSender, -) -> Result<()> { +pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> { loop { match listener.accept() { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); let conf = conf.clone(); - let tx_clone = tx.clone(); let _ = thread::Builder::new() .name("WAL service thread".into()) .spawn(move || { - if let Err(err) = handle_socket(socket, conf, tx_clone) { + if let Err(err) = handle_socket(socket, conf) { error!("connection handler exited: {}", err); } }) @@ -51,16 +44,12 @@ fn get_tid() -> u64 { /// This is run by `thread_main` above, inside a background thread. /// -fn handle_socket( - socket: TcpStream, - conf: SafeKeeperConf, - tx: UnboundedSender, -) -> Result<()> { +fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> { let _enter = info_span!("", tid = ?get_tid()).entered(); socket.set_nodelay(true)?; - let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx); + let mut conn_handler = SafekeeperPostgresHandler::new(conf); let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?; // libpq replication protocol between safekeeper and replicas/pagers pgbackend.run(&mut conn_handler)?;