Files
neon/storage_controller/src/safekeeper.rs
Arpad Müller 4d3c477689 storcon: timelines table, creation and deletion (#11058)
This PR extends the storcon with basic safekeeper management of
timelines, mainly timeline creation and deletion. We want to make the
storcon manage safekeepers in the future. Timeline creation is
controlled by the `--timelines-onto-safekeepers` flag.

1. it adds the `timelines` and `safekeeper_timeline_pending_ops` tables
to the storcon db
2. it extends the code for timeline creation and deletion
3. it adds per-safekeeper reconciler tasks

TODO:

* maybe not immediately schedule reconciliations for deletions but have
a prior manual step
* tenant deletions
* add exclude API definitions (probably separate PR)
* how to choose safekeeper to do exclude on vs deletion? this can be a
bit hairy because the safekeeper might go offline in the meantime.
* error/failure case handling
* tests (cc test_explicit_timeline_creation from #11002)
* single safekeeper mode: we often only have one SK (in tests for
example)
* `notify-safekeepers` hook:
https://github.com/neondatabase/neon/issues/11163

TODOs implemented:

* cancellations of enqueued reconciliations on a per-timeline basis,
helpful if there is an ongoing deletion
* implement pending ops overwrite behavior
* load pending operations from db

RFC section for important reading:
[link](https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md#storage_controller-implementation)

Implements the bulk of #9011
Successor of #10440.

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2025-03-11 02:31:22 +00:00

199 lines
6.5 KiB
Rust

use std::time::Duration;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::{Certificate, StatusCode};
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::NodeId;
use utils::logging::SecretString;
use crate::heartbeater::SafekeeperState;
use crate::persistence::{DatabaseError, SafekeeperPersistence};
use crate::safekeeper_client::SafekeeperClient;
/// In-memory state the storage controller keeps for a single safekeeper node.
///
/// Instances are built from a [`SafekeeperPersistence`] record via
/// `Safekeeper::from_persistence`.
#[derive(Clone)]
pub struct Safekeeper {
/// Persistence record backing this node. It is replaced wholesale by
/// `update_from_record` and its scheduling policy is updated by
/// `set_scheduling_policy`.
pub(crate) skp: SafekeeperPersistence,
/// Token supplied at construction. When it fires, an operation running inside
/// `with_client_retries` resolves to `mgmt_api::Error::Cancelled`.
cancel: CancellationToken,
/// Host taken from the persistence record; used for both the http and the https base URL.
listen_http_addr: String,
/// Port used by `base_url` when `use_https` is off.
listen_http_port: u16,
/// Port used by `base_url` when `use_https` is on; `None` if the record has no https port.
listen_https_port: Option<u16>,
/// Current scheduling policy; also mirrored into `skp` by `set_scheduling_policy`.
scheduling_policy: SkSchedulingPolicy,
/// Node id derived from the `id` column of the persistence record.
id: NodeId,
/// Heartbeating result.
availability: SafekeeperState,
/// Flag from storcon's config to use https for safekeeper API.
/// Invariant: if |true|, listen_https_port should contain a value.
use_https: bool,
}
impl Safekeeper {
pub(crate) fn from_persistence(
skp: SafekeeperPersistence,
cancel: CancellationToken,
use_https: bool,
) -> anyhow::Result<Self> {
if use_https && skp.https_port.is_none() {
anyhow::bail!(
"cannot load safekeeper {} from persistence: \
https is enabled, but https port is not specified",
skp.id,
);
}
let scheduling_policy = skp.scheduling_policy.0;
Ok(Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
listen_https_port: skp.https_port.map(|x| x as u16),
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
scheduling_policy,
use_https,
})
}
pub(crate) fn base_url(&self) -> String {
if self.use_https {
format!(
"https://{}:{}",
self.listen_http_addr,
self.listen_https_port
.expect("https port should be specified if use_https is on"),
)
} else {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
}
pub(crate) fn get_id(&self) -> NodeId {
self.id
}
pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.skp.as_describe_response()
}
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
self.availability = availability;
}
pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
self.scheduling_policy
}
pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
self.scheduling_policy = scheduling_policy;
self.skp.scheduling_policy = scheduling_policy.into();
}
pub(crate) fn availability(&self) -> SafekeeperState {
self.availability.clone()
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
#[allow(clippy::too_many_arguments)]
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<SecretString>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> mgmt_api::Result<T>
where
O: FnMut(SafekeeperClient) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
}
}
// TODO: refactor SafekeeperClient and with_client_retires (#11113).
let mut http_client = reqwest::Client::builder().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
backoff::retry(
|| {
let client = SafekeeperClient::new(
self.get_id(),
http_client.clone(),
self.base_url(),
jwt.clone(),
);
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
async {
tokio::select! {
r = op_fut=> {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to safekeeper {} ({}) management API",
self.id,
self.base_url(),
),
cancel,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))
}
pub(crate) fn update_from_record(
&mut self,
record: crate::persistence::SafekeeperUpsert,
) -> anyhow::Result<()> {
let crate::persistence::SafekeeperUpsert {
active: _,
availability_zone_id: _,
host,
http_port,
https_port,
id,
port: _,
region_id: _,
version: _,
} = record.clone();
if id != self.id.0 as i64 {
// The way the function is called ensures this. If we regress on that, it's a bug.
panic!(
"id can't be changed via update_from_record function: {id} != {}",
self.id.0
);
}
if self.use_https && https_port.is_none() {
anyhow::bail!(
"cannot update safekeeper {id}: \
https is enabled, but https port is not specified"
);
}
self.skp =
crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
self.listen_http_port = http_port as u16;
self.listen_https_port = https_port.map(|x| x as u16);
self.listen_http_addr = host;
Ok(())
}
}