storcon: use https safekeeper api (#11065)

## Problem

Storage controller uses http for requests to safekeeper management API.

Closes: https://github.com/neondatabase/cloud/issues/24835

## Summary of changes
- Add `use_https_safekeeper_api` option to storcon to use https api
- Use https for requests to safekeeper management API if this option is
enabled
- Add `ssl_ca_file` option to storcon for ability to specify custom root
CA certificate
This commit is contained in:
Dmitrii Kovalkov
2025-03-07 21:22:47 +04:00
committed by GitHub
parent 87e6117dfd
commit e876794ce5
14 changed files with 193 additions and 63 deletions

View File

@@ -489,6 +489,7 @@ pub struct SafekeeperDescribeResponse {
pub host: String,
pub port: i32,
pub http_port: i32,
pub https_port: Option<i32>,
pub availability_zone_id: String,
pub scheduling_policy: SkSchedulingPolicy,
}

View File

@@ -37,6 +37,10 @@ pub enum Error {
#[error("Cancelled")]
Cancelled,
/// Failed to create client.
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
CreateClient(reqwest::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -64,11 +68,7 @@ impl ResponseErrorMessageExt for reqwest::Response {
}
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<SecretString>) -> Self {
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
}
pub fn from_client(
pub fn new(
client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
@@ -172,12 +172,10 @@ impl Client {
uri: U,
body: B,
) -> Result<reqwest::Response> {
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value.get_contents())
} else {
req
};
let mut req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
}
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
}

View File

@@ -402,12 +402,16 @@ pub async fn handle_request(
bail!("Timeline {} already exists", request.timeline_id);
}
// TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's
// part of https support (#24836).
let http_client = reqwest::Client::new();
let http_hosts = request.http_hosts.clone();
// Figure out statuses of potential donors.
let responses: Vec<Result<TimelineStatus, mgmt_api::Error>> =
futures::future::join_all(http_hosts.iter().map(|url| async {
let cclient = Client::new(url.clone(), sk_auth_token.clone());
let cclient = Client::new(http_client.clone(), url.clone(), sk_auth_token.clone());
let info = cclient
.timeline_status(request.tenant_id, request.timeline_id)
.await?;
@@ -460,8 +464,10 @@ async fn pull_timeline(
let conf = &global_timelines.get_global_config();
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
let client = Client::new(host.clone(), sk_auth_token.clone());
// TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's
// part of https support (#24836).
let http_client = reqwest::Client::new();
let client = Client::new(http_client, host.clone(), sk_auth_token.clone());
// Request stream with basebackup archive.
let bb_resp = client
.snapshot(status.tenant_id, status.timeline_id, conf.my_id)

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers DROP https_port;

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers ADD https_port INTEGER;

View File

@@ -8,6 +8,7 @@ use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::controller_api::{NodeAvailability, SkSchedulingPolicy};
use pageserver_api::models::PageserverUtilization;
use reqwest::Certificate;
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api;
use thiserror::Error;
@@ -27,6 +28,7 @@ struct HeartbeaterTask<Server, State> {
max_offline_interval: Duration,
max_warming_up_interval: Duration,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
}
#[derive(Debug, Clone)]
@@ -75,6 +77,7 @@ where
{
pub(crate) fn new(
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -84,6 +87,7 @@ where
let mut heartbeater = HeartbeaterTask::new(
receiver,
jwt_token,
ssl_ca_cert,
max_offline_interval,
max_warming_up_interval,
cancel,
@@ -119,6 +123,7 @@ where
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest<Server, State>>,
jwt_token: Option<String>,
ssl_ca_cert: Option<Certificate>,
max_offline_interval: Duration,
max_warming_up_interval: Duration,
cancel: CancellationToken,
@@ -130,6 +135,7 @@ where
max_offline_interval,
max_warming_up_interval,
jwt_token,
ssl_ca_cert,
}
}
async fn run(&mut self) {
@@ -325,6 +331,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
.jwt_token
.as_ref()
.map(|t| SecretString::from(t.to_owned()));
let ssl_ca_cert = self.ssl_ca_cert.clone();
let cancel = self.cancel.clone();
async move {
@@ -332,6 +339,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
&ssl_ca_cert,
3,
3,
Duration::from_secs(1),

View File

@@ -8,6 +8,7 @@ use clap::Parser;
use hyper0::Uri;
use metrics::BuildInfo;
use metrics::launch_timestamp::LaunchTimestamp;
use reqwest::Certificate;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
@@ -128,21 +129,28 @@ struct Cli {
#[arg(long)]
chaos_exit_crontab: Option<cron::Schedule>,
// Maximum acceptable lag for the secondary location while draining
// a pageserver
/// Maximum acceptable lag for the secondary location while draining
/// a pageserver
#[arg(long)]
max_secondary_lag_bytes: Option<u64>,
// Period with which to send heartbeats to registered nodes
/// Period with which to send heartbeats to registered nodes
#[arg(long)]
heartbeat_interval: Option<humantime::Duration>,
#[arg(long)]
long_reconcile_threshold: Option<humantime::Duration>,
// Flag to use https for requests to pageserver API.
/// Flag to use https for requests to pageserver API.
#[arg(long, default_value = "false")]
use_https_pageserver_api: bool,
/// Flag to use https for requests to safekeeper API.
#[arg(long, default_value = "false")]
use_https_safekeeper_api: bool,
/// Trusted root CA certificate to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
}
enum StrictMode {
@@ -315,6 +323,15 @@ async fn async_main() -> anyhow::Result<()> {
}
}
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
Some(ssl_ca_file) => {
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
let buf = tokio::fs::read(ssl_ca_file).await?;
Some(Certificate::from_pem(&buf)?)
}
None => None,
};
let config = Config {
pageserver_jwt_token: secrets.pageserver_jwt_token,
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
@@ -351,6 +368,8 @@ async fn async_main() -> anyhow::Result<()> {
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,
use_https_pageserver_api: args.use_https_pageserver_api,
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_cert,
};
// Validate that we can connect to the database

View File

@@ -1,7 +1,6 @@
use std::str::FromStr;
use std::time::Duration;
use anyhow::anyhow;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
NodeSchedulingPolicy, TenantLocateResponseShard,
@@ -211,7 +210,10 @@ impl Node {
use_https: bool,
) -> anyhow::Result<Self> {
if use_https && listen_https_port.is_none() {
return Err(anyhow!("https is enabled, but node has no https port"));
anyhow::bail!(
"cannot create node {id}: \
https is enabled, but https port is not specified"
);
}
Ok(Self {
@@ -244,7 +246,11 @@ impl Node {
pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
if use_https && np.listen_https_port.is_none() {
return Err(anyhow!("https is enabled, but node has no https port"));
anyhow::bail!(
"cannot load node {} from persistent: \
https is enabled, but https port is not specified",
np.node_id,
);
}
Ok(Self {

View File

@@ -1559,6 +1559,7 @@ pub(crate) struct SafekeeperPersistence {
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
pub(crate) scheduling_policy: SkSchedulingPolicyFromSql,
pub(crate) https_port: Option<i32>,
}
/// Wrapper struct around [`SkSchedulingPolicy`] because both it and [`FromSql`] are from foreign crates,
@@ -1599,6 +1600,7 @@ impl SafekeeperPersistence {
host: upsert.host,
port: upsert.port,
http_port: upsert.http_port,
https_port: upsert.https_port,
availability_zone_id: upsert.availability_zone_id,
scheduling_policy: SkSchedulingPolicyFromSql(scheduling_policy),
}
@@ -1611,6 +1613,7 @@ impl SafekeeperPersistence {
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
https_port: self.https_port,
availability_zone_id: self.availability_zone_id.clone(),
scheduling_policy: self.scheduling_policy.0,
})
@@ -1631,6 +1634,7 @@ pub(crate) struct SafekeeperUpsert {
/// The active flag will not be stored in the database and will be ignored.
pub(crate) active: Option<bool>,
pub(crate) http_port: i32,
pub(crate) https_port: Option<i32>,
pub(crate) availability_zone_id: String,
}
@@ -1646,6 +1650,7 @@ impl SafekeeperUpsert {
host: &self.host,
port: self.port,
http_port: self.http_port,
https_port: self.https_port,
availability_zone_id: &self.availability_zone_id,
// None means a wish to not update this column. We expose abilities to update it via other means.
scheduling_policy: None,
@@ -1662,6 +1667,7 @@ struct InsertUpdateSafekeeper<'a> {
host: &'a str,
port: i32,
http_port: i32,
https_port: Option<i32>,
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use reqwest::StatusCode;
use reqwest::{Certificate, StatusCode};
use safekeeper_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -18,26 +18,55 @@ pub struct Safekeeper {
cancel: CancellationToken,
listen_http_addr: String,
listen_http_port: u16,
listen_https_port: Option<u16>,
scheduling_policy: SkSchedulingPolicy,
id: NodeId,
availability: SafekeeperState,
// Flag from storcon's config to use https for safekeeper API.
// Invariant: if |true|, listen_https_port should contain a value.
use_https: bool,
}
impl Safekeeper {
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
pub(crate) fn from_persistence(
skp: SafekeeperPersistence,
cancel: CancellationToken,
use_https: bool,
) -> anyhow::Result<Self> {
if use_https && skp.https_port.is_none() {
anyhow::bail!(
"cannot load safekeeper {} from persistence: \
https is enabled, but https port is not specified",
skp.id,
);
}
let scheduling_policy = skp.scheduling_policy.0;
Self {
Ok(Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
listen_https_port: skp.https_port.map(|x| x as u16),
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
scheduling_policy,
}
use_https,
})
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
if self.use_https {
format!(
"https://{}:{}",
self.listen_http_addr,
self.listen_https_port
.expect("https port should be specified if use_https is on"),
)
} else {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
}
pub(crate) fn get_id(&self) -> NodeId {
@@ -57,10 +86,12 @@ impl Safekeeper {
self.skp.scheduling_policy = scheduling_policy.into();
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
#[allow(clippy::too_many_arguments)]
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<SecretString>,
ssl_ca_cert: &Option<Certificate>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
@@ -79,19 +110,22 @@ impl Safekeeper {
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
CreateClient(_) => true,
}
}
// TODO: refactor SafekeeperClient and with_client_retires (#11113).
let mut http_client = reqwest::Client::builder().timeout(timeout);
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
http_client = http_client.add_root_certificate(ssl_ca_cert.clone());
}
let http_client = http_client.build().map_err(mgmt_api::Error::CreateClient)?;
backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");
let client = SafekeeperClient::from_client(
let client = SafekeeperClient::new(
self.get_id(),
http_client,
http_client.clone(),
self.base_url(),
jwt.clone(),
);
@@ -112,8 +146,9 @@ impl Safekeeper {
warn_threshold,
max_retries,
&format!(
"Call to safekeeper {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
"Call to safekeeper {} ({}) management API",
self.id,
self.base_url(),
),
cancel,
)
@@ -121,12 +156,16 @@ impl Safekeeper {
.unwrap_or(Err(mgmt_api::Error::Cancelled))
}
pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
pub(crate) fn update_from_record(
&mut self,
record: crate::persistence::SafekeeperUpsert,
) -> anyhow::Result<()> {
let crate::persistence::SafekeeperUpsert {
active: _,
availability_zone_id: _,
host,
http_port,
https_port,
id,
port: _,
region_id: _,
@@ -139,9 +178,17 @@ impl Safekeeper {
self.id.0
);
}
if self.use_https && https_port.is_none() {
anyhow::bail!(
"cannot update safekeeper {id}: \
https is enabled, but https port is not specified"
);
}
self.skp =
crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
self.listen_http_port = http_port as u16;
self.listen_https_port = https_port.map(|x| x as u16);
self.listen_http_addr = host;
Ok(())
}
}

View File

@@ -45,26 +45,14 @@ macro_rules! measured_request {
}
impl SafekeeperClient {
#[allow(dead_code)]
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
inner: Client::new(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}

View File

@@ -40,6 +40,7 @@ diesel::table! {
http_port -> Int4,
availability_zone_id -> Text,
scheduling_policy -> Varchar,
https_port -> Nullable<Int4>,
}
}

View File

@@ -48,7 +48,7 @@ use pageserver_api::upcall_api::{
ValidateResponseTenant,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::StatusCode;
use reqwest::{Certificate, StatusCode};
use safekeeper_api::models::SafekeeperUtilization;
use tokio::sync::TryAcquireError;
use tokio::sync::mpsc::error::TrySendError;
@@ -400,6 +400,9 @@ pub struct Config {
pub long_reconcile_threshold: Duration,
pub use_https_pageserver_api: bool,
pub use_https_safekeeper_api: bool,
pub ssl_ca_cert: Option<Certificate>,
}
impl From<DatabaseError> for ApiError {
@@ -1420,8 +1423,14 @@ impl Service {
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>();
.map(|skp| {
Safekeeper::from_persistence(
skp,
CancellationToken::new(),
config.use_https_safekeeper_api,
)
})
.collect::<anyhow::Result<Vec<_>>>()?;
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
@@ -1559,6 +1568,7 @@ impl Service {
let heartbeater_ps = Heartbeater::new(
config.pageserver_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -1566,6 +1576,7 @@ impl Service {
let heartbeater_sk = Heartbeater::new(
config.safekeeper_jwt_token.clone(),
config.ssl_ca_cert.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -8227,24 +8238,41 @@ impl Service {
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), DatabaseError> {
) -> Result<(), ApiError> {
let node_id = NodeId(record.id as u64);
let use_https = self.config.use_https_safekeeper_api;
if use_https && record.https_port.is_none() {
return Err(ApiError::PreconditionFailed(
format!(
"cannot upsert safekeeper {node_id}: \
https is enabled, but https port is not specified"
)
.into(),
));
}
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().update_from_record(record);
}
std::collections::hash_map::Entry::Occupied(mut entry) => entry
.get_mut()
.update_from_record(record)
.expect("all preconditions should be checked before upsert to database"),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
));
entry.insert(
Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
use_https,
)
.expect("all preconditions should be checked before upsert to database"),
);
}
}
locked.safekeepers = Arc::new(safekeepers);

View File

@@ -3226,6 +3226,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
"host": "localhost",
"port": sk_0.port.pg,
"http_port": sk_0.port.http,
"https_port": None,
"version": 5957,
"availability_zone_id": "us-east-2b",
}
@@ -3260,6 +3261,24 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
assert eq_safekeeper_records(body, inserted_now)
# https_port appears during migration
body["https_port"] = 123
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
# https_port rollback
body["https_port"] = None
target.on_safekeeper_deploy(fake_id, body)
inserted_now = target.get_safekeeper(fake_id)
assert target.get_safekeepers() == [inserted_now]
assert inserted_now is not None
assert eq_safekeeper_records(body, inserted_now)
env.storage_controller.consistency_check()
# some small tests for the scheduling policy querying and returning APIs
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
@@ -3792,6 +3811,7 @@ def test_storage_controller_node_flap_detach_race(
wait_until(validate_locations, timeout=10)
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
"""
Check that storage controller handles node_register requests with updated fields correctly.