mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
storcon: add https API (#11239)
## Problem

Pageservers use unencrypted HTTP requests for the storage controller API.

- Closes: https://github.com/neondatabase/cloud/issues/25524

## Summary of changes

- Replace hyper0::server::Server with http_utils::server::Server in the storage controller.
- Add an HTTPS handler for the storage controller API.
- Support `ssl_ca_file` in the pageserver.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -6605,6 +6605,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
"clap",
|
||||
"clashmap",
|
||||
@@ -6648,6 +6649,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"utils",
|
||||
|
||||
@@ -142,6 +142,10 @@ impl PageServerNode {
|
||||
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
overrides.push(format!("ssl_ca_file='{}'", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
// Apply the user-provided overrides
|
||||
overrides.push({
|
||||
let mut doc =
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ExitStatus;
|
||||
use std::str::FromStr;
|
||||
@@ -38,9 +37,9 @@ pub struct StorageController {
|
||||
client: reqwest::Client,
|
||||
config: NeonStorageControllerConf,
|
||||
|
||||
// The listen addresses is learned when starting the storage controller,
|
||||
// The listen port is learned when starting the storage controller,
|
||||
// hence the use of OnceLock to init it at the right time.
|
||||
listen: OnceLock<SocketAddr>,
|
||||
listen_port: OnceLock<u16>,
|
||||
}
|
||||
|
||||
const COMMAND: &str = "storage_controller";
|
||||
@@ -144,15 +143,23 @@ impl StorageController {
|
||||
}
|
||||
};
|
||||
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
if let Some(ssl_ca_file) = env.ssl_ca_cert_path() {
|
||||
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
|
||||
let cert = reqwest::Certificate::from_pem(&buf).expect("SSL CA file should be valid");
|
||||
http_client = http_client.add_root_certificate(cert);
|
||||
}
|
||||
let http_client = http_client
|
||||
.build()
|
||||
.expect("HTTP client should construct with no error");
|
||||
|
||||
Self {
|
||||
env: env.clone(),
|
||||
private_key,
|
||||
public_key,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
client: http_client,
|
||||
config: env.storage_controller.clone(),
|
||||
listen: OnceLock::default(),
|
||||
listen_port: OnceLock::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,34 +344,34 @@ impl StorageController {
|
||||
}
|
||||
}
|
||||
|
||||
let (listen, postgres_port) = {
|
||||
if let Some(base_port) = start_args.base_port {
|
||||
(
|
||||
format!("127.0.0.1:{base_port}"),
|
||||
self.config
|
||||
.database_url
|
||||
.expect("--base-port requires NeonStorageControllerConf::database_url")
|
||||
.port(),
|
||||
)
|
||||
} else {
|
||||
let listen_url = self.env.control_plane_api.clone();
|
||||
if self.env.generate_local_ssl_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
&instance_dir.join("server.crt"),
|
||||
&instance_dir.join("server.key"),
|
||||
)?;
|
||||
}
|
||||
|
||||
let listen = format!(
|
||||
"{}:{}",
|
||||
listen_url.host_str().unwrap(),
|
||||
listen_url.port().unwrap()
|
||||
);
|
||||
let listen_url = &self.env.control_plane_api;
|
||||
|
||||
(listen, listen_url.port().unwrap() + 1)
|
||||
}
|
||||
let scheme = listen_url.scheme();
|
||||
let host = listen_url.host_str().unwrap();
|
||||
|
||||
let (listen_port, postgres_port) = if let Some(base_port) = start_args.base_port {
|
||||
(
|
||||
base_port,
|
||||
self.config
|
||||
.database_url
|
||||
.expect("--base-port requires NeonStorageControllerConf::database_url")
|
||||
.port(),
|
||||
)
|
||||
} else {
|
||||
let port = listen_url.port().unwrap();
|
||||
(port, port + 1)
|
||||
};
|
||||
|
||||
let socket_addr = listen
|
||||
.parse()
|
||||
.expect("listen address is a valid socket address");
|
||||
self.listen
|
||||
.set(socket_addr)
|
||||
.expect("StorageController::listen is only set here");
|
||||
self.listen_port
|
||||
.set(listen_port)
|
||||
.expect("StorageController::listen_port is only set here");
|
||||
|
||||
// Do we remove the pid file on stop?
|
||||
let pg_started = self.is_postgres_running().await?;
|
||||
@@ -500,20 +507,15 @@ impl StorageController {
|
||||
drop(client);
|
||||
conn.await??;
|
||||
|
||||
let listen = self
|
||||
.listen
|
||||
.get()
|
||||
.expect("cell is set earlier in this function");
|
||||
let addr = format!("{}:{}", host, listen_port);
|
||||
let address_for_peers = Uri::builder()
|
||||
.scheme("http")
|
||||
.authority(format!("{}:{}", listen.ip(), listen.port()))
|
||||
.scheme(scheme)
|
||||
.authority(addr.clone())
|
||||
.path_and_query("")
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let mut args = vec![
|
||||
"-l",
|
||||
&listen.to_string(),
|
||||
"--dev",
|
||||
"--database-url",
|
||||
&database_url,
|
||||
@@ -530,6 +532,14 @@ impl StorageController {
|
||||
.map(|s| s.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match scheme {
|
||||
"http" => args.extend(["--listen".to_string(), addr]),
|
||||
"https" => args.extend(["--listen-https".to_string(), addr]),
|
||||
_ => {
|
||||
panic!("Unexpected url scheme in control_plane_api: {scheme}");
|
||||
}
|
||||
}
|
||||
|
||||
if self.config.start_as_candidate {
|
||||
args.push("--start-as-candidate".to_string());
|
||||
}
|
||||
@@ -590,6 +600,8 @@ impl StorageController {
|
||||
args.push("--timelines-onto-safekeepers".to_string());
|
||||
}
|
||||
|
||||
println!("Starting storage controller");
|
||||
|
||||
background_process::start_process(
|
||||
COMMAND,
|
||||
&instance_dir,
|
||||
@@ -716,30 +728,26 @@ impl StorageController {
|
||||
{
|
||||
// In the special case of the `storage_controller start` subcommand, we wish
|
||||
// to use the API endpoint of the newly started storage controller in order
|
||||
// to pass the readiness check. In this scenario [`Self::listen`] will be set
|
||||
// (see [`Self::start`]).
|
||||
// to pass the readiness check. In this scenario [`Self::listen_port`] will
|
||||
// be set (see [`Self::start`]).
|
||||
//
|
||||
// Otherwise, we infer the storage controller api endpoint from the configured
|
||||
// control plane API.
|
||||
let url = if let Some(socket_addr) = self.listen.get() {
|
||||
Url::from_str(&format!(
|
||||
"http://{}:{}/{path}",
|
||||
socket_addr.ip().to_canonical(),
|
||||
socket_addr.port()
|
||||
))
|
||||
.unwrap()
|
||||
let port = if let Some(port) = self.listen_port.get() {
|
||||
*port
|
||||
} else {
|
||||
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
|
||||
// for general purpose API access.
|
||||
let listen_url = self.env.control_plane_api.clone();
|
||||
Url::from_str(&format!(
|
||||
"http://{}:{}/{path}",
|
||||
listen_url.host_str().unwrap(),
|
||||
listen_url.port().unwrap()
|
||||
))
|
||||
.unwrap()
|
||||
self.env.control_plane_api.port().unwrap()
|
||||
};
|
||||
|
||||
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
|
||||
// for general purpose API access.
|
||||
let url = Url::from_str(&format!(
|
||||
"{}://{}:{port}/{path}",
|
||||
self.env.control_plane_api.scheme(),
|
||||
self.env.control_plane_api.host_str().unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut builder = self.client.request(method, url);
|
||||
if let Some(body) = body {
|
||||
builder = builder.json(&body)
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
|
||||
pub fn load_cert_chain(filename: &Utf8Path) -> anyhow::Result<Vec<CertificateDer<'static>>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let file = std::fs::File::open(filename)
|
||||
.context(format!("Failed to open certificate file {filename:?}"))?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
Ok(rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?)
|
||||
}
|
||||
|
||||
pub fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let file = std::fs::File::open(filename)
|
||||
.context(format!("Failed to open private key file {filename:?}"))?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
let key = rustls_pemfile::private_key(&mut reader)?;
|
||||
|
||||
@@ -61,6 +61,7 @@ pub struct ConfigToml {
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
||||
pub availability_zone: Option<String>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wait_lsn_timeout: Duration,
|
||||
@@ -439,6 +440,7 @@ impl Default for ConfigToml {
|
||||
listen_https_addr: (None),
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
ssl_ca_file: None,
|
||||
availability_zone: (None),
|
||||
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
|
||||
@@ -234,6 +234,7 @@ fn initialize_config(
|
||||
.context("build toml deserializer")?,
|
||||
)
|
||||
.context("deserialize config toml")?;
|
||||
|
||||
let conf = PageServerConf::parse_and_validate(identity.id, config_toml, workdir)
|
||||
.context("runtime-validation of config toml")?;
|
||||
|
||||
@@ -427,7 +428,7 @@ fn start_pageserver(
|
||||
// Set up deletion queue
|
||||
let (deletion_queue, deletion_workers) = DeletionQueue::new(
|
||||
remote_storage.clone(),
|
||||
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
|
||||
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
|
||||
conf,
|
||||
);
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
|
||||
@@ -17,7 +17,7 @@ use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
use reqwest::{Certificate, Url};
|
||||
use storage_broker::Uri;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::logging::{LogFormat, SecretString};
|
||||
@@ -43,7 +43,7 @@ use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, virtual_file};
|
||||
///
|
||||
/// For fields that require additional validation or filling in of defaults at runtime,
|
||||
/// check for examples in the [`PageServerConf::parse_and_validate`] method.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PageServerConf {
|
||||
// Identifier of that particular pageserver so e g safekeepers
|
||||
// can safely distinguish different pageservers
|
||||
@@ -58,6 +58,7 @@ pub struct PageServerConf {
|
||||
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_ca_cert: Option<Certificate>,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
@@ -325,6 +326,7 @@ impl PageServerConf {
|
||||
listen_https_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_ca_file,
|
||||
availability_zone,
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
@@ -469,6 +471,13 @@ impl PageServerConf {
|
||||
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
|
||||
load_previous_heatmap: load_previous_heatmap.unwrap_or(true),
|
||||
generate_unarchival_heatmap: generate_unarchival_heatmap.unwrap_or(true),
|
||||
ssl_ca_cert: match ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = std::fs::read(ssl_ca_file)?;
|
||||
Some(Certificate::from_pem(&buf)?)
|
||||
}
|
||||
None => None,
|
||||
},
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -50,10 +50,13 @@ pub trait StorageControllerUpcallApi {
|
||||
impl StorageControllerUpcallClient {
|
||||
/// A None return value indicates that the input `conf` object does not have control
|
||||
/// plane API enabled.
|
||||
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Option<Self>, reqwest::Error> {
|
||||
let mut url = match conf.control_plane_api.as_ref() {
|
||||
Some(u) => u.clone(),
|
||||
None => return None,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
if let Ok(mut segs) = url.path_segments_mut() {
|
||||
@@ -73,12 +76,16 @@ impl StorageControllerUpcallClient {
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
|
||||
Some(Self {
|
||||
http_client: client.build().expect("Failed to construct HTTP client"),
|
||||
if let Some(ssl_ca_cert) = &conf.ssl_ca_cert {
|
||||
client = client.add_root_certificate(ssl_ca_cert.clone());
|
||||
}
|
||||
|
||||
Ok(Some(Self {
|
||||
http_client: client.build()?,
|
||||
base_url: url,
|
||||
node_id: conf.id,
|
||||
cancel: cancel.clone(),
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
||||
@@ -344,7 +344,7 @@ async fn init_load_generations(
|
||||
"Emergency mode! Tenants will be attached unsafely using their last known generation"
|
||||
);
|
||||
emergency_generations(tenant_confs)
|
||||
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel) {
|
||||
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
|
||||
info!("Calling {} API to re-attach tenants", client.base_url());
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach(conf).await {
|
||||
|
||||
@@ -16,10 +16,11 @@ testing = []
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
cron.workspace = true
|
||||
clashmap.workspace = true
|
||||
cron.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
governor.workspace = true
|
||||
@@ -44,8 +45,9 @@ rustls-native-certs.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
measured.workspace = true
|
||||
rustls.workspace = true
|
||||
|
||||
@@ -4,7 +4,9 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
use futures::future::OptionFuture;
|
||||
use hyper0::Uri;
|
||||
use metrics::BuildInfo;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
@@ -39,13 +41,28 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
#[unsafe(export_name = "malloc_conf")]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
|
||||
|
||||
const DEFAULT_SSL_KEY_FILE: &str = "server.key";
|
||||
const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
#[clap(group(
|
||||
clap::ArgGroup::new("listen-addresses")
|
||||
.required(true)
|
||||
.multiple(true)
|
||||
.args(&["listen", "listen_https"]),
|
||||
))]
|
||||
struct Cli {
|
||||
/// Host and port to listen on, like `127.0.0.1:1234`
|
||||
/// Host and port to listen HTTP on, like `127.0.0.1:1234`.
|
||||
/// At least one of ["listen", "listen_https"] should be specified.
|
||||
// TODO: Make this option dev-only when https is out everywhere.
|
||||
#[arg(short, long)]
|
||||
listen: std::net::SocketAddr,
|
||||
listen: Option<std::net::SocketAddr>,
|
||||
/// Host and port to listen HTTPS on, like `127.0.0.1:1234`.
|
||||
/// At least one of ["listen", "listen_https"] should be specified.
|
||||
#[arg(long)]
|
||||
listen_https: Option<std::net::SocketAddr>,
|
||||
|
||||
/// Public key for JWT authentication of clients
|
||||
#[arg(long)]
|
||||
@@ -157,6 +174,12 @@ struct Cli {
|
||||
#[arg(long, default_value = "false")]
|
||||
use_https_safekeeper_api: bool,
|
||||
|
||||
/// Path to a file with certificate's private key for https API.
|
||||
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
|
||||
ssl_key_file: Utf8PathBuf,
|
||||
/// Path to a file with a X509 certificate for https API.
|
||||
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
|
||||
ssl_cert_file: Utf8PathBuf,
|
||||
/// Trusted root CA certificate to use in https APIs.
|
||||
#[arg(long)]
|
||||
ssl_ca_file: Option<PathBuf>,
|
||||
@@ -283,11 +306,10 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
|
||||
let args = Cli::parse();
|
||||
tracing::info!(
|
||||
"version: {}, launch_timestamp: {}, build_tag {}, listening on {}",
|
||||
"version: {}, launch_timestamp: {}, build_tag {}",
|
||||
GIT_VERSION,
|
||||
launch_ts.to_string(),
|
||||
BUILD_TAG,
|
||||
args.listen
|
||||
);
|
||||
|
||||
let build_info = BuildInfo {
|
||||
@@ -378,7 +400,6 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
.unwrap_or(LONG_RECONCILE_THRESHOLD_DEFAULT),
|
||||
address_for_peers: args.address_for_peers,
|
||||
start_as_candidate: args.start_as_candidate,
|
||||
http_service_port: args.listen.port() as i32,
|
||||
use_https_pageserver_api: args.use_https_pageserver_api,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
ssl_ca_cert,
|
||||
@@ -392,28 +413,52 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
|
||||
let http_listener = tcp_listener::bind(args.listen)?;
|
||||
|
||||
let auth = secrets
|
||||
.public_key
|
||||
.map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth)));
|
||||
let router = make_router(service.clone(), auth, build_info)
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let router_service = http_utils::RouterService::new(router).unwrap();
|
||||
let http_service =
|
||||
Arc::new(http_utils::RequestServiceBuilder::new(router).map_err(|err| anyhow!(err))?);
|
||||
|
||||
let api_shutdown = CancellationToken::new();
|
||||
|
||||
// Start HTTP server
|
||||
let server_shutdown = CancellationToken::new();
|
||||
let server = hyper0::Server::from_tcp(http_listener)?
|
||||
.serve(router_service)
|
||||
.with_graceful_shutdown({
|
||||
let server_shutdown = server_shutdown.clone();
|
||||
async move {
|
||||
server_shutdown.cancelled().await;
|
||||
}
|
||||
});
|
||||
tracing::info!("Serving on {0}", args.listen);
|
||||
let server_task = tokio::task::spawn(server);
|
||||
let http_server_task: OptionFuture<_> = match args.listen {
|
||||
Some(http_addr) => {
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
let http_server =
|
||||
http_utils::server::Server::new(Arc::clone(&http_service), http_listener, None)?;
|
||||
|
||||
tracing::info!("Serving HTTP on {}", http_addr);
|
||||
Some(tokio::task::spawn(http_server.serve(api_shutdown.clone())))
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
.into();
|
||||
|
||||
// Start HTTPS server
|
||||
let https_server_task: OptionFuture<_> = match args.listen_https {
|
||||
Some(https_addr) => {
|
||||
let https_listener = tcp_listener::bind(https_addr)?;
|
||||
let certs = http_utils::tls_certs::load_cert_chain(args.ssl_cert_file.as_path())?;
|
||||
let key = http_utils::tls_certs::load_private_key(args.ssl_key_file.as_path())?;
|
||||
|
||||
let server_config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)?;
|
||||
|
||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
|
||||
let https_server =
|
||||
http_utils::server::Server::new(http_service, https_listener, Some(tls_acceptor))?;
|
||||
|
||||
tracing::info!("Serving HTTPS on {}", https_addr);
|
||||
Some(tokio::task::spawn(https_server.serve(api_shutdown.clone())))
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
.into();
|
||||
|
||||
let chaos_task = args.chaos_interval.map(|interval| {
|
||||
let service = service.clone();
|
||||
@@ -437,28 +482,41 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
|
||||
tokio::pin!(http_server_task, https_server_task);
|
||||
tokio::select! {
|
||||
_ = sigint.recv() => {},
|
||||
_ = sigterm.recv() => {},
|
||||
_ = sigquit.recv() => {},
|
||||
Some(err) = &mut http_server_task => {
|
||||
panic!("HTTP server task failed: {err:#?}");
|
||||
}
|
||||
Some(err) = &mut https_server_task => {
|
||||
panic!("HTTPS server task failed: {err:#?}");
|
||||
}
|
||||
}
|
||||
tracing::info!("Terminating on signal");
|
||||
|
||||
// Stop HTTP server first, so that we don't have to service requests
|
||||
// Stop HTTP and HTTPS servers first, so that we don't have to service requests
|
||||
// while shutting down Service.
|
||||
server_shutdown.cancel();
|
||||
match tokio::time::timeout(Duration::from_secs(5), server_task).await {
|
||||
Ok(Ok(_)) => {
|
||||
tracing::info!("Joined HTTP server task");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
tracing::error!("Error joining HTTP server task: {e}")
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!("Timed out joining HTTP server task");
|
||||
// We will fall through and shut down the service anyway, any request handlers
|
||||
// in flight will experience cancellation & their clients will see a torn connection.
|
||||
}
|
||||
api_shutdown.cancel();
|
||||
|
||||
// If the deadline is exceeded, we will fall through and shut down the service anyway,
|
||||
// any request handlers in flight will experience cancellation & their clients will
|
||||
// see a torn connection.
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
|
||||
|
||||
match tokio::time::timeout_at(deadline, http_server_task).await {
|
||||
Ok(Some(Ok(_))) => tracing::info!("Joined HTTP server task"),
|
||||
Ok(Some(Err(e))) => tracing::error!("Error joining HTTP server task: {e}"),
|
||||
Ok(None) => {} // HTTP is disabled.
|
||||
Err(_) => tracing::warn!("Timed out joining HTTP server task"),
|
||||
}
|
||||
|
||||
match tokio::time::timeout_at(deadline, https_server_task).await {
|
||||
Ok(Some(Ok(_))) => tracing::info!("Joined HTTPS server task"),
|
||||
Ok(Some(Err(e))) => tracing::error!("Error joining HTTPS server task: {e}"),
|
||||
Ok(None) => {} // HTTPS is disabled.
|
||||
Err(_) => tracing::warn!("Timed out joining HTTPS server task"),
|
||||
}
|
||||
|
||||
// If we were injecting chaos, stop that so that we're not calling into Service while it shuts down
|
||||
|
||||
@@ -412,8 +412,6 @@ pub struct Config {
|
||||
|
||||
pub start_as_candidate: bool,
|
||||
|
||||
pub http_service_port: i32,
|
||||
|
||||
pub long_reconcile_threshold: Duration,
|
||||
|
||||
pub use_https_pageserver_api: bool,
|
||||
|
||||
@@ -469,6 +469,9 @@ class NeonEnvBuilder:
|
||||
# Flag to enable https listener in safekeeper, generate local ssl certs,
|
||||
# and force storage controller to use https for safekeeper api.
|
||||
self.use_https_safekeeper_api: bool = False
|
||||
# Flag to use https listener in storage controller, generate local ssl certs,
|
||||
# and force pageservers and neon_local to use https for storage controller api.
|
||||
self.use_https_storage_controller_api: bool = False
|
||||
|
||||
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
|
||||
self.pageserver_get_vectored_concurrent_io: str | None = (
|
||||
@@ -1067,7 +1070,9 @@ class NeonEnv:
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.generate_local_ssl_certs = (
|
||||
config.use_https_pageserver_api or config.use_https_safekeeper_api
|
||||
config.use_https_pageserver_api
|
||||
or config.use_https_safekeeper_api
|
||||
or config.use_https_storage_controller_api
|
||||
)
|
||||
self.ssl_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
|
||||
@@ -1096,7 +1101,10 @@ class NeonEnv:
|
||||
|
||||
self.storage_controller_port = config.storage_controller_port_override
|
||||
self.storage_controller = NeonProxiedStorageController(
|
||||
self, config.storage_controller_port_override, config.auth_enabled
|
||||
self,
|
||||
config.storage_controller_port_override,
|
||||
config.auth_enabled,
|
||||
config.use_https_storage_controller_api,
|
||||
)
|
||||
else:
|
||||
# Find two adjacent ports for storage controller and its postgres DB. This
|
||||
@@ -1110,7 +1118,10 @@ class NeonEnv:
|
||||
|
||||
self.storage_controller_port = storage_controller_port
|
||||
self.storage_controller = NeonStorageController(
|
||||
self, storage_controller_port, config.auth_enabled
|
||||
self,
|
||||
storage_controller_port,
|
||||
config.auth_enabled,
|
||||
config.use_https_storage_controller_api,
|
||||
)
|
||||
|
||||
log.info(
|
||||
@@ -1777,14 +1788,16 @@ class StorageControllerMigrationConfig:
|
||||
|
||||
|
||||
class NeonStorageController(MetricsGetter, LogUtils):
|
||||
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
|
||||
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool, use_https: bool):
|
||||
self.env = env
|
||||
self.port: int = port
|
||||
self.api: str = f"http://127.0.0.1:{port}"
|
||||
scheme = "https" if use_https else "http"
|
||||
self.api: str = f"{scheme}://127.0.0.1:{port}"
|
||||
self.running = False
|
||||
self.auth_enabled = auth_enabled
|
||||
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
|
||||
self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log"
|
||||
self.ssl_ca_file = env.ssl_ca_file
|
||||
|
||||
def start(
|
||||
self,
|
||||
@@ -1854,6 +1867,8 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
return PageserverHttpClient(self.port, lambda: True, auth_token, *args, **kwargs)
|
||||
|
||||
def request(self, method, *args, **kwargs) -> requests.Response:
|
||||
if self.ssl_ca_file is not None:
|
||||
kwargs["verify"] = self.ssl_ca_file
|
||||
resp = requests.request(method, *args, **kwargs)
|
||||
NeonStorageController.raise_api_exception(resp)
|
||||
|
||||
@@ -2565,8 +2580,8 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
|
||||
|
||||
class NeonProxiedStorageController(NeonStorageController):
|
||||
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool):
|
||||
super().__init__(env, proxy_port, auth_enabled)
|
||||
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool, use_https: bool):
|
||||
super().__init__(env, proxy_port, auth_enabled, use_https)
|
||||
self.instances: dict[int, dict[str, Any]] = {}
|
||||
|
||||
def start(
|
||||
|
||||
@@ -66,3 +66,16 @@ def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
wait_until(storcon_heartbeat)
|
||||
|
||||
|
||||
def test_storage_controller_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test HTTPS storage controller API.
|
||||
If NeonEnv starts with use_https_storage_controller_api with no errors, it's already a success.
|
||||
Make /status request to HTTPS API to ensure it's appropriately configured.
|
||||
"""
|
||||
neon_env_builder.use_https_storage_controller_api = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
addr = f"https://localhost:{env.storage_controller.port}/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
|
||||
Reference in New Issue
Block a user