diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs index f7f8970472..e2fe4cad91 100644 --- a/walkeeper/src/callmemaybe.rs +++ b/walkeeper/src/callmemaybe.rs @@ -25,7 +25,7 @@ async fn request_callback( timelineid: ZTimelineId, tenantid: ZTenantId, ) -> Result<()> { - debug!( + info!( "callmemaybe request_callback Connecting to pageserver {}", &pageserver_connstr ); @@ -75,6 +75,7 @@ pub enum CallmeEvent { Resume(ZTenantId, ZTimelineId), } +#[derive(Debug)] struct SubscriptionState { tenantid: ZTenantId, timelineid: ZTimelineId, @@ -136,7 +137,7 @@ impl SubscriptionState { fn call(&mut self, recall_period: Duration, listen_pg_addr: String) { // Ignore call request if this subscription is paused if self.paused { - debug!( + info!( "ignore call request for paused subscription tenantid: {}, timelineid: {}", self.tenantid, self.timelineid @@ -146,7 +147,7 @@ impl SubscriptionState { // Check if it too early to recall if self.handle.is_some() && self.last_call_time.elapsed() < recall_period { - debug!( + info!( "too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?} tenantid: {}, timelineid: {}", self.last_call_time, recall_period, self.tenantid, self.timelineid @@ -174,7 +175,7 @@ impl SubscriptionState { // Update last_call_time self.last_call_time = Instant::now(); - debug!( + info!( "new call spawned. time {:?} tenantid: {}, timelineid: {}", self.last_call_time, self.tenantid, self.timelineid @@ -201,18 +202,22 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { let mut subscriptions = subscriptions.lock().unwrap(); + if let Some(sub) = subscriptions.get(&(tenantid, timelineid)) + { + info!("callmemaybe. subscription already exists {:?}", sub); + } if let Some(mut sub) = subscriptions.insert((tenantid, timelineid), SubscriptionState::new(tenantid, timelineid, pageserver_connstr)) { sub.call(conf.recall_period, conf.listen_pg_addr.clone()); } - debug!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}", + info!("callmemaybe. thread_main. subscribe callback request for timelineid={} tenantid={}", timelineid, tenantid); }, CallmeEvent::Unsubscribe(tenantid, timelineid) => { let mut subscriptions = subscriptions.lock().unwrap(); subscriptions.remove(&(tenantid, timelineid)); - debug!("callmemaybe. thread_main. unsubscribe callback request for timelineid={} tenantid={}", + info!("callmemaybe. thread_main. unsubscribe callback. request for timelineid={} tenantid={}", timelineid, tenantid); }, CallmeEvent::Pause(tenantid, timelineid) => { @@ -221,7 +226,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { @@ -232,7 +237,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver ReceiveWalConn<'pg> { // create a guard to unsubscribe callback, when this wal_stream will exit Some(SendWalHandlerGuard { - tx: tx_clone, - tenant_id, - timelineid, + _tx: tx_clone, + _tenant_id: tenant_id, + _timelineid: timelineid, timeline: Arc::clone(spg.timeline.get()), }) } @@ -147,22 +147,22 @@ impl<'pg> ReceiveWalConn<'pg> { } struct SendWalHandlerGuard { - tx: UnboundedSender, - tenant_id: ZTenantId, - timelineid: ZTimelineId, + _tx: UnboundedSender, + _tenant_id: ZTenantId, + _timelineid: ZTimelineId, timeline: Arc, } impl Drop for SendWalHandlerGuard { fn drop(&mut self) { self.timeline.stop_streaming(); - self.tx - .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid)) - .unwrap_or_else(|e| { - error!( - "failed to send Unsubscribe request to callmemaybe thread {}", - e - ); - }); + // self.tx + // .send(CallmeEvent::Unsubscribe(self.tenant_id, self.timelineid)) + // .unwrap_or_else(|e| { + // error!( + // "failed to send Unsubscribe request to callmemaybe thread {}", + // e + // ); + // }); } } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 2c3288e584..0cb619b9d3 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -290,6 +290,13 @@ impl ReplicationConn { } else { // Is is time to end streaming to this replica? if spg.timeline.get().check_stop_streaming(replica_id) { + let timelineid = spg.timeline.get().timelineid; + let tenant_id = spg.ztenantid.unwrap(); + spg.tx + .send(CallmeEvent::Unsubscribe(tenant_id, timelineid)) + .unwrap_or_else(|e| { + error!("failed to send Pause request to callmemaybe thread {}", e); + }); // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); }