Don't call immediately on resume in callmemaybe.

It creates busy loop if pageserver <-> safekeeper connection fails after it was
established (e.g. currently due to 'segment checkpoint not found' error on
pageserver).

Also wake up callmemaybe thread regularly once in recall_period regardless of
channel activity.
This commit is contained in:
Arseny Sher
2022-01-03 18:24:19 +03:00
parent 1c47fbae81
commit 25a515b968

View File

@@ -137,7 +137,7 @@ impl SubscriptionState {
fn call(&mut self, recall_period: Duration, listen_pg_addr: String) {
// Ignore call request if this subscription is paused
if self.paused {
info!(
debug!(
"ignore call request for paused subscription
tenantid: {}, timelineid: {}",
self.tenantid, self.timelineid
@@ -147,7 +147,7 @@ impl SubscriptionState {
// Check if it too early to recall
if self.handle.is_some() && self.last_call_time.elapsed() < recall_period {
info!(
debug!(
"too early to recall. self.last_call_time.elapsed: {:?}, recall_period: {:?}
tenantid: {}, timelineid: {}",
self.last_call_time, recall_period, self.tenantid, self.timelineid
@@ -193,6 +193,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
let subscriptions: Mutex<HashMap<(ZTenantId, ZTimelineId), SubscriptionState>> =
Mutex::new(HashMap::new());
let mut ticker = tokio::time::interval(conf.recall_period);
loop {
tokio::select! {
request = rx.recv() =>
@@ -234,7 +235,6 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
if let Some(sub) = subscriptions.get_mut(&(tenantid, timelineid))
{
sub.resume();
sub.call(conf.recall_period, conf.listen_pg_addr.clone());
};
info!("callmemaybe. thread_main. resume callback request for timelineid={} tenantid={}",
@@ -242,7 +242,7 @@ pub async fn main_loop(conf: SafeKeeperConf, mut rx: UnboundedReceiver<CallmeEve
},
}
},
_ = tokio::time::sleep(conf.recall_period) => {
_ = ticker.tick() => {
let mut subscriptions = subscriptions.lock().unwrap();
for (&(_tenantid, _timelineid), state) in subscriptions.iter_mut() {