mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Fix an issue caused by PR https://github.com/neondatabase/neon/pull/10891: we introduced the concept of timeouts for heartbeats, where we would hang up on the other side of the oneshot channel if a timeout happened (future gets cancelled, receiver is dropped). This hang-up would make the heartbeat task panic when it did obtain the response, as we unwrap the result of the send operation. The panic would bring down the heartbeat task, which, according to the logs, is then the last sign of life we see of that process invocation. I'm not sure what brings down the process; in theory tokio [should continue](https://docs.rs/tokio/latest/tokio/runtime/enum.UnhandledPanic.html#variant.Ignore), but I don't know. Alternative to #10901.
439 lines
15 KiB
Rust
439 lines
15 KiB
Rust
use futures::{stream::FuturesUnordered, StreamExt};
|
|
use safekeeper_api::models::SafekeeperUtilization;
|
|
use safekeeper_client::mgmt_api;
|
|
use std::{
|
|
collections::HashMap,
|
|
fmt::Debug,
|
|
future::Future,
|
|
sync::Arc,
|
|
time::{Duration, Instant},
|
|
};
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
use pageserver_api::{
|
|
controller_api::{NodeAvailability, SkSchedulingPolicy},
|
|
models::PageserverUtilization,
|
|
};
|
|
|
|
use thiserror::Error;
|
|
use utils::{id::NodeId, logging::SecretString};
|
|
|
|
use crate::{node::Node, safekeeper::Safekeeper};
|
|
|
|
struct HeartbeaterTask<Server, State> {
|
|
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
|
cancel: CancellationToken,
|
|
|
|
state: HashMap<NodeId, State>,
|
|
|
|
max_offline_interval: Duration,
|
|
max_warming_up_interval: Duration,
|
|
jwt_token: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub(crate) enum PageserverState {
|
|
Available {
|
|
last_seen_at: Instant,
|
|
utilization: PageserverUtilization,
|
|
},
|
|
WarmingUp {
|
|
started_at: Instant,
|
|
},
|
|
Offline,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub(crate) enum SafekeeperState {
|
|
Available {
|
|
last_seen_at: Instant,
|
|
utilization: SafekeeperUtilization,
|
|
},
|
|
Offline,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) struct AvailablityDeltas<State>(pub Vec<(NodeId, State)>);
|
|
|
|
#[derive(Debug, Error)]
|
|
pub(crate) enum HeartbeaterError {
|
|
#[error("Cancelled")]
|
|
Cancel,
|
|
}
|
|
|
|
struct HeartbeatRequest<Server, State> {
|
|
servers: Arc<HashMap<NodeId, Server>>,
|
|
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas<State>, HeartbeaterError>>,
|
|
}
|
|
|
|
pub(crate) struct Heartbeater<Server, State> {
|
|
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest<Server, State>>,
|
|
}
|
|
|
|
#[allow(private_bounds)]
|
|
impl<Server: Send + Sync + 'static, State: Debug + Send + 'static> Heartbeater<Server, State>
|
|
where
|
|
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
|
{
|
|
pub(crate) fn new(
|
|
jwt_token: Option<String>,
|
|
max_offline_interval: Duration,
|
|
max_warming_up_interval: Duration,
|
|
cancel: CancellationToken,
|
|
) -> Self {
|
|
let (sender, receiver) =
|
|
tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest<Server, State>>();
|
|
let mut heartbeater = HeartbeaterTask::new(
|
|
receiver,
|
|
jwt_token,
|
|
max_offline_interval,
|
|
max_warming_up_interval,
|
|
cancel,
|
|
);
|
|
tokio::task::spawn(async move { heartbeater.run().await });
|
|
|
|
Self { sender }
|
|
}
|
|
|
|
pub(crate) async fn heartbeat(
|
|
&self,
|
|
servers: Arc<HashMap<NodeId, Server>>,
|
|
) -> Result<AvailablityDeltas<State>, HeartbeaterError> {
|
|
let (sender, receiver) = tokio::sync::oneshot::channel();
|
|
self.sender
|
|
.send(HeartbeatRequest {
|
|
servers,
|
|
reply: sender,
|
|
})
|
|
.map_err(|_| HeartbeaterError::Cancel)?;
|
|
|
|
receiver
|
|
.await
|
|
.map_err(|_| HeartbeaterError::Cancel)
|
|
.and_then(|x| x)
|
|
}
|
|
}
|
|
|
|
impl<Server, State: Debug> HeartbeaterTask<Server, State>
|
|
where
|
|
HeartbeaterTask<Server, State>: HeartBeat<Server, State>,
|
|
{
|
|
fn new(
|
|
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
|
|
jwt_token: Option<String>,
|
|
max_offline_interval: Duration,
|
|
max_warming_up_interval: Duration,
|
|
cancel: CancellationToken,
|
|
) -> Self {
|
|
Self {
|
|
receiver,
|
|
cancel,
|
|
state: HashMap::new(),
|
|
max_offline_interval,
|
|
max_warming_up_interval,
|
|
jwt_token,
|
|
}
|
|
}
|
|
async fn run(&mut self) {
|
|
loop {
|
|
tokio::select! {
|
|
request = self.receiver.recv() => {
|
|
match request {
|
|
Some(req) => {
|
|
if req.reply.is_closed() {
|
|
// Prevent a possibly infinite buildup of the receiver channel, if requests arrive faster than we can handle them
|
|
continue;
|
|
}
|
|
let res = self.heartbeat(req.servers).await;
|
|
// Ignore the return value in order to not panic if the heartbeat function's future was cancelled
|
|
_ = req.reply.send(res);
|
|
},
|
|
None => { return; }
|
|
}
|
|
},
|
|
_ = self.cancel.cancelled() => return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) trait HeartBeat<Server, State> {
|
|
fn heartbeat(
|
|
&mut self,
|
|
pageservers: Arc<HashMap<NodeId, Server>>,
|
|
) -> impl Future<Output = Result<AvailablityDeltas<State>, HeartbeaterError>> + Send;
|
|
}
|
|
|
|
impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState> {
|
|
async fn heartbeat(
|
|
&mut self,
|
|
pageservers: Arc<HashMap<NodeId, Node>>,
|
|
) -> Result<AvailablityDeltas<PageserverState>, HeartbeaterError> {
|
|
let mut new_state = HashMap::new();
|
|
|
|
let mut heartbeat_futs = FuturesUnordered::new();
|
|
for (node_id, node) in &*pageservers {
|
|
heartbeat_futs.push({
|
|
let jwt_token = self.jwt_token.clone();
|
|
let cancel = self.cancel.clone();
|
|
|
|
// Clone the node and mark it as available such that the request
|
|
// goes through to the pageserver even when the node is marked offline.
|
|
// This doesn't impact the availability observed by [`crate::service::Service`].
|
|
let mut node_clone = node.clone();
|
|
node_clone
|
|
.set_availability(NodeAvailability::Active(PageserverUtilization::full()));
|
|
|
|
async move {
|
|
let response = node_clone
|
|
.with_client_retries(
|
|
|client| async move { client.get_utilization().await },
|
|
&jwt_token,
|
|
3,
|
|
3,
|
|
Duration::from_secs(1),
|
|
&cancel,
|
|
)
|
|
.await;
|
|
|
|
let response = match response {
|
|
Some(r) => r,
|
|
None => {
|
|
// This indicates cancellation of the request.
|
|
// We ignore the node in this case.
|
|
return None;
|
|
}
|
|
};
|
|
|
|
let status = if let Ok(utilization) = response {
|
|
PageserverState::Available {
|
|
last_seen_at: Instant::now(),
|
|
utilization,
|
|
}
|
|
} else if let NodeAvailability::WarmingUp(last_seen_at) =
|
|
node.get_availability()
|
|
{
|
|
PageserverState::WarmingUp {
|
|
started_at: *last_seen_at,
|
|
}
|
|
} else {
|
|
PageserverState::Offline
|
|
};
|
|
|
|
Some((*node_id, status))
|
|
}
|
|
});
|
|
|
|
loop {
|
|
let maybe_status = tokio::select! {
|
|
next = heartbeat_futs.next() => {
|
|
match next {
|
|
Some(result) => result,
|
|
None => { break; }
|
|
}
|
|
},
|
|
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
|
|
};
|
|
|
|
if let Some((node_id, status)) = maybe_status {
|
|
new_state.insert(node_id, status);
|
|
}
|
|
}
|
|
}
|
|
|
|
let mut warming_up = 0;
|
|
let mut offline = 0;
|
|
for state in new_state.values() {
|
|
match state {
|
|
PageserverState::WarmingUp { .. } => {
|
|
warming_up += 1;
|
|
}
|
|
PageserverState::Offline { .. } => offline += 1,
|
|
PageserverState::Available { .. } => {}
|
|
}
|
|
}
|
|
|
|
tracing::info!(
|
|
"Heartbeat round complete for {} nodes, {} warming-up, {} offline",
|
|
new_state.len(),
|
|
warming_up,
|
|
offline
|
|
);
|
|
|
|
let mut deltas = Vec::new();
|
|
let now = Instant::now();
|
|
for (node_id, ps_state) in new_state.iter_mut() {
|
|
use std::collections::hash_map::Entry::*;
|
|
let entry = self.state.entry(*node_id);
|
|
|
|
let mut needs_update = false;
|
|
match entry {
|
|
Occupied(ref occ) => match (occ.get(), &ps_state) {
|
|
(PageserverState::Offline, PageserverState::Offline) => {}
|
|
(PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
|
|
if now - *last_seen_at >= self.max_offline_interval {
|
|
deltas.push((*node_id, ps_state.clone()));
|
|
needs_update = true;
|
|
}
|
|
}
|
|
(_, PageserverState::WarmingUp { started_at }) => {
|
|
if now - *started_at >= self.max_warming_up_interval {
|
|
*ps_state = PageserverState::Offline;
|
|
}
|
|
|
|
deltas.push((*node_id, ps_state.clone()));
|
|
needs_update = true;
|
|
}
|
|
_ => {
|
|
deltas.push((*node_id, ps_state.clone()));
|
|
needs_update = true;
|
|
}
|
|
},
|
|
Vacant(_) => {
|
|
// This is a new node. Don't generate a delta for it.
|
|
deltas.push((*node_id, ps_state.clone()));
|
|
}
|
|
}
|
|
|
|
match entry {
|
|
Occupied(mut occ) if needs_update => {
|
|
(*occ.get_mut()) = ps_state.clone();
|
|
}
|
|
Vacant(vac) => {
|
|
vac.insert(ps_state.clone());
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
Ok(AvailablityDeltas(deltas))
|
|
}
|
|
}
|
|
|
|
impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, SafekeeperState> {
|
|
async fn heartbeat(
|
|
&mut self,
|
|
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
|
|
) -> Result<AvailablityDeltas<SafekeeperState>, HeartbeaterError> {
|
|
let mut new_state = HashMap::new();
|
|
|
|
let mut heartbeat_futs = FuturesUnordered::new();
|
|
for (node_id, sk) in &*safekeepers {
|
|
if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
|
|
continue;
|
|
}
|
|
heartbeat_futs.push({
|
|
let jwt_token = self
|
|
.jwt_token
|
|
.as_ref()
|
|
.map(|t| SecretString::from(t.to_owned()));
|
|
let cancel = self.cancel.clone();
|
|
|
|
async move {
|
|
let response = sk
|
|
.with_client_retries(
|
|
|client| async move { client.get_utilization().await },
|
|
&jwt_token,
|
|
3,
|
|
3,
|
|
Duration::from_secs(1),
|
|
&cancel,
|
|
)
|
|
.await;
|
|
|
|
let status = match response {
|
|
Ok(utilization) => SafekeeperState::Available {
|
|
last_seen_at: Instant::now(),
|
|
utilization,
|
|
},
|
|
Err(mgmt_api::Error::Cancelled) => {
|
|
// This indicates cancellation of the request.
|
|
// We ignore the node in this case.
|
|
return None;
|
|
}
|
|
Err(e) => {
|
|
tracing::info!(
|
|
"Marking safekeeper {} at as offline: {e}",
|
|
sk.base_url()
|
|
);
|
|
SafekeeperState::Offline
|
|
}
|
|
};
|
|
|
|
Some((*node_id, status))
|
|
}
|
|
});
|
|
|
|
loop {
|
|
let maybe_status = tokio::select! {
|
|
next = heartbeat_futs.next() => {
|
|
match next {
|
|
Some(result) => result,
|
|
None => { break; }
|
|
}
|
|
},
|
|
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
|
|
};
|
|
|
|
if let Some((node_id, status)) = maybe_status {
|
|
new_state.insert(node_id, status);
|
|
}
|
|
}
|
|
}
|
|
|
|
let mut offline = 0;
|
|
for state in new_state.values() {
|
|
match state {
|
|
SafekeeperState::Offline { .. } => offline += 1,
|
|
SafekeeperState::Available { .. } => {}
|
|
}
|
|
}
|
|
|
|
tracing::info!(
|
|
"Heartbeat round complete for {} safekeepers, {} offline",
|
|
new_state.len(),
|
|
offline
|
|
);
|
|
|
|
let mut deltas = Vec::new();
|
|
let now = Instant::now();
|
|
for (node_id, sk_state) in new_state.iter_mut() {
|
|
use std::collections::hash_map::Entry::*;
|
|
let entry = self.state.entry(*node_id);
|
|
|
|
let mut needs_update = false;
|
|
match entry {
|
|
Occupied(ref occ) => match (occ.get(), &sk_state) {
|
|
(SafekeeperState::Offline, SafekeeperState::Offline) => {}
|
|
(SafekeeperState::Available { last_seen_at, .. }, SafekeeperState::Offline) => {
|
|
if now - *last_seen_at >= self.max_offline_interval {
|
|
deltas.push((*node_id, sk_state.clone()));
|
|
needs_update = true;
|
|
}
|
|
}
|
|
_ => {
|
|
deltas.push((*node_id, sk_state.clone()));
|
|
needs_update = true;
|
|
}
|
|
},
|
|
Vacant(_) => {
|
|
// This is a new node. Don't generate a delta for it.
|
|
deltas.push((*node_id, sk_state.clone()));
|
|
}
|
|
}
|
|
|
|
match entry {
|
|
Occupied(mut occ) if needs_update => {
|
|
(*occ.get_mut()) = sk_state.clone();
|
|
}
|
|
Vacant(vac) => {
|
|
vac.insert(sk_state.clone());
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
Ok(AvailablityDeltas(deltas))
|
|
}
|
|
}
|