storcon: fix heartbeats timing out causing a panic (#10902)

Fix an issue caused by PR
https://github.com/neondatabase/neon/pull/10891: that PR introduced the
concept of timeouts for heartbeats, where we hang up on the other side
of the oneshot channel if a timeout occurs (the future gets cancelled
and the receiver is dropped).

This hang-up would make the heartbeat task panic once it did obtain the
response, because we unwrap the result of sending that response back over
the oneshot channel. According to the logs, this panic is the last sign of
life we see of that process invocation. I'm not sure what actually brings
the process down; in theory tokio [should
continue](https://docs.rs/tokio/latest/tokio/runtime/enum.UnhandledPanic.html#variant.Ignore),
but evidently the whole process still goes away.
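
To make the failure mode concrete, here is a minimal standalone sketch
(illustrative only, not the storcon code): the requester times out and drops
the oneshot receiver, so unwrapping the result of `send` on the responder
side would panic, while ignoring it does not.

```rust
use std::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    // Requester: give up after a short timeout, which drops `rx`.
    let requester = tokio::spawn(async move {
        let _ = tokio::time::timeout(Duration::from_millis(10), rx).await;
    });
    requester.await.unwrap();

    // Responder: the reply is ready only after the requester hung up.
    // `tx.send(42).unwrap()` would panic here because `send` returns Err
    // once the receiver is gone; discarding the result does not.
    let _ = tx.send(42);
    println!("responder survived the closed channel");
}
```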

Alternative to #10901.
Author: Arpad Müller
Date: 2025-02-20 00:04:05 +01:00
Committed by: GitHub
Parent: 787b98f8f2
Commit: bb7e244a42
2 changed files with 17 additions and 9 deletions


@@ -140,8 +140,13 @@ where
             request = self.receiver.recv() => {
                 match request {
                     Some(req) => {
+                        if req.reply.is_closed() {
+                            // Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
+                            continue;
+                        }
                         let res = self.heartbeat(req.servers).await;
-                        req.reply.send(res).unwrap();
+                        // Ignore the return value in order to not panic if the heartbeat function's future was cancelled
+                        _ = req.reply.send(res);
                     },
                     None => { return; }
                 }

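For reference outside the diff, here is a self-contained sketch of the pattern
the hunk above hardens (the `Request` struct, `worker` loop, and payload are
hypothetical stand-ins for the heartbeater's types): skip requests whose reply
channel is already closed, and ignore the result of the reply send rather than
unwrapping it.

```rust
use tokio::sync::{mpsc, oneshot};

struct Request {
    payload: u64,
    reply: oneshot::Sender<u64>,
}

async fn worker(mut receiver: mpsc::UnboundedReceiver<Request>) {
    while let Some(req) = receiver.recv().await {
        if req.reply.is_closed() {
            // The requester already hung up (e.g. its timeout fired);
            // skip the work so stale requests don't pile up.
            continue;
        }
        let res = req.payload * 2; // stand-in for the actual heartbeat round
        // The requester may still get cancelled between the check above and
        // this send, so ignore the result instead of unwrapping it.
        let _ = req.reply.send(res);
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let worker_task = tokio::spawn(worker(rx));

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(Request { payload: 21, reply: reply_tx }).unwrap();
    // If this timeout fired, `reply_rx` would be dropped and the worker
    // would have to tolerate the closed reply channel.
    let res = tokio::time::timeout(std::time::Duration::from_secs(1), reply_rx).await;
    println!("reply: {res:?}");

    drop(tx); // close the channel so the worker loop exits
    worker_task.await.unwrap();
}
```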

@@ -815,13 +815,12 @@ impl Service {
         };
 
         tracing::info!("Sending initial heartbeats...");
-        let res_ps = self
-            .heartbeater_ps
-            .heartbeat(Arc::new(nodes_to_heartbeat))
-            .await;
         // Put a small, but reasonable timeout to get the initial heartbeats of the safekeepers to avoid a storage controller downtime
         const SK_TIMEOUT: Duration = Duration::from_secs(5);
-        let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await;
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(Arc::new(nodes_to_heartbeat)),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks))
+        );
 
         let mut online_nodes = HashMap::new();
         if let Ok(deltas) = res_ps {
@@ -1064,8 +1063,12 @@ impl Service {
             locked.safekeepers.clone()
         };
 
-        let res_ps = self.heartbeater_ps.heartbeat(nodes).await;
-        let res_sk = self.heartbeater_sk.heartbeat(safekeepers).await;
+        const SK_TIMEOUT: Duration = Duration::from_secs(3);
+        let (res_ps, res_sk) = tokio::join!(
+            self.heartbeater_ps.heartbeat(nodes),
+            tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(safekeepers))
+        );
 
         if let Ok(deltas) = res_ps {
             let mut to_handle = Vec::default();
@@ -1167,7 +1170,7 @@ impl Service {
                 }
             }
         }
-        if let Ok(deltas) = res_sk {
+        if let Ok(Ok(deltas)) = res_sk {
             let mut locked = self.inner.write().unwrap();
             let mut safekeepers = (*locked.safekeepers).clone();
             for (id, state) in deltas.0 {
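
As an aside on the last hunk: wrapping the safekeeper heartbeat in
`tokio::time::timeout` adds an outer `Result` (with `Elapsed` as the error),
which is why the match changes from `Ok(deltas)` to `Ok(Ok(deltas))`. A
minimal sketch with made-up result types:

```rust
use std::time::Duration;

// Hypothetical stand-ins for the heartbeat delta and error types.
type Deltas = Vec<u64>;
type HeartbeatError = String;

async fn heartbeat() -> Result<Deltas, HeartbeatError> {
    Ok(vec![1, 2, 3])
}

#[tokio::main]
async fn main() {
    const SK_TIMEOUT: Duration = Duration::from_secs(3);
    // The timeout wrapper yields Result<Result<Deltas, HeartbeatError>, Elapsed>.
    let res_sk = tokio::time::timeout(SK_TIMEOUT, heartbeat()).await;
    // Proceed only if neither the timeout elapsed nor the heartbeat failed.
    if let Ok(Ok(deltas)) = res_sk {
        println!("got {} deltas", deltas.len());
    }
}
```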