From 41dce68bddea207fe9bcc9e70b355a7eddfef790 Mon Sep 17 00:00:00 2001
From: anastasia
Date: Tue, 16 Nov 2021 15:32:44 +0300
Subject: [PATCH] callmemaybe refactoring

- Don't spawn a separate thread for each connection. Instead, use one
  thread per safekeeper that iterates over all connections and sends
  callback requests for them.
- Use tokio-postgres to connect to the pageserver, to avoid spawning a
  new thread for each connection.

callmemaybe review fixes:
- Spawn all request_callback tasks separately.
- Remember 'last_call_time' and only send request_callback if
  'recall_period' has passed.
- If a task hasn't finished by the next recall, abort it and try again.
- Add Pause/Resume CallmeEvents to avoid spamming the pageserver when a
  connection is already established.
---
 Cargo.lock                      |  48 +++---
 walkeeper/Cargo.toml            |   1 +
 walkeeper/src/bin/safekeeper.rs |  46 ++++--
 walkeeper/src/callmemaybe.rs    | 275 ++++++++++++++++++++++++++++++++
 walkeeper/src/lib.rs            |   1 +
 walkeeper/src/receive_wal.rs    | 108 ++++++-------
 walkeeper/src/replication.rs    |  45 ++++++
 walkeeper/src/send_wal.rs       |   7 +-
 walkeeper/src/wal_service.rs    |  15 +-
 9 files changed, 439 insertions(+), 107 deletions(-)
 create mode 100644 walkeeper/src/callmemaybe.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9221d50ddd..0cdc2eaf05 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -540,9 +540,9 @@ checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
 
 [[package]]
 name = "futures"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
+checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -555,9 +555,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888"
+checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27"
 dependencies = [
  "futures-core",
  "futures-sink",
 ]
 
 [[package]]
 name = "futures-core"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
+checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
+checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -582,18 +582,16 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
+checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
+checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd"
 dependencies = [
- "autocfg",
- "proc-macro-hack",
  "proc-macro2",
  "quote",
  "syn",
@@ -601,23 +599,22 @@ dependencies = [
 
 [[package]]
 name = "futures-sink"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11"
+checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af"
 
 [[package]]
 name = "futures-task"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
+checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
 
 [[package]]
 name = "futures-util"
-version = "0.3.17"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
+checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e"
 dependencies = [
- "autocfg",
  "futures-channel",
  "futures-core",
  "futures-io",
@@ -627,8 +624,6 @@ dependencies = [
  "memchr",
  "pin-project-lite",
  "pin-utils",
- "proc-macro-hack",
- "proc-macro-nested",
  "slab",
 ]
 
@@ -1402,12 +1397,6 @@ version = "0.5.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
 
-[[package]]
-name = "proc-macro-nested"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
-
 [[package]]
 name = "proc-macro2"
 version = "1.0.30"
@@ -2361,6 +2350,7 @@ dependencies = [
  "signal-hook",
  "tempfile",
  "tokio",
+ "tokio-postgres",
  "tokio-stream",
  "tracing",
  "walkdir",

diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml
index 1b622823aa..b9a1f24e64 100644
--- a/walkeeper/Cargo.toml
+++ b/walkeeper/Cargo.toml
@@ -32,6 +32,7 @@ signal-hook = "0.3.10"
 serde = { version = "1.0", features = ["derive"] }
 hex = "0.4.3"
 const_format = "0.2.21"
+tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
 # FIXME: 'pageserver' is needed for ZTimelineId. Refactor
 pageserver = { path = "../pageserver" }

diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs
index 6cc6f7c743..67ee0d34cd 100644
--- a/walkeeper/src/bin/safekeeper.rs
+++ b/walkeeper/src/bin/safekeeper.rs
@@ -8,15 +8,18 @@ use daemonize::Daemonize;
 use std::path::{Path, PathBuf};
 use std::thread;
 use tracing::*;
+use zenith_utils::http::endpoint;
+use zenith_utils::{logging, tcp_listener, GIT_VERSION};
+
+use tokio::sync::mpsc;
+use walkeeper::callmemaybe;
 use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
 use walkeeper::http;
 use walkeeper::s3_offload;
 use walkeeper::wal_service;
 use walkeeper::SafeKeeperConf;
-use zenith_utils::http::endpoint;
 use zenith_utils::shutdown::exit_now;
 use zenith_utils::signals;
-use zenith_utils::{logging, tcp_listener, GIT_VERSION};
 
 fn main() -> Result<()> {
     zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -181,16 +184,35 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
         );
     }
 
-    threads.push(
-        thread::Builder::new()
-            .name("WAL acceptor thread".into())
-            .spawn(|| {
-                let thread_result = wal_service::thread_main(conf, pg_listener);
-                if let Err(e) = thread_result {
-                    info!("wal_service thread terminated: {}", e);
-                }
-            })?,
-    );
+    let (tx, rx) = mpsc::channel(100);
+    let conf_cloned = conf.clone();
+    let wal_acceptor_thread = thread::Builder::new()
+        .name("WAL acceptor thread".into())
+        .spawn(|| {
+            // accept libpq connections and forward subscription events to callmemaybe
+            let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx);
+            if let Err(e) = thread_result {
+                info!("wal_service thread terminated: {}", e);
+            }
+        })
+        .unwrap();
+
+    threads.push(wal_acceptor_thread);
+
+    let callmemaybe_thread = thread::Builder::new()
+        .name("callmemaybe thread".into())
+        .spawn(|| {
+            // periodically ask the pageserver to reconnect to us
+            let thread_result = callmemaybe::thread_main(conf, rx);
+            if let Err(e) = thread_result {
+                error!("callmemaybe thread terminated: {}", e);
+            }
+        })
+        .unwrap();
+    threads.push(callmemaybe_thread);
+
+    // TODO: put more thought into the handling of failed threads.
+    // We should probably restart them.
 
     // NOTE: we still have to handle signals like SIGQUIT to prevent coredumps
     signals.handle(|signal| {

diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs
new file mode 100644
index 0000000000..4e249e9cfe
--- /dev/null
+++ b/walkeeper/src/callmemaybe.rs
@@ -0,0 +1,275 @@
+//!
+//! The callmemaybe thread is responsible for periodically requesting
+//! the pageserver to initiate WAL streaming.
+//!
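+//! It reacts to CallmeEvent messages sent over an mpsc channel:
+//! Subscribe/Unsubscribe add and remove timelines; Pause/Resume gate
+//! the periodic callbacks while a replication connection to the
+//! pageserver is already active.
+//!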
+use crate::SafeKeeperConf;
+use anyhow::Result;
+use log::*;
+use std::collections::HashMap;
+use std::net::ToSocketAddrs;
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+use tokio::runtime;
+use tokio::sync::mpsc::Receiver;
+use tokio::task;
+use tokio_postgres::NoTls;
+use zenith_utils::zid::{ZTenantId, ZTimelineId};
+
+async fn request_callback(
+    pageserver_connstr: String,
+    listen_pg_addr_str: String,
+    timelineid: ZTimelineId,
+    tenantid: ZTenantId,
+) -> Result<()> {
+    debug!(
+        "callmemaybe request_callback Connecting to pageserver {}",
+        &pageserver_connstr
+    );
+    let (client, connection) = tokio_postgres::connect(&pageserver_connstr, NoTls).await?;
+
+    // drive the connection on a separate task; the client half is used below
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            eprintln!("connection error: {}", e);
+        }
+    });
+
+    // Send the callmemaybe request
+    let listen_pg_addr = listen_pg_addr_str
+        .to_socket_addrs()
+        .unwrap()
+        .next()
+        .unwrap();
+
+    let callme = format!(
+        "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
+        tenantid,
+        timelineid,
+        &listen_pg_addr.ip().to_string(),
+        listen_pg_addr.port(),
+        timelineid,
+        tenantid
+    );
+
+    let _ = client.simple_query(&callme).await?;
+
+    Ok(())
+}
+
+pub fn thread_main(conf: SafeKeeperConf, rx: Receiver<CallmeEvent>) -> Result<()> {
+    let runtime = runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    runtime.block_on(main_loop(conf, rx))
+}
+
+#[derive(Debug)]
+pub enum CallmeEvent {
+    // add a new subscription to the list
+    Subscribe(ZTenantId, ZTimelineId, String),
+    // remove the subscription from the list
+    Unsubscribe(ZTenantId, ZTimelineId),
+    // don't serve this subscription, but keep it in the list
+    Pause(ZTenantId, ZTimelineId),
+    // resume this subscription, if it exists,
+    // but don't create a new one if it is gone
+    Resume(ZTenantId, ZTimelineId),
+}
+
+struct SubscriptionStateInner {
+    handle: Option<task::JoinHandle<()>>,
+    last_call_time: Instant,
+    paused: bool,
+}
+
+struct SubscriptionState {
+    tenantid: ZTenantId,
+    timelineid: ZTimelineId,
+    pageserver_connstr: String,
+    inner: Mutex<SubscriptionStateInner>,
+}
+
+impl SubscriptionState {
+    fn new(
+        tenantid: ZTenantId,
+        timelineid: ZTimelineId,
+        pageserver_connstr: String,
+    ) -> SubscriptionState {
+        let state_inner = SubscriptionStateInner {
+            handle: None,
+            last_call_time: Instant::now(),
+            paused: false,
+        };
+
+        SubscriptionState {
+            tenantid,
+            timelineid,
+            pageserver_connstr,
+            inner: Mutex::new(state_inner),
+        }
+    }
+
+    fn pause(&self) {
+        let mut state_inner = self.inner.lock().unwrap();
+        state_inner.paused = true;
+
+        // abort the in-flight callback task, if any
+        if let Some(handle) = state_inner.handle.take() {
+            handle.abort();
+        }
+    }
+
+    fn resume(&self) {
+        let mut state_inner = self.inner.lock().unwrap();
+        state_inner.paused = false;
+    }
+
+    fn call(&self, recall_period: Duration, listen_pg_addr: String) {
+        let mut state_inner = self.inner.lock().unwrap();
+
+        // Ignore the call request if this subscription is paused
+        if state_inner.paused {
+            debug!(
+                "ignore call request for paused subscription
+                tenantid: {}, timelineid: {}",
+                self.tenantid, self.timelineid
+            );
+            return;
+        }
+
+        // Check if it is too early to recall
+        if state_inner.handle.is_some() && state_inner.last_call_time.elapsed() < recall_period {
+            debug!(
+                "too early to recall. last_call_time.elapsed: {:?}, recall_period: {:?}
+                tenantid: {}, timelineid: {}",
+                state_inner.last_call_time.elapsed(),
+                recall_period,
+                self.tenantid,
+                self.timelineid
+            );
+            return;
+        }
+
+        // If the previous task didn't complete within recall_period, it must be
+        // hanging, so don't wait for it forever; just abort it and try again.
+        //
+        // Most likely, the task has already successfully completed
+        // and abort() won't have any effect.
+        if let Some(handle) = state_inner.handle.take() {
+            handle.abort();
+
+            let timelineid = self.timelineid;
+            let tenantid = self.tenantid;
+            tokio::spawn(async move {
+                if let Err(err) = handle.await {
+                    if err.is_cancelled() {
+                        warn!("callback task for timelineid={} tenantid={} was cancelled before spawning a new one",
+                            timelineid, tenantid);
+                    }
+                }
+            });
+        }
+
+        let timelineid = self.timelineid;
+        let tenantid = self.tenantid;
+        let pageserver_connstr = self.pageserver_connstr.clone();
+        state_inner.handle = Some(tokio::spawn(async move {
+            request_callback(pageserver_connstr, listen_pg_addr, timelineid, tenantid)
+                .await
+                .unwrap_or_else(|e| {
+                    error!(
+                        "callmemaybe. request_callback for timelineid={} tenantid={} failed: {}",
+                        timelineid, tenantid, e
+                    );
+                })
+        }));
+
+        // Update last_call_time
+        state_inner.last_call_time = Instant::now();
+        debug!(
+            "new call spawned. time {:?}
+            tenantid: {}, timelineid: {}",
+            state_inner.last_call_time, self.tenantid, self.timelineid
+        );
+    }
+}
+
+impl Drop for SubscriptionStateInner {
+    fn drop(&mut self) {
+        if let Some(handle) = self.handle.take() {
+            handle.abort();
+        }
+    }
+}
+
+pub async fn main_loop(conf: SafeKeeperConf, mut rx: Receiver<CallmeEvent>) -> Result<()> {
+    let default_timeout = Duration::from_secs(5);
+    let recall_period = conf.recall_period.unwrap_or(default_timeout);
+
+    let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId), SubscriptionState>> =
+        Mutex::new(HashMap::new());
+
+    loop {
+        let call_iteration = tokio::select! {
+            request = rx.recv() => {
+                match request {
+                    Some(request) => {
+                        match request {
+                            CallmeEvent::Subscribe(tenantid, timelineid, pageserver_connstr) => {
+                                let mut subscriptions = subscriptions.lock().unwrap();
+                                subscriptions.insert((tenantid, timelineid), SubscriptionState::new(tenantid, timelineid, pageserver_connstr));
+                                debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}",
+                                    timelineid, tenantid);
+                                true
+                            },
+                            CallmeEvent::Unsubscribe(tenantid, timelineid) => {
+                                let mut subscriptions = subscriptions.lock().unwrap();
+                                subscriptions.remove(&(tenantid, timelineid));
+                                debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}",
+                                    timelineid, tenantid);
+                                false
+                            },
+                            CallmeEvent::Pause(tenantid, timelineid) => {
+                                let subscriptions = subscriptions.lock().unwrap();
+                                if let Some(sub) = subscriptions.get(&(tenantid, timelineid)) {
+                                    sub.pause();
+                                }
+                                debug!("callmemaybe. thread_main. pause callback request for timelineid={} tenantid={}",
+                                    timelineid, tenantid);
+                                false
+                            },
+                            CallmeEvent::Resume(tenantid, timelineid) => {
+                                let mut call_iteration = false;
+                                let subscriptions = subscriptions.lock().unwrap();
+                                if let Some(sub) = subscriptions.get(&(tenantid, timelineid)) {
+                                    sub.resume();
+                                    debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
+                                        timelineid, tenantid);
+                                    call_iteration = true;
+                                }
+                                call_iteration
+                            },
+                        }
+                    },
+                    // all senders disconnected
+                    None => {
+                        return Ok(());
+                    },
+                }
+            },
+            _ = tokio::time::sleep(recall_period) => { true },
+        };
+
+        if call_iteration {
+            let subscriptions = subscriptions.lock().unwrap();
+
+            for (&(_tenantid, _timelineid), state) in subscriptions.iter() {
+                let listen_pg_addr = conf.listen_pg_addr.clone();
+
+                state.call(recall_period, listen_pg_addr);
+            }
+        }
+    }
+}

diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index a0060c3630..2c26ea5dd6 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -4,6 +4,7 @@
 use std::time::Duration;
 use zenith_utils::zid::ZTimelineId;
 
+pub mod callmemaybe;
 pub mod http;
 pub mod json_ctrl;
 pub mod receive_wal;

diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index ced057bee2..a85fe114c6 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -5,24 +5,27 @@
 use anyhow::{bail, Context, Result};
 use bytes::Bytes;
 use bytes::BytesMut;
-use postgres::{Client, Config, NoTls};
 use tracing::*;
 use std::net::SocketAddr;
-use std::thread;
-use std::thread::sleep;
 
 use crate::safekeeper::AcceptorProposerMessage;
 use crate::safekeeper::ProposerAcceptorMessage;
 use crate::send_wal::SendWalHandler;
 use crate::timeline::TimelineTools;
-use crate::SafeKeeperConf;
-use zenith_utils::connstring::connection_host_port;
 use zenith_utils::postgres_backend::PostgresBackend;
 use zenith_utils::pq_proto::{BeMessage, FeMessage};
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
+use crate::callmemaybe::CallmeEvent;
+use tokio::sync::mpsc::Sender;
+
 pub struct ReceiveWalConn<'pg> {
     /// Postgres connection
     pg_backend: &'pg mut PostgresBackend,
@@ -34,50 +37,6 @@ pub struct ReceiveWalConn<'pg> {
     pageserver_connstr: Option<String>,
 }
 
-///
-/// Periodically request pageserver to call back.
-/// If pageserver already has replication channel, it will just ignore this request
-///
-fn request_callback(
-    conf: SafeKeeperConf,
-    pageserver_connstr: String,
-    timelineid: ZTimelineId,
-    tenantid: ZTenantId,
-) {
-    // use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
-    let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
-    let me_conf: Config = me_connstr.parse().unwrap();
-    let (host, port) = connection_host_port(&me_conf);
-    let callme = format!(
-        "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'",
-        tenantid, timelineid, host, port, timelineid, tenantid,
-    );
-
-    loop {
-        info!(
-            "requesting page server to connect to us: start {} {}",
-            pageserver_connstr, callme
-        );
-        match Client::connect(&pageserver_connstr, NoTls) {
-            Ok(mut client) => {
-                if let Err(e) = client.simple_query(&callme) {
-                    error!("Failed to send callme request to pageserver: {}", e);
-                }
-            }
-            Err(e) => error!(
-                "Failed to connect to pageserver {}: {}",
-                &pageserver_connstr, e
-            ),
-        }
-
-        if let Some(period) = conf.recall_period {
-            sleep(period);
-        } else {
-            break;
-        }
-    }
-}
-
 impl<'pg> ReceiveWalConn<'pg> {
     pub fn new(
         pg: &'pg mut PostgresBackend,
@@ -138,20 +97,33 @@ impl<'pg> ReceiveWalConn<'pg> {
             _ => bail!("unexpected message {:?} instead of greeting", msg),
         }
 
-        // Need to establish replication channel with page server.
-        // Add far as replication in postgres is initiated by receiver, we should use callme mechanism
-        if let Some(ref pageserver_connstr) = self.pageserver_connstr {
-            let conf = swh.conf.clone();
-            let timelineid = swh.timeline.get().timelineid;
-            // copy to safely move to a thread
-            let pageserver_connstr = pageserver_connstr.to_owned();
-            let _ = thread::Builder::new()
-                .name("request_callback thread".into())
-                .spawn(move || {
-                    request_callback(conf, pageserver_connstr, timelineid, tenant_id);
-                })
-                .unwrap();
-        }
+        // if requested, ask the pageserver to fetch WAL from us;
+        // as long as this wal_stream is alive, the callmemaybe thread
+        // will keep sending requests to the pageserver
+        let _guard = match self.pageserver_connstr {
+            Some(ref pageserver_connstr) => {
+                // Need to establish a replication channel with the page server.
+                // Since replication in postgres is initiated by the receiver, we use the callmemaybe mechanism.
+                let timelineid = swh.timeline.get().timelineid;
+                let tx_clone = swh.tx.clone();
+                let pageserver_connstr = pageserver_connstr.to_owned();
+                swh.tx
+                    .blocking_send(CallmeEvent::Subscribe(
+                        tenant_id,
+                        timelineid,
+                        pageserver_connstr,
+                    ))
+                    .unwrap();
+
+                // create a guard to unsubscribe the callback when this wal_stream exits
+                Some(SendWalHandlerGuard {
+                    tx: tx_clone,
+                    tenant_id,
+                    timelineid,
+                })
+            }
+            None => None,
+        };
 
         loop {
             let reply = swh
@@ -166,3 +138,17 @@ impl<'pg> ReceiveWalConn<'pg> {
         }
     }
 }
+
+struct SendWalHandlerGuard {
+    tx: Sender<CallmeEvent>,
+    tenant_id: ZTenantId,
+    timelineid: ZTimelineId,
+}
+
+impl Drop for SendWalHandlerGuard {
+    fn drop(&mut self) {
+        self.tx
+            .blocking_send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid))
+            .unwrap();
+    }
+}

diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index a9ffc2d1ad..f29f666a7b 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -26,6 +26,10 @@ use zenith_utils::postgres_backend::PostgresBackend;
 use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
 use zenith_utils::sock_split::ReadStream;
 
+use crate::callmemaybe::CallmeEvent;
+use tokio::sync::mpsc::Sender;
+use zenith_utils::zid::{ZTenantId, ZTimelineId};
+
 pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
 
 // See: https://www.postgresql.org/docs/13/protocol-replication.html
@@ -81,6 +85,29 @@ impl Drop for ReplicationConnGuard {
     }
 }
 
+// XXX: Naming is a bit messy here.
+// This ReplicationStreamGuard lives as long as the ReplicationConn,
+// while the current ReplicationConnGuard is tied to the background thread
+// that receives feedback.
+struct ReplicationStreamGuard {
+    tx: Sender<CallmeEvent>,
+    tenant_id: ZTenantId,
+    timelineid: ZTimelineId,
+}
+
+impl Drop for ReplicationStreamGuard {
+    fn drop(&mut self) {
+        // the connection to the pageserver is lost,
+        // so resume the callback subscription
+        info!("Connection to pageserver is gone. Subscribe to callmemaybe again. tenantid {} timelineid {}",
+            self.tenant_id, self.timelineid);
+
+        self.tx
+            .blocking_send(CallmeEvent::Resume(self.tenant_id, self.timelineid))
+            .unwrap();
+    }
+}
+
 impl ReplicationConn {
     /// Create a new `ReplicationConn`
     pub fn new(pgb: &mut PostgresBackend) -> Self {
@@ -223,6 +250,24 @@ impl ReplicationConn {
         };
         info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
 
+        // Don't spam the pageserver with callmemaybe queries
+        // when the connection is already established.
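+        // Pause the subscription for the lifetime of this replication stream;
+        // the guard below sends Resume again when the stream is dropped.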
+        let _guard = {
+            let timelineid = swh.timeline.get().timelineid;
+            let tenant_id = swh.tenantid.unwrap();
+            let tx_clone = swh.tx.clone();
+            swh.tx
+                .blocking_send(CallmeEvent::Pause(tenant_id, timelineid))
+                .unwrap();
+
+            // create a guard to resume the callback subscription when this connection exits
+            Some(ReplicationStreamGuard {
+                tx: tx_clone,
+                tenant_id,
+                timelineid,
+            })
+        };
+
         // switch to copy
         pgb.write_message(&BeMessage::CopyBothResponse)?;
 
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index a86d8d9305..b6ec18c1c0 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -17,7 +17,9 @@ use zenith_utils::postgres_backend::PostgresBackend;
 use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID};
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
+use crate::callmemaybe::CallmeEvent;
 use crate::timeline::CreateControlFile;
+use tokio::sync::mpsc::Sender;
 
 /// Handler for streaming WAL from acceptor
 pub struct SendWalHandler {
@@ -27,6 +29,8 @@ pub struct SendWalHandler {
     pub tenantid: Option<ZTenantId>,
     pub timelineid: Option<ZTimelineId>,
     pub timeline: Option<Arc<Timeline>>,
+    // sender to communicate with the callmemaybe thread
+    pub tx: Sender<CallmeEvent>,
 }
 
 impl postgres_backend::Handler for SendWalHandler {
@@ -99,13 +103,14 @@ impl postgres_backend::Handler for SendWalHandler {
     }
 }
 
 impl SendWalHandler {
-    pub fn new(conf: SafeKeeperConf) -> Self {
+    pub fn new(conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Self {
         SendWalHandler {
             conf,
             appname: None,
             tenantid: None,
             timelineid: None,
             timeline: None,
+            tx,
         }
     }

diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index e47bf2eb04..95026e3710 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -8,22 +8,29 @@ use std::net::{TcpListener, TcpStream};
 use std::thread;
 use tracing::*;
 
+use crate::callmemaybe::CallmeEvent;
 use crate::send_wal::SendWalHandler;
 use crate::SafeKeeperConf;
+use tokio::sync::mpsc::Sender;
 use zenith_utils::postgres_backend::{AuthType, PostgresBackend};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
-pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
+pub fn thread_main(
+    conf: SafeKeeperConf,
+    listener: TcpListener,
+    tx: Sender<CallmeEvent>,
+) -> Result<()> {
     loop {
         match listener.accept() {
             Ok((socket, peer_addr)) => {
                 debug!("accepted connection from {}", peer_addr);
                 let conf = conf.clone();
+                let tx_clone = tx.clone();
                 let _ = thread::Builder::new()
                     .name("WAL service thread".into())
                     .spawn(move || {
-                        if let Err(err) = handle_socket(socket, conf) {
+                        if let Err(err) = handle_socket(socket, conf, tx_clone) {
                             error!("connection handler exited: {}", err);
                         }
                     })
@@ -44,12 +51,12 @@ fn get_tid() -> u64 {
 /// This is run by `thread_main` above, inside a background thread.
 ///
-fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
+fn handle_socket(socket: TcpStream, conf: SafeKeeperConf, tx: Sender<CallmeEvent>) -> Result<()> {
     let _enter = info_span!("", tid = ?get_tid()).entered();
 
     socket.set_nodelay(true)?;
 
-    let mut conn_handler = SendWalHandler::new(conf);
+    let mut conn_handler = SendWalHandler::new(conf, tx);
     let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
     // libpq replication protocol between safekeeper and replicas/pagers
     pgbackend.run(&mut conn_handler)?;