diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs index 8187282e24..49712a6bf3 100644 --- a/walkeeper/src/callmemaybe.rs +++ b/walkeeper/src/callmemaybe.rs @@ -64,18 +64,35 @@ pub fn thread_main(conf: SafeKeeperConf, rx: UnboundedReceiver) -> runtime.block_on(main_loop(conf, rx)) } +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct SubscriptionStateKey { + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + pageserver_connstr: String, +} + +impl SubscriptionStateKey { + pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId, pageserver_connstr: String) -> Self { + Self { + tenant_id, + timeline_id, + pageserver_connstr, + } + } +} + /// Messages to the callmemaybe thread #[derive(Debug)] pub enum CallmeEvent { // add new subscription to the list - Subscribe(ZTenantId, ZTimelineId, String), + Subscribe(SubscriptionStateKey), // remove the subscription from the list - Unsubscribe(ZTenantId, ZTimelineId, String), + Unsubscribe(SubscriptionStateKey), // don't serve this subscription, but keep it in the list - Pause(ZTenantId, ZTimelineId, String), + Pause(SubscriptionStateKey), // resume this subscription, if it exists, // but don't create a new one if it is gone - Resume(ZTenantId, ZTimelineId, String), + Resume(SubscriptionStateKey), // TODO how do we delete from subscriptions? } @@ -194,7 +211,7 @@ impl Drop for SubscriptionState { } pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver) -> Result<()> { - let subscriptions: Mutex> = + let subscriptions: Mutex> = Mutex::new(HashMap::new()); let mut ticker = tokio::time::interval(conf.recall_period); @@ -204,11 +221,13 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver + CallmeEvent::Subscribe(key) => { - let _enter = info_span!("callmemaybe: subscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered(); + let _enter = info_span!("callmemaybe: subscribe", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered(); let mut subscriptions = subscriptions.lock().unwrap(); - match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) { + // XXX this clone is ugly, is there a way to use the trick with Borrow trait with entry API? + // when we switch to node id instead of the connection string key will be Copy and there will be no need to clone + match subscriptions.entry(key.clone()) { Entry::Occupied(_) => { // Do nothing if subscription already exists // If it is paused it means that there is already established replication connection. @@ -220,29 +239,29 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { let subscription = entry.insert(SubscriptionState::new( - tenantid, - timelineid, - pageserver_connstr, + key.tenant_id, + key.timeline_id, + key.pageserver_connstr, )); subscription.call(conf.recall_period, conf.listen_pg_addr.clone()); } } }, - CallmeEvent::Unsubscribe(tenantid, timelineid, pageserver_connstr) => { - let _enter = debug_span!("callmemaybe: unsubscribe", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered(); + CallmeEvent::Unsubscribe(key) => { + let _enter = debug_span!("callmemaybe: unsubscribe", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered(); debug!("unsubscribe"); let mut subscriptions = subscriptions.lock().unwrap(); - subscriptions.remove(&(tenantid, timelineid, pageserver_connstr)); + subscriptions.remove(&key); }, - CallmeEvent::Pause(tenantid, timelineid, pageserver_connstr) => { - let _enter = debug_span!("callmemaybe: pause", timelineid = %timelineid, tenantid = %tenantid, pageserver_connstr=%pageserver_connstr.clone()).entered(); + CallmeEvent::Pause(key) => { + let _enter = debug_span!("callmemaybe: pause", timelineid = %key.timeline_id, tenantid = %key.tenant_id, pageserver_connstr=%key.pageserver_connstr.clone()).entered(); let mut subscriptions = subscriptions.lock().unwrap(); // If pause received when no corresponding subscription exists it means that someone started replication // without using callmemaybe. So we create subscription and pause it. // In tenant relocation scenario subscribe call will be executed after pause when compute is restarted. // In that case there is no need to create new/unpause existing subscription. - match subscriptions.entry((tenantid, timelineid, pageserver_connstr.clone())) { + match subscriptions.entry(key.clone()) { Entry::Occupied(mut sub) => { debug!("pause existing"); sub.get_mut().pause(); @@ -250,19 +269,21 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { debug!("create paused"); let subscription = entry.insert(SubscriptionState::new( - tenantid, - timelineid, - pageserver_connstr, + key.tenant_id, + key.timeline_id, + key.pageserver_connstr, )); subscription.pause(); } } }, - CallmeEvent::Resume(tenantid, timelineid, pageserver_connstr) => { - debug!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={} pageserver_connstr={}", - timelineid, tenantid, pageserver_connstr); + CallmeEvent::Resume(key) => { + debug!( + "callmemaybe. thread_main. resume callback request for timelineid={} tenantid={} pageserver_connstr={}", + key.timeline_id, key.tenant_id, key.pageserver_connstr, + ); let mut subscriptions = subscriptions.lock().unwrap(); - if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid, pageserver_connstr)) + if let Some(sub) = subscriptions.get_mut(&key) { sub.resume(); }; @@ -270,9 +291,10 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { + let _enter = debug_span!("callmemaybe: tick").entered(); let mut subscriptions = subscriptions.lock().unwrap(); - for (&(_tenantid, _timelineid, _), state) in subscriptions.iter_mut() { + for (_, state) in subscriptions.iter_mut() { state.call(conf.recall_period, conf.listen_pg_addr.clone()); } }, diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 9d9a7ff3dc..caf8ed5311 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -21,6 +21,7 @@ use zenith_utils::pq_proto::{BeMessage, FeMessage}; use zenith_utils::zid::ZTenantId; use crate::callmemaybe::CallmeEvent; +use crate::callmemaybe::SubscriptionStateKey; pub struct ReceiveWalConn<'pg> { /// Postgres connection @@ -104,13 +105,14 @@ impl<'pg> ReceiveWalConn<'pg> { // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver // we should use callmemaybe mechanism. - let timelineid = spg.timeline.get().timelineid; + let timeline_id = spg.timeline.get().timeline_id; + let subscription_key = SubscriptionStateKey::new( + tenant_id, + timeline_id, + pageserver_connstr.to_owned(), + ); spg.tx - .send(CallmeEvent::Subscribe( - tenant_id, - timelineid, - pageserver_connstr.to_owned(), - )) + .send(CallmeEvent::Subscribe(subscription_key)) .unwrap_or_else(|e| { error!( "failed to send Subscribe request to callmemaybe thread {}", diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 79b6ba9368..de469aaf81 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -26,7 +26,7 @@ use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody}; use zenith_utils::sock_split::ReadStream; -use crate::callmemaybe::CallmeEvent; +use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; use tokio::sync::mpsc::UnboundedSender; use zenith_utils::zid::{ZTenantId, ZTimelineId}; @@ -90,7 +90,7 @@ impl Drop for ReplicationConnGuard { struct ReplicationStreamGuard { tx: UnboundedSender, tenant_id: ZTenantId, - timelineid: ZTimelineId, + timeline_id: ZTimelineId, pageserver_connstr: String, } @@ -110,11 +110,7 @@ impl Drop for ReplicationStreamGuard { ); self.tx - .send(CallmeEvent::Resume( - self.tenant_id, - self.timelineid, - self.pageserver_connstr.to_owned(), - )) + .send(CallmeEvent::Resume(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Resume request to callmemaybe thread {}", e); }); @@ -271,15 +267,13 @@ impl ReplicationConn { None } else { let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); - let timelineid = spg.timeline.get().timelineid; + let timeline_id = spg.timeline.get().timeline_id; let tenant_id = spg.ztenantid.unwrap(); let tx_clone = spg.tx.clone(); + let subscription_key = + SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone()); spg.tx - .send(CallmeEvent::Pause( - tenant_id, - timelineid, - pageserver_connstr.clone(), - )) + .send(CallmeEvent::Pause(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Pause request to callmemaybe thread {}", e); }); @@ -288,7 +282,7 @@ impl ReplicationConn { Some(ReplicationStreamGuard { tx: tx_clone, tenant_id, - timelineid, + timeline_id, pageserver_connstr, }) } @@ -318,15 +312,13 @@ impl ReplicationConn { // this expect should never fail because in wal_proposer_recovery mode stop_pos is set // and this code is not reachable let pageserver_connstr = pageserver_connstr - .expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); + .expect("there should be a pageserver connection string"); let timelineid = spg.timeline.get().timeline_id; let tenant_id = spg.ztenantid.unwrap(); + let subscription_key = + SubscriptionStateKey::new(tenant_id, timelineid, pageserver_connstr); spg.tx - .send(CallmeEvent::Unsubscribe( - tenant_id, - timelineid, - pageserver_connstr, - )) + .send(CallmeEvent::Unsubscribe(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Pause request to callmemaybe thread {}", e); }); @@ -352,7 +344,7 @@ impl ReplicationConn { // Open a new file. let segno = start_pos.segment_number(wal_seg_size); let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let timeline_id = spg.timeline.get().timelineid; + let timeline_id = spg.timeline.get().timeline_id; let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name); Self::open_wal_file(&wal_file_path)? } diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 3e83c0151a..5548cd28f4 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -166,7 +166,7 @@ impl SharedState { /// Database instance (tenant) pub struct Timeline { - pub timelineid: ZTimelineId, + pub timeline_id: ZTimelineId, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, @@ -175,7 +175,7 @@ pub struct Timeline { impl Timeline { fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline { Timeline { - timelineid, + timeline_id: timelineid, mutex: Mutex::new(shared_state), cond: Condvar::new(), }