mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
safekeeper: https for management API (#11171)
## Problem Storage controller uses unencrypted HTTP requests for safekeeper management API. - Closes: https://github.com/neondatabase/cloud/issues/24836 ## Summary of changes - Replace `hyper0::server::Server` with `http_utils::server::Server` in safekeeper. - Add HTTPS handler for safekeeper management API.
This commit is contained in:
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -2863,6 +2863,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"camino",
|
||||
"fail",
|
||||
"futures",
|
||||
"hyper 0.14.30",
|
||||
@@ -2873,6 +2874,7 @@ dependencies = [
|
||||
"pprof",
|
||||
"regex",
|
||||
"routerify",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
@@ -4327,8 +4329,6 @@ dependencies = [
|
||||
"reqwest",
|
||||
"rpds",
|
||||
"rustls 0.23.18",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"rustls-pki-types",
|
||||
"scopeguard",
|
||||
"send-future",
|
||||
"serde",
|
||||
@@ -6044,6 +6044,7 @@ dependencies = [
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rustls 0.23.18",
|
||||
"safekeeper_api",
|
||||
"safekeeper_client",
|
||||
"scopeguard",
|
||||
@@ -6060,6 +6061,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
|
||||
@@ -149,7 +149,7 @@ pub struct NeonBroker {
|
||||
pub listen_addr: SocketAddr,
|
||||
}
|
||||
|
||||
/// Broker config for cluster internal communication.
|
||||
/// A part of storage controller's config the neon_local knows about.
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct NeonStorageControllerConf {
|
||||
@@ -176,10 +176,11 @@ pub struct NeonStorageControllerConf {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub long_reconcile_threshold: Option<Duration>,
|
||||
|
||||
#[serde(default)]
|
||||
pub use_https_pageserver_api: bool,
|
||||
|
||||
pub timelines_onto_safekeepers: bool,
|
||||
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -205,6 +206,7 @@ impl Default for NeonStorageControllerConf {
|
||||
long_reconcile_threshold: None,
|
||||
use_https_pageserver_api: false,
|
||||
timelines_onto_safekeepers: false,
|
||||
use_https_safekeeper_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -302,6 +304,7 @@ pub struct SafekeeperConf {
|
||||
pub pg_port: u16,
|
||||
pub pg_tenant_only_port: Option<u16>,
|
||||
pub http_port: u16,
|
||||
pub https_port: Option<u16>,
|
||||
pub sync: bool,
|
||||
pub remote_storage: Option<String>,
|
||||
pub backup_threads: Option<u32>,
|
||||
@@ -316,6 +319,7 @@ impl Default for SafekeeperConf {
|
||||
pg_port: 0,
|
||||
pg_tenant_only_port: None,
|
||||
http_port: 0,
|
||||
https_port: None,
|
||||
sync: true,
|
||||
remote_storage: None,
|
||||
backup_threads: None,
|
||||
@@ -845,6 +849,9 @@ impl LocalEnv {
|
||||
// create safekeeper dirs
|
||||
for safekeeper in &env.safekeepers {
|
||||
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;
|
||||
SafekeeperNode::from_env(&env, safekeeper)
|
||||
.initialize()
|
||||
.context("safekeeper init failed")?;
|
||||
}
|
||||
|
||||
// initialize pageserver state
|
||||
|
||||
@@ -111,6 +111,18 @@ impl SafekeeperNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
/// Initializes a safekeeper node by creating all necessary files,
|
||||
/// e.g. SSL certificates.
|
||||
pub fn initialize(&self) -> anyhow::Result<()> {
|
||||
if self.env.generate_local_ssl_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
&self.datadir_path().join("server.crt"),
|
||||
&self.datadir_path().join("server.key"),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
&self,
|
||||
extra_opts: &[String],
|
||||
@@ -196,6 +208,16 @@ impl SafekeeperNode {
|
||||
]);
|
||||
}
|
||||
|
||||
if let Some(https_port) = self.conf.https_port {
|
||||
args.extend([
|
||||
"--listen-https".to_owned(),
|
||||
format!("{}:{}", self.listen_addr, https_port),
|
||||
]);
|
||||
}
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
args.extend_from_slice(extra_opts);
|
||||
|
||||
background_process::start_process(
|
||||
|
||||
@@ -538,6 +538,10 @@ impl StorageController {
|
||||
args.push("--use-https-pageserver-api".to_string());
|
||||
}
|
||||
|
||||
if self.config.use_https_safekeeper_api {
|
||||
args.push("--use-https-safekeeper-api".to_string());
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hyper0.workspace = true
|
||||
@@ -16,6 +17,7 @@ once_cell.workspace = true
|
||||
pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_path_to_error.workspace = true
|
||||
|
||||
@@ -4,6 +4,7 @@ pub mod failpoints;
|
||||
pub mod json;
|
||||
pub mod request;
|
||||
pub mod server;
|
||||
pub mod tls_certs;
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
|
||||
21
libs/http-utils/src/tls_certs.rs
Normal file
21
libs/http-utils/src/tls_certs.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
use camino::Utf8Path;
|
||||
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
|
||||
pub fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
Ok(rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?)
|
||||
}
|
||||
|
||||
pub fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
let key = rustls_pemfile::private_key(&mut reader)?;
|
||||
|
||||
key.ok_or(anyhow::anyhow!(
|
||||
"no private key found in {}",
|
||||
filename.as_str(),
|
||||
))
|
||||
}
|
||||
@@ -48,8 +48,6 @@ pprof.workspace = true
|
||||
rand.workspace = true
|
||||
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
|
||||
regex.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
rustls-pki-types.workspace = true
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
send-future.workspace = true
|
||||
|
||||
@@ -30,7 +30,6 @@ use pageserver::{
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -622,8 +621,8 @@ fn start_pageserver(
|
||||
|
||||
let https_task = match https_listener {
|
||||
Some(https_listener) => {
|
||||
let certs = load_certs(&conf.ssl_cert_file)?;
|
||||
let key = load_private_key(&conf.ssl_key_file)?;
|
||||
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
|
||||
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
|
||||
|
||||
let server_config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
@@ -735,25 +734,6 @@ fn start_pageserver(
|
||||
})
|
||||
}
|
||||
|
||||
fn load_certs(filename: &Utf8Path) -> std::io::Result<Vec<CertificateDer<'static>>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
rustls_pemfile::certs(&mut reader).collect()
|
||||
}
|
||||
|
||||
fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
let key = rustls_pemfile::private_key(&mut reader)?;
|
||||
|
||||
key.ok_or(anyhow::anyhow!(
|
||||
"no private key found in {}",
|
||||
filename.as_str(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn create_remote_storage_client(
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
|
||||
@@ -35,8 +35,9 @@ postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
smallvec.workspace = true
|
||||
@@ -45,10 +46,11 @@ strum_macros.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-util = { workspace = true }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
|
||||
@@ -16,10 +16,12 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
|
||||
DEFAULT_SSL_KEY_FILE,
|
||||
};
|
||||
use safekeeper::{
|
||||
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
|
||||
@@ -94,6 +96,9 @@ struct Args {
|
||||
/// Listen http endpoint for management and metrics in the form host:port.
|
||||
#[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
|
||||
listen_http: String,
|
||||
/// Listen https endpoint for management and metrics in the form host:port.
|
||||
#[arg(long, default_value = None)]
|
||||
listen_https: Option<String>,
|
||||
/// Advertised endpoint for receiving/sending WAL in the form host:port. If not
|
||||
/// specified, listen_pg is used to advertise instead.
|
||||
#[arg(long, default_value = None)]
|
||||
@@ -203,6 +208,15 @@ struct Args {
|
||||
/// and the current position of the reader is smaller than this value.
|
||||
#[arg(long)]
|
||||
max_delta_for_fanout: Option<u64>,
|
||||
/// Path to a file with certificate's private key for https API.
|
||||
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
|
||||
ssl_key_file: Utf8PathBuf,
|
||||
/// Path to a file with a X509 certificate for https API.
|
||||
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
|
||||
ssl_cert_file: Utf8PathBuf,
|
||||
/// Trusted root CA certificate to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<Utf8PathBuf>,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -336,12 +350,22 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
let ssl_ca_cert = match args.ssl_ca_file.as_ref() {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Some(Certificate::from_pem(&buf)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let conf = Arc::new(SafeKeeperConf {
|
||||
workdir,
|
||||
my_id: id,
|
||||
listen_pg_addr: args.listen_pg,
|
||||
listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
|
||||
listen_http_addr: args.listen_http,
|
||||
listen_https_addr: args.listen_https,
|
||||
advertise_pg_addr: args.advertise_pg,
|
||||
availability_zone: args.availability_zone,
|
||||
no_sync: args.no_sync,
|
||||
@@ -368,6 +392,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
eviction_min_resident: args.eviction_min_resident,
|
||||
wal_reader_fanout: args.wal_reader_fanout,
|
||||
max_delta_for_fanout: args.max_delta_for_fanout,
|
||||
ssl_key_file: args.ssl_key_file,
|
||||
ssl_cert_file: args.ssl_cert_file,
|
||||
ssl_ca_cert,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
@@ -428,6 +455,17 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
e
|
||||
})?;
|
||||
|
||||
let https_listener = match conf.listen_https_addr.as_ref() {
|
||||
Some(listen_https_addr) => {
|
||||
info!("starting safekeeper HTTPS service on {}", listen_https_addr);
|
||||
Some(tcp_listener::bind(listen_https_addr).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", listen_https_addr, e);
|
||||
e
|
||||
})?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
|
||||
|
||||
// Register metrics collector for active timelines. It's important to do this
|
||||
@@ -501,7 +539,7 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
let http_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||
.spawn(http::task_main(
|
||||
.spawn(http::task_main_http(
|
||||
conf.clone(),
|
||||
http_listener,
|
||||
global_timelines.clone(),
|
||||
@@ -509,6 +547,19 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.map(|res| ("HTTP service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(http_handle));
|
||||
|
||||
if let Some(https_listener) = https_listener {
|
||||
let https_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||
.spawn(http::task_main_https(
|
||||
conf.clone(),
|
||||
https_listener,
|
||||
global_timelines.clone(),
|
||||
))
|
||||
.map(|res| ("HTTPS service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(https_handle));
|
||||
}
|
||||
|
||||
let broker_task_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| BROKER_RUNTIME.handle())
|
||||
|
||||
@@ -3,10 +3,11 @@ use std::sync::Arc;
|
||||
|
||||
pub use routes::make_router;
|
||||
pub use safekeeper_api::models;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
pub async fn task_main(
|
||||
pub async fn task_main_http(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
http_listener: std::net::TcpListener,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
@@ -14,8 +15,37 @@ pub async fn task_main(
|
||||
let router = make_router(conf, global_timelines)
|
||||
.build()
|
||||
.map_err(|err| anyhow::anyhow!(err))?;
|
||||
let service = http_utils::RouterService::new(router).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?;
|
||||
server.serve(service).await?;
|
||||
|
||||
let service = Arc::new(
|
||||
http_utils::RequestServiceBuilder::new(router).map_err(|err| anyhow::anyhow!(err))?,
|
||||
);
|
||||
let server = http_utils::server::Server::new(service, http_listener, None)?;
|
||||
server.serve(CancellationToken::new()).await?;
|
||||
Ok(()) // unreachable
|
||||
}
|
||||
|
||||
pub async fn task_main_https(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
https_listener: std::net::TcpListener,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> anyhow::Result<()> {
|
||||
let certs = http_utils::tls_certs::load_cert_chain(&conf.ssl_cert_file)?;
|
||||
let key = http_utils::tls_certs::load_private_key(&conf.ssl_key_file)?;
|
||||
|
||||
let server_config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)?;
|
||||
|
||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
|
||||
|
||||
let router = make_router(conf, global_timelines)
|
||||
.build()
|
||||
.map_err(|err| anyhow::anyhow!(err))?;
|
||||
|
||||
let service = Arc::new(
|
||||
http_utils::RequestServiceBuilder::new(router).map_err(|err| anyhow::anyhow!(err))?,
|
||||
);
|
||||
let server = http_utils::server::Server::new(service, https_listener, Some(tls_acceptor))?;
|
||||
server.serve(CancellationToken::new()).await?;
|
||||
Ok(()) // unreachable
|
||||
}
|
||||
|
||||
@@ -232,9 +232,14 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let resp = pull_timeline::handle_request(
|
||||
data,
|
||||
conf.sk_auth_token.clone(),
|
||||
conf.ssl_ca_cert.clone(),
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::time::Duration;
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::Lazy;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_broker::Uri;
|
||||
use tokio::runtime::Runtime;
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
@@ -69,6 +70,9 @@ pub mod defaults {
|
||||
// before uploading a partial segment, so that in normal operation the eviction can happen
|
||||
// as soon as we have done the partial segment upload.
|
||||
pub const DEFAULT_EVICTION_MIN_RESIDENT: &str = DEFAULT_PARTIAL_BACKUP_TIMEOUT;
|
||||
|
||||
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
|
||||
pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -84,6 +88,7 @@ pub struct SafeKeeperConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_addr_tenant_only: Option<String>,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub advertise_pg_addr: Option<String>,
|
||||
pub availability_zone: Option<String>,
|
||||
pub no_sync: bool,
|
||||
@@ -111,6 +116,9 @@ pub struct SafeKeeperConf {
|
||||
pub eviction_min_resident: Duration,
|
||||
pub wal_reader_fanout: bool,
|
||||
pub max_delta_for_fanout: Option<u64>,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_ca_cert: Option<Certificate>,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -127,6 +135,7 @@ impl SafeKeeperConf {
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_pg_addr_tenant_only: None,
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
listen_https_addr: None,
|
||||
advertise_pg_addr: None,
|
||||
availability_zone: None,
|
||||
remote_storage: None,
|
||||
@@ -155,6 +164,9 @@ impl SafeKeeperConf {
|
||||
eviction_min_resident: Duration::ZERO,
|
||||
wal_reader_fanout: false,
|
||||
max_delta_for_fanout: None,
|
||||
ssl_key_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(defaults::DEFAULT_SSL_CERT_FILE),
|
||||
ssl_ca_cert: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use camino::Utf8PathBuf;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
|
||||
use safekeeper_client::mgmt_api;
|
||||
@@ -392,6 +393,7 @@ pub struct DebugDumpResponse {
|
||||
pub async fn handle_request(
|
||||
request: PullTimelineRequest,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
@@ -402,9 +404,11 @@ pub async fn handle_request(
|
||||
bail!("Timeline {} already exists", request.timeline_id);
|
||||
}
|
||||
|
||||
// TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's
|
||||
// part of https support (#24836).
|
||||
let http_client = reqwest::Client::new();
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
if let Some(ssl_ca_cert) = ssl_ca_cert {
|
||||
http_client = http_client.add_root_certificate(ssl_ca_cert);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
let http_hosts = request.http_hosts.clone();
|
||||
|
||||
@@ -441,13 +445,21 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await
|
||||
pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
sk_auth_token,
|
||||
http_client,
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn pull_timeline(
|
||||
status: TimelineStatus,
|
||||
host: String,
|
||||
sk_auth_token: Option<SecretString>,
|
||||
http_client: reqwest::Client,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
@@ -464,9 +476,6 @@ async fn pull_timeline(
|
||||
let conf = &global_timelines.get_global_config();
|
||||
|
||||
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
|
||||
// TODO(DimasKovas): add ssl root CA certificate when implementing safekeeper's
|
||||
// part of https support (#24836).
|
||||
let http_client = reqwest::Client::new();
|
||||
let client = Client::new(http_client, host.clone(), sk_auth_token.clone());
|
||||
// Request stream with basebackup archive.
|
||||
let bb_resp = client
|
||||
|
||||
@@ -152,6 +152,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
my_id: NodeId(os.id() as u64),
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
listen_https_addr: None,
|
||||
no_sync: false,
|
||||
broker_endpoint: "/".parse::<Uri>().unwrap(),
|
||||
broker_keepalive_interval: Duration::from_secs(0),
|
||||
@@ -179,6 +180,9 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
eviction_min_resident: Duration::ZERO,
|
||||
wal_reader_fanout: false,
|
||||
max_delta_for_fanout: None,
|
||||
ssl_key_file: Utf8PathBuf::from(""),
|
||||
ssl_cert_file: Utf8PathBuf::from(""),
|
||||
ssl_ca_cert: None,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -466,6 +466,9 @@ class NeonEnvBuilder:
|
||||
# Flag to enable https listener in pageserver, generate local ssl certs,
|
||||
# and force storage controller to use https for pageserver api.
|
||||
self.use_https_pageserver_api: bool = False
|
||||
# Flag to enable https listener in safekeeper, generate local ssl certs,
|
||||
# and force storage controller to use https for safekeeper api.
|
||||
self.use_https_safekeeper_api: bool = False
|
||||
|
||||
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
|
||||
self.pageserver_get_vectored_concurrent_io: str | None = (
|
||||
@@ -1063,7 +1066,9 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.generate_local_ssl_certs = config.use_https_pageserver_api
|
||||
self.generate_local_ssl_certs = (
|
||||
config.use_https_pageserver_api or config.use_https_safekeeper_api
|
||||
)
|
||||
self.ssl_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
|
||||
)
|
||||
@@ -1146,6 +1151,10 @@ class NeonEnv:
|
||||
storage_controller_config = storage_controller_config or {}
|
||||
storage_controller_config["use_https_pageserver_api"] = True
|
||||
|
||||
if config.use_https_safekeeper_api:
|
||||
storage_controller_config = storage_controller_config or {}
|
||||
storage_controller_config["use_https_safekeeper_api"] = True
|
||||
|
||||
if storage_controller_config is not None:
|
||||
cfg["storage_controller"] = storage_controller_config
|
||||
|
||||
@@ -1248,6 +1257,7 @@ class NeonEnv:
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
https=self.port_distributor.get_port() if config.use_https_safekeeper_api else None,
|
||||
)
|
||||
id = config.safekeepers_id_start + i # assign ids sequentially
|
||||
sk_cfg: dict[str, Any] = {
|
||||
@@ -1255,6 +1265,7 @@ class NeonEnv:
|
||||
"pg_port": port.pg,
|
||||
"pg_tenant_only_port": port.pg_tenant_only,
|
||||
"http_port": port.http,
|
||||
"https_port": port.https,
|
||||
"sync": config.safekeepers_enable_fsync,
|
||||
}
|
||||
if config.auth_enabled:
|
||||
@@ -4475,6 +4486,7 @@ class SafekeeperPort:
|
||||
pg: int
|
||||
pg_tenant_only: int
|
||||
http: int
|
||||
https: int | None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -13,3 +15,54 @@ def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
|
||||
|
||||
def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test HTTPS safekeeper management API.
|
||||
1. Make /v1/status request to HTTPS API to ensure it's appropriately configured.
|
||||
2. Try to register safekeeper in storcon with https port missing.
|
||||
3. Register safekeeper with https port.
|
||||
4. Wait for a heartbeat round to complete.
|
||||
"""
|
||||
neon_env_builder.use_https_safekeeper_api = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# 1. Make simple https request.
|
||||
addr = f"https://localhost:{sk.port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
|
||||
# Note: http_port is intentionally wrong.
|
||||
# Storcon should not use it if use_https is on.
|
||||
http_port = 0
|
||||
|
||||
body = {
|
||||
"active": True,
|
||||
"id": sk.id,
|
||||
"created_at": "2023-10-25T09:11:25Z",
|
||||
"updated_at": "2024-08-28T11:32:43Z",
|
||||
"region_id": "aws-us-east-2",
|
||||
"host": "localhost",
|
||||
"port": sk.port.pg,
|
||||
"http_port": http_port,
|
||||
"https_port": None,
|
||||
"version": 5957,
|
||||
"availability_zone_id": "us-east-2b",
|
||||
}
|
||||
# 2. Try register with https port missing.
|
||||
with pytest.raises(StorageControllerApiException, match="https port is not specified"):
|
||||
env.storage_controller.on_safekeeper_deploy(sk.id, body)
|
||||
|
||||
# 3. Register with https port.
|
||||
body["https_port"] = sk.port.https
|
||||
env.storage_controller.on_safekeeper_deploy(sk.id, body)
|
||||
|
||||
# 4. Wait for hearbeat round complete.
|
||||
def storcon_heartbeat():
|
||||
assert env.storage_controller.log_contains(
|
||||
"Heartbeat round complete for 1 safekeepers, 0 offline"
|
||||
)
|
||||
|
||||
wait_until(storcon_heartbeat)
|
||||
|
||||
@@ -1427,6 +1427,7 @@ class SafekeeperEnv:
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
https=None,
|
||||
)
|
||||
|
||||
safekeeper_dir = self.repo_dir / f"sk{i}"
|
||||
|
||||
Reference in New Issue
Block a user