diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs
index aceb759cd2..20d06a9967 100644
--- a/walkeeper/src/bin/safekeeper.rs
+++ b/walkeeper/src/bin/safekeeper.rs
@@ -184,7 +184,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
         );
     }
 
-    let (tx, rx) = mpsc::channel(100);
+    let (tx, rx) = mpsc::unbounded_channel();
     let conf_cloned = conf.clone();
     let wal_acceptor_thread = thread::Builder::new()
         .name("WAL acceptor thread".into())
diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs
index 66eb9968ba..f7f8970472 100644
--- a/walkeeper/src/callmemaybe.rs
+++ b/walkeeper/src/callmemaybe.rs
@@ -1,18 +1,22 @@
 //!
-//! Callmemaybe thread is responsible for periodically requesting
+//! Callmemaybe module is responsible for periodically requesting
 //! pageserver to initiate wal streaming.
 //!
+//! Other threads can use CallmeEvent messages to subscribe or unsubscribe
+//! from the call list.
+//!
 use crate::SafeKeeperConf;
+use anyhow::anyhow;
 use anyhow::Result;
-use log::*;
 use std::collections::HashMap;
-use std::net::ToSocketAddrs;
 use std::sync::Mutex;
 use std::time::{Duration, Instant};
 use tokio::runtime;
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::UnboundedReceiver;
 use tokio::task;
 use tokio_postgres::NoTls;
+use tracing::*;
+use zenith_utils::connstring::connection_host_port;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 async fn request_callback(
@@ -33,21 +37,14 @@ async fn request_callback(
         }
     });
 
-    // Send callmemaybe request
-    let listen_pg_addr = listen_pg_addr_str
-        .to_socket_addrs()
-        .unwrap()
-        .next()
-        .unwrap();
+    // use Config parsing because SocketAddr parsing doesn't allow using host names instead of IP addresses
+    let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str);
+    let me_conf: postgres::config::Config = me_connstr.parse().unwrap();
+    let (host, port) = connection_host_port(&me_conf);
     let callme = format!(
         "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
-        tenantid,
-        timelineid,
-        &listen_pg_addr.ip().to_string(),
-        listen_pg_addr.port(),
-        timelineid,
-        tenantid
+        tenantid, timelineid, host, port, timelineid, tenantid
     );
 
     let _ = client.simple_query(&callme).await?;
@@ -55,7 +52,7 @@
     Ok(())
 }
 
-pub fn thread_main(conf: SafeKeeperConf, rx: Receiver<CallmeEvent>) -> Result<()> {
+pub fn thread_main(conf: SafeKeeperConf, rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
     let runtime = runtime::Builder::new_current_thread()
         .enable_all()
         .build()
@@ -64,6 +61,7 @@
     runtime.block_on(main_loop(conf, rx))
 }
 
+/// Messages to the callmemaybe thread
 #[derive(Debug)]
 pub enum CallmeEvent {
     // add new subscription to the list
@@ -77,17 +75,13 @@ pub enum CallmeEvent {
     Resume(ZTenantId, ZTimelineId),
 }
 
-struct SubscriptionStateInner {
-    handle: Option<task::JoinHandle<()>>,
-    last_call_time: Instant,
-    paused: bool,
-}
-
 struct SubscriptionState {
     tenantid: ZTenantId,
     timelineid: ZTimelineId,
     pageserver_connstr: String,
-    inner: Mutex<SubscriptionStateInner>,
+    handle: Option<task::JoinHandle<()>>,
+    last_call_time: Instant,
+    paused: bool,
 }
 
 impl SubscriptionState {
@@ -96,63 +90,29 @@ impl SubscriptionState {
         timelineid: ZTimelineId,
         pageserver_connstr: String,
     ) -> SubscriptionState {
-        let state_inner = SubscriptionStateInner {
-            handle: None,
-            last_call_time: Instant::now(),
-            paused: false,
-        };
-
         SubscriptionState {
             tenantid,
             timelineid,
             pageserver_connstr,
-            inner: Mutex::new(state_inner),
+            handle: None,
+            last_call_time: Instant::now(),
+            paused: false,
         }
     }
 
-    fn pause(&self) {
-        let mut state_inner = self.inner.lock().unwrap();
-        state_inner.paused = true;
-
-        if let Some(handle) = state_inner.handle.take() {
-            handle.abort();
-        }
+    fn pause(&mut self) {
+        self.paused = true;
+        self.abort_handle();
     }
 
-    fn resume(&self) {
-        let mut state_inner = self.inner.lock().unwrap();
-        state_inner.paused = false;
+    fn resume(&mut self) {
+        self.paused = false;
     }
 
-    fn call(&self, recall_period: Duration, listen_pg_addr: String) {
-        let mut state_inner = self.inner.lock().unwrap();
-
-        // Ignore call request if this subscription is paused
-        if state_inner.paused {
-            debug!(
-                "ignore call request for paused subscription
-                tenantid: {}, timelineid: {}",
-                self.tenantid, self.timelineid
-            );
-            return;
-        }
-
-        // Check if it too early to recall
-        if state_inner.handle.is_some() && state_inner.last_call_time.elapsed() < recall_period {
-            debug!(
-                "too early to recall. state_inner.last_call_time.elapsed: {:?}, recall_period: {:?}
-                tenantid: {}, timelineid: {}",
-                state_inner.last_call_time, recall_period, self.tenantid, self.timelineid
-            );
-            return;
-        }
-
-        // If previous task didn't complete in recall_period, it must be hanging,
-        // so don't wait for it forever, just abort it and try again.
-        //
-        // Most likely, the task have already successfully completed
-        // and abort() won't have any effect.
-        if let Some(handle) = state_inner.handle.take() {
+    // Most likely, the task has already successfully completed
+    // and abort() won't have any effect.
+    fn abort_handle(&mut self) {
+        if let Some(handle) = self.handle.take() {
             handle.abort();
 
             let timelineid = self.timelineid;
@@ -162,111 +122,128 @@ impl SubscriptionState {
                     if err.is_cancelled() {
                         warn!("callback task for timelineid={} tenantid={} was cancelled before spawning a new one",
                             timelineid, tenantid);
+                    } else {
+                        error!(
+                            "callback task for timelineid={} tenantid={} failed: {}",
+                            timelineid, tenantid, err
+                        );
                     }
                 }
             });
         }
+    }
+
+    fn call(&mut self, recall_period: Duration, listen_pg_addr: String) {
+        // Ignore call request if this subscription is paused
+        if self.paused {
+            debug!(
+                "ignore call request for paused subscription
+                tenantid: {}, timelineid: {}",
+                self.tenantid, self.timelineid
+            );
+            return;
+        }
+
+        // Check if it is too early to recall
+        if self.handle.is_some() && self.last_call_time.elapsed() < recall_period {
+            debug!(
+                "too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?}
+                tenantid: {}, timelineid: {}",
+                self.last_call_time, recall_period, self.tenantid, self.timelineid
+            );
+            return;
+        }
+
+        // If previous task didn't complete in recall_period, it must be hanging,
+        // so don't wait for it forever, just abort it and try again.
+        self.abort_handle();
 
         let timelineid = self.timelineid;
         let tenantid = self.tenantid;
         let pageserver_connstr = self.pageserver_connstr.clone();
-        state_inner.handle = Some(tokio::spawn(async move {
+        self.handle = Some(tokio::spawn(async move {
            request_callback(pageserver_connstr, listen_pg_addr, timelineid, tenantid)
                .await
                .unwrap_or_else(|e| {
                    error!(
-                        "callmemaybe. request_callback for timelineid={} tenantid={} failed: {}",
+                        "callback task for timelineid={} tenantid={} failed: {}",
                        timelineid, tenantid, e
-                    );
-                })
+                    )
+                });
        }));
 
         // Update last_call_time
-        state_inner.last_call_time = Instant::now();
+        self.last_call_time = Instant::now();
 
         debug!(
             "new call spawned.
             time {:?} tenantid: {}, timelineid: {}",
-            state_inner.last_call_time, self.tenantid, self.timelineid
+            self.last_call_time, self.tenantid, self.timelineid
         );
     }
 }
 
-impl Drop for SubscriptionStateInner {
+impl Drop for SubscriptionState {
     fn drop(&mut self) {
-        if let Some(handle) = self.handle.take() {
-            handle.abort();
-        }
+        self.abort_handle();
     }
 }
 
-pub async fn main_loop(conf: SafeKeeperConf, mut rx: Receiver<CallmeEvent>) -> Result<()> {
+pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEvent>) -> Result<()> {
     let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId), SubscriptionState>> = Mutex::new(HashMap::new());
 
     loop {
-        let call_iteration = tokio::select! {
-            request = rx.recv() => {
-                match request {
-                    Some(request) =>
+        tokio::select! {
+            request = rx.recv() =>
+            {
+                match request.ok_or_else(|| anyhow!("done"))?
+                {
+                    CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
                     {
-                        match request
+                        let mut subscriptions = subscriptions.lock().unwrap();
+                        if let Some(mut sub) = subscriptions.insert((tenantid, timelineid),
+                            SubscriptionState::new(tenantid, timelineid, pageserver_connstr))
                         {
-                            CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) =>
-                            {
-                                let mut subscriptions = subscriptions.lock().unwrap();
-                                subscriptions.insert((tenantid, timelineid), SubscriptionState::new(tenantid, timelineid, pageserver_connstr)) ;
-                                debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
-                                    timelineid, tenantid);
-                                true
-                            },
-                            CallmeEvent::Unsubscribe(tenantid, timelineid) => {
-                                let mut subscriptions = subscriptions.lock().unwrap();
-                                subscriptions.remove(&(tenantid, timelineid));
-                                debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
-                                    timelineid, tenantid);
-                                false
-                            },
-                            CallmeEvent::Pause(tenantid, timelineid) => {
-                                let subscriptions = subscriptions.lock().unwrap();
-                                if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
-                                {
-                                    sub.pause();
-                                }
-                                debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
-                                    timelineid, tenantid);
-                                false
-                            },
-                            CallmeEvent::Resume(tenantid, timelineid) => {
-                                let mut call_iteration = false;
-                                let subscriptions = subscriptions.lock().unwrap();
-                                if let Some(sub) = subscriptions.get(&(tenantid, timelineid))
-                                {
-                                    sub.resume();
-                                    debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
-                                        timelineid, tenantid);
-                                    call_iteration = true;
-                                }
-                                call_iteration
-                            },
+                            sub.call(conf.recall_period, conf.listen_pg_addr.clone());
                         }
+                        debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
+                            timelineid, tenantid);
                     },
-                    // all senders disconnected
-                    None => {
-                        return Ok(());
+                    CallmeEvent::Unsubscribe(tenantid, timelineid) => {
+                        let mut subscriptions = subscriptions.lock().unwrap();
+                        subscriptions.remove(&(tenantid, timelineid));
+                        debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
+                            timelineid, tenantid);
+                    },
+                    CallmeEvent::Pause(tenantid, timelineid) => {
+                        let mut subscriptions = subscriptions.lock().unwrap();
+                        if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
+                        {
+                            sub.pause();
+                        };
+                        debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
+                            timelineid, tenantid);
+                    },
+                    CallmeEvent::Resume(tenantid, timelineid) => {
+                        let mut subscriptions = subscriptions.lock().unwrap();
+                        if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
+                        {
+                            sub.resume();
+                            sub.call(conf.recall_period, conf.listen_pg_addr.clone());
+                        };
+
+                        debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
+                            timelineid, tenantid);
+                    },
                 }
             },
-            _ = tokio::time::sleep(conf.recall_period) => { true },
+            _ = tokio::time::sleep(conf.recall_period) => {
+                let mut subscriptions = subscriptions.lock().unwrap();
+
+                for (&(_tenantid, _timelineid), state) in subscriptions.iter_mut() {
+                    state.call(conf.recall_period, conf.listen_pg_addr.clone());
+                }
+            },
         };
-
-        if call_iteration {
-            let subscriptions = subscriptions.lock().unwrap();
-
-            for (&(_tenantid, _timelineid), state) in subscriptions.iter() {
-                let listen_pg_addr = conf.listen_pg_addr.clone();
-
-                state.call(conf.recall_period, listen_pg_addr);
-            }
-        }
     }
 }
diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index a85fe114c6..6a3ab62574 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -5,12 +5,7 @@ use anyhow::{bail, Context, Result};
 use bytes::Bytes;
 use bytes::BytesMut;
-<<<<<<< HEAD
-use postgres::{Client, Config, NoTls};
 use tracing::*;
-=======
-use log::*;
->>>>>>> callmemaybe refactoring
 
 use std::net::SocketAddr;
 
@@ -24,7 +19,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage};
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 use crate::callmemaybe::CallmeEvent;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedSender;
 
 pub struct ReceiveWalConn<'pg> {
     /// Postgres connection
@@ -103,17 +98,23 @@ impl<'pg> ReceiveWalConn<'pg> {
         let _guard = match self.pageserver_connstr {
             Some(ref pageserver_connstr) => {
                 // Need to establish replication channel with page server.
-                // Add far as replication in postgres is initiated by receiver, we should use callme mechanism
+                // Since replication in postgres is initiated by the receiver,
+                // we should use the callmemaybe mechanism.
                 let timelineid = swh.timeline.get().timelineid;
                 let tx_clone = swh.tx.clone();
                 let pageserver_connstr = pageserver_connstr.to_owned();
                 swh.tx
-                    .blocking_send(CallmeEvent::Subscribe(
+                    .send(CallmeEvent::Subscribe(
                         tenant_id,
                         timelineid,
                         pageserver_connstr,
                     ))
-                    .unwrap();
+                    .unwrap_or_else(|e| {
+                        error!(
+                            "failed to send Subscribe request to callmemaybe thread: {}",
+                            e
+                        );
+                    });
 
                 // create a guard to unsubscribe callback, when this wal_stream will exit
                 Some(SendWalHandlerGuard {
@@ -140,7 +141,7 @@
 }
 
 struct SendWalHandlerGuard {
-    tx: Sender<CallmeEvent>,
+    tx: UnboundedSender<CallmeEvent>,
     tenant_id: ZTenantId,
     timelineid: ZTimelineId,
 }
@@ -148,7 +149,12 @@
 impl Drop for SendWalHandlerGuard {
     fn drop(&mut self) {
         self.tx
-            .blocking_send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
-            .unwrap();
+            .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
+            .unwrap_or_else(|e| {
+                error!(
+                    "failed to send Unsubscribe request to callmemaybe thread: {}",
+                    e
+                );
+            });
     }
 }
diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index f29f666a7b..f284be3aed 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -27,7 +27,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody
 use zenith_utils::sock_split::ReadStream;
 
 use crate::callmemaybe::CallmeEvent;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedSender;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
@@ -90,7 +90,7 @@ impl Drop for ReplicationConnGuard {
 // and current ReplicationConnGuard is tied to the background thread
 // that receives feedback.
 struct ReplicationStreamGuard {
-    tx: Sender<CallmeEvent>,
+    tx: UnboundedSender<CallmeEvent>,
     tenant_id: ZTenantId,
     timelineid: ZTimelineId,
 }
@@ -103,8 +103,10 @@ impl Drop for ReplicationStreamGuard {
             self.tenant_id, self.timelineid);
 
         self.tx
-            .blocking_send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
-            .unwrap();
+            .send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
+            .unwrap_or_else(|e| {
+                error!("failed to send Resume request to callmemaybe thread: {}", e);
+            });
     }
 }
@@ -251,21 +253,27 @@ impl ReplicationConn {
         info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
 
         // Don't spam pageserver with callmemaybe queries
-        // when connection is already established.
+        // when the replication connection with the pageserver is already established.
         let _guard = {
-            let timelineid = swh.timeline.get().timelineid;
-            let tenant_id = swh.tenantid.unwrap();
-            let tx_clone = swh.tx.clone();
-            swh.tx
-                .blocking_send(CallmeEvent::Pause(tenant_id, timelineid))
-                .unwrap();
+            if swh.appname == Some("wal_proposer_recovery".to_string()) {
+                None
+            } else {
+                let timelineid = swh.timeline.get().timelineid;
+                let tenant_id = swh.tenantid.unwrap();
+                let tx_clone = swh.tx.clone();
+                swh.tx
+                    .send(CallmeEvent::Pause(tenant_id, timelineid))
+                    .unwrap_or_else(|e| {
+                        error!("failed to send Pause request to callmemaybe thread: {}", e);
+                    });
 
-            // create a guard to subscribe callback again, when this connection will exit
-            Some(ReplicationStreamGuard {
-                tx: tx_clone,
-                tenant_id,
-                timelineid,
-            })
+                // create a guard to subscribe callback again, when this connection will exit
+                Some(ReplicationStreamGuard {
+                    tx: tx_clone,
+                    tenant_id,
+                    timelineid,
+                })
+            }
         };
 
         // switch to copy
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index b6ec18c1c0..ce90fcf310 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -19,7 +19,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 use crate::callmemaybe::CallmeEvent;
 use crate::timeline::CreateControlFile;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedSender;
 
 /// Handler for streaming WAL from acceptor
 pub struct SendWalHandler {
@@ -30,7 +30,7 @@ pub struct SendWalHandler {
     pub timelineid: Option<ZTimelineId>,
     pub timeline: Option<Arc<Timeline>>,
     //sender to communicate with callmemaybe thread
-    pub tx: Sender<CallmeEvent>,
+    pub tx: UnboundedSender<CallmeEvent>,
 }
 
 impl postgres_backend::Handler for SendWalHandler {
@@ -103,7 +103,7 @@ impl postgres_backend::Handler for SendWalHandler {
 }
 
 impl SendWalHandler {
-    pub fn new(conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Self {
+    pub fn new(conf: SafeKeeperConf, tx: UnboundedSender<CallmeEvent>) -> Self {
         SendWalHandler {
             conf,
             appname: None,
diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index 95026e3710..a715c0a37f 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -11,14 +11,14 @@ use tracing::*;
 use crate::callmemaybe::CallmeEvent;
 use crate::send_wal::SendWalHandler;
 use crate::SafeKeeperConf;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedSender;
 use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
 pub fn thread_main(
     conf: SafeKeeperConf,
     listener: TcpListener,
-    tx: Sender<CallmeEvent>,
+    tx: UnboundedSender<CallmeEvent>,
 ) -> Result<()> {
     loop {
         match listener.accept() {
@@ -51,7 +51,11 @@ fn get_tid() -> u64 {
 
 /// This is run by `thread_main` above, inside a background thread.
 ///
-fn handle_socket(socket: TcpStream, conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Result<()> {
+fn handle_socket(
+    socket: TcpStream,
+    conf: SafeKeeperConf,
+    tx: UnboundedSender<CallmeEvent>,
+) -> Result<()> {
     let _enter = info_span!("", tid = ?get_tid()).entered();
 
     socket.set_nodelay(true)?;
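
Reviewer note, not part of the patch: the core of this change is replacing the bounded mpsc::channel(100) with mpsc::unbounded_channel(), which is what lets the Drop guards (SendWalHandlerGuard, ReplicationStreamGuard) call the synchronous, never-blocking UnboundedSender::send instead of blocking_send. Below is a minimal, self-contained sketch of that wiring under simplified, hypothetical names — Event stands in for CallmeEvent, plain u64 ids for ZTenantId/ZTimelineId, and Guard for the unsubscribe guards. It assumes tokio with the rt, sync, time, and macros features enabled.

use std::collections::HashSet;
use std::thread;
use std::time::Duration;

use tokio::sync::mpsc::{self, UnboundedSender};

#[derive(Debug)]
enum Event {
    Subscribe(u64),
    Unsubscribe(u64),
}

// Like SendWalHandlerGuard: unsubscribes when dropped. UnboundedSender::send
// is synchronous and never blocks, so it is safe to call inside Drop, where
// a bounded channel would have required blocking_send.
struct Guard {
    tx: UnboundedSender<Event>,
    id: u64,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.tx
            .send(Event::Unsubscribe(self.id))
            .unwrap_or_else(|e| eprintln!("failed to send Unsubscribe: {}", e));
    }
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // The consumer owns a current-thread runtime, mirroring
    // callmemaybe::thread_main driving main_loop.
    let consumer = thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let mut subs: HashSet<u64> = HashSet::new();
            loop {
                tokio::select! {
                    req = rx.recv() => match req {
                        Some(Event::Subscribe(id)) => { subs.insert(id); }
                        Some(Event::Unsubscribe(id)) => { subs.remove(&id); }
                        None => break, // all senders dropped, like main_loop ending
                    },
                    // Periodic tick, like the recall_period arm of the select loop.
                    _ = tokio::time::sleep(Duration::from_millis(50)) => {
                        for id in &subs {
                            println!("periodic call for subscription {}", id);
                        }
                    }
                }
            }
        });
    });

    {
        // Subscribe, then let the guard unsubscribe from plain sync code.
        let _guard = Guard { tx: tx.clone(), id: 1 };
        tx.send(Event::Subscribe(1)).unwrap();
        thread::sleep(Duration::from_millis(120));
    } // _guard dropped here; Unsubscribe(1) is sent without blocking

    drop(tx);
    consumer.join().unwrap();
}

The trade-off this sketch illustrates: an unbounded channel gives up backpressure, but the event volume here is bounded by the number of WAL connections, and in exchange send becomes safe to call from Drop impls and other non-async contexts, failing only when the receiver loop has already exited — which is why the patch downgrades those failures from unwrap panics to logged errors.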