From bb7e244a429742283ceff9b53f0ffab98a8d5ba3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 20 Feb 2025 00:04:05 +0100
Subject: [PATCH] storcon: fix heartbeats timing out causing a panic (#10902)

Fix an issue introduced by PR
https://github.com/neondatabase/neon/pull/10891: it added timeouts for
heartbeats, where we hang up on the other side of the oneshot channel
when a timeout fires (the heartbeat future gets cancelled and the
receiver is dropped). Because the heartbeat task unwraps the result of
sending the reply, this hang-up made it panic once it eventually
obtained a response. According to the logs, that panic is the last sign
of life we see from the process invocation. It is not clear what
actually brings the process down; in theory tokio [should
continue](https://docs.rs/tokio/latest/tokio/runtime/enum.UnhandledPanic.html#variant.Ignore)
running.

Alternative to #10901.
---
 storage_controller/src/heartbeater.rs |  7 ++++++-
 storage_controller/src/service.rs     | 19 +++++++++++--------
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index 57e9fd0f75..52b6110667 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -140,8 +140,13 @@ where
             request = self.receiver.recv() => {
                 match request {
                     Some(req) => {
+                        if req.reply.is_closed() {
+                            // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
+                            continue;
+                        }
                         let res = self.heartbeat(req.servers).await;
-                        req.reply.send(res).unwrap();
+                        // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
+                        _ = req.reply.send(res);
                     },
                     None => { return; }
                 }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index f47dd72579..fc6d2f3d29 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -815,13 +815,12 @@ impl Service {
         };
 
         tracing::info!("Sending initial heartbeats...");
-        let res_ps = self
-            .heartbeater_ps
-            .heartbeat(Arc::new(nodes_to_heartbeat))
-            .await;
         // Put a small, but reasonable timeout to get the initial heartbeats of the safekeepers to avoid a storage controller downtime
         const SK_TIMEOUT: Duration = Duration::from_secs(5);
-        let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await;
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(Arc::new(nodes_to_heartbeat)),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks))
+        );
 
         let mut online_nodes = HashMap::new();
         if let Ok(deltas) = res_ps {
@@ -1064,8 +1063,12 @@ impl Service {
             locked.safekeepers.clone()
         };
 
-        let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
-        let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
+        const SK_TIMEOUT: Duration = Duration::from_secs(3);
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(nodes),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(safekeepers))
+        );
+
         if let Ok(deltas) = res_ps {
             let mut to_handle = Vec::default();
 
@@ -1167,7 +1170,7 @@ impl Service {
                 }
             }
         }
-        if let Ok(deltas) = res_sk {
+        if let Ok(Ok(deltas)) = res_sk {
             let mut locked = self.inner.write().unwrap();
             let mut safekeepers = (*locked.safekeepers).clone();
             for (id, state) in deltas.0 {
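
For context on the heartbeater.rs change: with tokio's oneshot channels, `Sender::send` returns an `Err` carrying the value back once the receiving half has been dropped, and `Sender::is_closed` reports that state up front, which is what the patch checks instead of calling `unwrap()`. Below is a minimal, self-contained sketch of that behavior (it assumes only the tokio crate with its full feature set; the requester/worker framing is illustrative and not the storage controller's actual heartbeat types):

```rust
use std::time::Duration;

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // The worker will answer on this one-shot reply channel.
    let (reply_tx, reply_rx) = oneshot::channel::<&'static str>();

    // Simulate the requester giving up: wrapping the receive in a short
    // timeout means `reply_rx` is dropped as soon as the deadline fires.
    let requester = tokio::time::timeout(Duration::from_millis(10), reply_rx);
    assert!(requester.await.is_err()); // timed out, receiver dropped

    // The worker side can now observe the hang-up before doing any work...
    assert!(reply_tx.is_closed());

    // ...and `send` returns Err instead of delivering; calling `.unwrap()`
    // on this result is what used to panic the heartbeat task.
    assert!(reply_tx.send("heartbeat result").is_err());
}
```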
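
The service.rs side pairs the two heartbeats with `tokio::join!` and bounds only the safekeeper one with `tokio::time::timeout`, which wraps the heartbeat's own `Result` in a second `Result`, hence the `Ok(Ok(deltas))` match in the last hunk. A minimal sketch of that pattern follows (the `heartbeat_ps`/`heartbeat_sk` functions and their return types are invented for illustration):

```rust
use std::time::Duration;

// Stand-ins for the two heartbeat calls; names and payloads are made up.
async fn heartbeat_ps() -> Result<&'static str, &'static str> {
    Ok("pageserver deltas")
}

async fn heartbeat_sk() -> Result<&'static str, &'static str> {
    // Pretend the safekeeper heartbeat hangs for longer than the timeout.
    tokio::time::sleep(Duration::from_secs(10)).await;
    Ok("safekeeper deltas")
}

#[tokio::main]
async fn main() {
    const SK_TIMEOUT: Duration = Duration::from_secs(1);

    // Both heartbeats run concurrently; only the safekeeper one is bounded.
    let (res_ps, res_sk) = tokio::join!(
        heartbeat_ps(),
        tokio::time::timeout(SK_TIMEOUT, heartbeat_sk()),
    );

    if let Ok(deltas) = res_ps {
        println!("pageserver heartbeat: {deltas}");
    }

    // Outer Ok: the timeout did not fire. Inner Ok: the heartbeat succeeded.
    match res_sk {
        Ok(Ok(deltas)) => println!("safekeeper heartbeat: {deltas}"),
        Ok(Err(e)) => println!("safekeeper heartbeat failed: {e}"),
        Err(_elapsed) => println!("safekeeper heartbeat timed out"),
    }
}
```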