From 25a515b968d00ace41e39140f038904ffda8995e Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 3 Jan 2022 18:24:19 +0300 Subject: [PATCH] Don't call immediately on resume in callmemaybe. It creates a busy loop if the pageserver <-> safekeeper connection fails after it was established (e.g. currently due to 'segment checkpoint not found' error on pageserver). Also wake up the callmemaybe thread regularly, once per recall_period, regardless of channel activity. --- walkeeper/src/callmemaybe.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/walkeeper/src/callmemaybe.rs b/walkeeper/src/callmemaybe.rs index e2fe4cad91..41c82b3bec 100644 --- a/walkeeper/src/callmemaybe.rs +++ b/walkeeper/src/callmemaybe.rs @@ -137,7 +137,7 @@ impl SubscriptionState { fn call(&mut self, recall_period: Duration, listen_pg_addr: String) { // Ignore call request if this subscription is paused if self.paused { - info!( + debug!( "ignore call request for paused subscription tenantid: {}, timelineid: {}", self.tenantid, self.timelineid @@ -147,7 +147,7 @@ impl SubscriptionState { // Check if it too early to recall if self.handle.is_some() && self.last_call_time.elapsed() < recall_period { - info!( + debug!( "too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?} tenantid: {}, timelineid: {}", self.last_call_time, recall_period, self.tenantid, self.timelineid @@ -193,6 +193,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver> = Mutex::new(HashMap::new()); + let mut ticker = tokio::time::interval(conf.recall_period); loop { tokio::select! { request = rx.recv() => @@ -234,7 +235,6 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver { + _ = ticker.tick() => { let mut subscriptions = subscriptions.lock().unwrap(); for (&(_tenantid, _timelineid), state) in subscriptions.iter_mut() {