diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index 57e9fd0f75..52b6110667 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -140,8 +140,13 @@ where request = self.receiver.recv() => { match request { Some(req) => { + if req.reply.is_closed() { + // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them + continue; + } let res = self.heartbeat(req.servers).await; - req.reply.send(res).unwrap(); + // Ignore the return value in order to not panic if the heartbeat function's future was cancelled + _ = req.reply.send(res); }, None => { return; } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index f47dd72579..fc6d2f3d29 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -815,13 +815,12 @@ impl Service { }; tracing::info!("Sending initial heartbeats..."); - let res_ps = self - .heartbeater_ps - .heartbeat(Arc::new(nodes_to_heartbeat)) - .await; // Put a small, but reasonable timeout to get the initial heartbeats of the safekeepers to avoid a storage controller downtime const SK_TIMEOUT: Duration = Duration::from_secs(5); - let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await; + let (res_ps, res_sk) = tokio::join!( + self.heartbeater_ps.heartbeat(Arc::new(nodes_to_heartbeat)), + tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)) + ); let mut online_nodes = HashMap::new(); if let Ok(deltas) = res_ps { @@ -1064,8 +1063,12 @@ impl Service { locked.safekeepers.clone() }; - let res_ps = self.heartbeater_ps.heartbeat(nodes).await; - let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await; + const SK_TIMEOUT: Duration = Duration::from_secs(3); + let (res_ps, res_sk) = tokio::join!( + self.heartbeater_ps.heartbeat(nodes), + tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(safekeepers)) + ); + if let Ok(deltas) = res_ps { let mut to_handle = Vec::default(); @@ -1167,7 +1170,7 @@ impl Service { } } } - if let Ok(deltas) = res_sk { + if let Ok(Ok(deltas)) = res_sk { let mut locked = self.inner.write().unwrap(); let mut safekeepers = (*locked.safekeepers).clone(); for (id, state) in deltas.0 {