mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
pageserver: https for management API (#11025)
## Problem Storage controller uses unencrypted HTTP requests for pageserver management API. Closes: https://github.com/neondatabase/cloud/issues/24283 ## Summary of changes - Implement `http_utils::server::Server` with TLS support. - Replace `hyper0::server::Server` with `http_utils::server::Server` in pageserver. - Add HTTPS handler for pageserver management API. - Generate local SSL certificates in neon local.
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -2848,6 +2848,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"fail",
|
||||
"futures",
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"jemalloc_pprof",
|
||||
@@ -2861,6 +2862,7 @@ dependencies = [
|
||||
"serde_path_to_error",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
@@ -4189,6 +4191,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
@@ -4278,6 +4281,9 @@ dependencies = [
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rpds",
|
||||
"rustls 0.23.18",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"rustls-pki-types",
|
||||
"scopeguard",
|
||||
"send-future",
|
||||
"serde",
|
||||
@@ -4296,6 +4302,7 @@ dependencies = [
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
@@ -5908,9 +5915,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.10.0"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b"
|
||||
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
|
||||
@@ -155,6 +155,7 @@ rpds = "0.13"
|
||||
rustc-hash = "1.1.0"
|
||||
rustls = { version = "0.23.16", default-features = false }
|
||||
rustls-pemfile = "2"
|
||||
rustls-pki-types = "1.11"
|
||||
scopeguard = "1.1"
|
||||
sysinfo = "0.29.2"
|
||||
sd-notify = "0.4.1"
|
||||
|
||||
@@ -963,6 +963,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
id: pageserver_id,
|
||||
listen_pg_addr: format!("127.0.0.1:{pg_port}"),
|
||||
listen_http_addr: format!("127.0.0.1:{http_port}"),
|
||||
listen_https_addr: None,
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
other: Default::default(),
|
||||
@@ -977,6 +978,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
|
||||
storage_controller: None,
|
||||
control_plane_compute_hook_api: None,
|
||||
generate_local_ssl_certs: false,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -81,6 +81,10 @@ pub struct LocalEnv {
|
||||
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
|
||||
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
|
||||
pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
||||
|
||||
/// Flag to generate SSL certificates for components that need it.
|
||||
/// Also generates root CA certificate that is used to sign all other certificates.
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
/// On-disk state stored in `.neon/config`.
|
||||
@@ -102,6 +106,10 @@ pub struct OnDiskConfig {
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
||||
// Note: skip serializing because in compat tests old storage controller fails
|
||||
// to load new config file. May be removed after this field is in release branch.
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
|
||||
@@ -129,6 +137,7 @@ pub struct NeonLocalInitConf {
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Option<Url>>,
|
||||
pub generate_local_ssl_certs: bool,
|
||||
}
|
||||
|
||||
/// Broker config for cluster internal communication.
|
||||
@@ -165,6 +174,9 @@ pub struct NeonStorageControllerConf {
|
||||
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub long_reconcile_threshold: Option<Duration>,
|
||||
|
||||
#[serde(default)]
|
||||
pub use_https_pageserver_api: bool,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -188,6 +200,7 @@ impl Default for NeonStorageControllerConf {
|
||||
max_secondary_lag_bytes: None,
|
||||
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
|
||||
long_reconcile_threshold: None,
|
||||
use_https_pageserver_api: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -217,6 +230,7 @@ pub struct PageServerConf {
|
||||
pub id: NodeId,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub no_sync: bool,
|
||||
@@ -228,6 +242,7 @@ impl Default for PageServerConf {
|
||||
id: NodeId(0),
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
listen_https_addr: None,
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
no_sync: false,
|
||||
@@ -243,6 +258,7 @@ pub struct NeonLocalInitPageserverConf {
|
||||
pub id: NodeId,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
@@ -257,6 +273,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
id,
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
@@ -266,6 +283,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
id: *id,
|
||||
listen_pg_addr: listen_pg_addr.clone(),
|
||||
listen_http_addr: listen_http_addr.clone(),
|
||||
listen_https_addr: listen_https_addr.clone(),
|
||||
pg_auth_type: *pg_auth_type,
|
||||
http_auth_type: *http_auth_type,
|
||||
no_sync: *no_sync,
|
||||
@@ -410,6 +428,41 @@ impl LocalEnv {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ssl_ca_cert_path(&self) -> Option<PathBuf> {
|
||||
if self.generate_local_ssl_certs {
|
||||
Some(self.base_data_dir.join("rootCA.crt"))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ssl_ca_key_path(&self) -> Option<PathBuf> {
|
||||
if self.generate_local_ssl_certs {
|
||||
Some(self.base_data_dir.join("rootCA.key"))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_ssl_ca_cert(&self) -> anyhow::Result<()> {
|
||||
let cert_path = self.ssl_ca_cert_path().unwrap();
|
||||
let key_path = self.ssl_ca_key_path().unwrap();
|
||||
if !fs::exists(cert_path.as_path())? {
|
||||
generate_ssl_ca_cert(cert_path.as_path(), key_path.as_path())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn generate_ssl_cert(&self, cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
|
||||
self.generate_ssl_ca_cert()?;
|
||||
generate_ssl_cert(
|
||||
cert_path,
|
||||
key_path,
|
||||
self.ssl_ca_cert_path().unwrap().as_path(),
|
||||
self.ssl_ca_key_path().unwrap().as_path(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Inspect the base data directory and extract the instance id and instance directory path
|
||||
/// for all storage controller instances
|
||||
pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
|
||||
@@ -519,6 +572,7 @@ impl LocalEnv {
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
} = on_disk_config;
|
||||
LocalEnv {
|
||||
base_data_dir: repopath.to_owned(),
|
||||
@@ -533,6 +587,7 @@ impl LocalEnv {
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_compute_hook_api,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
}
|
||||
};
|
||||
|
||||
@@ -568,6 +623,7 @@ impl LocalEnv {
|
||||
struct PageserverConfigTomlSubset {
|
||||
listen_pg_addr: String,
|
||||
listen_http_addr: String,
|
||||
listen_https_addr: Option<String>,
|
||||
pg_auth_type: AuthType,
|
||||
http_auth_type: AuthType,
|
||||
#[serde(default)]
|
||||
@@ -592,6 +648,7 @@ impl LocalEnv {
|
||||
let PageserverConfigTomlSubset {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
@@ -609,6 +666,7 @@ impl LocalEnv {
|
||||
},
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
@@ -636,6 +694,7 @@ impl LocalEnv {
|
||||
control_plane_api: Some(self.control_plane_api.clone()),
|
||||
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
generate_local_ssl_certs: self.generate_local_ssl_certs,
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -718,6 +777,7 @@ impl LocalEnv {
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
generate_local_ssl_certs,
|
||||
} = conf;
|
||||
|
||||
// Find postgres binaries.
|
||||
@@ -766,8 +826,13 @@ impl LocalEnv {
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
|
||||
branch_name_mappings: Default::default(),
|
||||
generate_local_ssl_certs,
|
||||
};
|
||||
|
||||
if generate_local_ssl_certs {
|
||||
env.generate_ssl_ca_cert()?;
|
||||
}
|
||||
|
||||
// create endpoints dir
|
||||
fs::create_dir_all(env.endpoints_path())?;
|
||||
|
||||
@@ -851,3 +916,80 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
|
||||
// openssl req -x509 -newkey rsa:2048 -nodes -subj "/CN=Neon Local CA" -days 36500 \
|
||||
// -out rootCA.crt -keyout rootCA.key
|
||||
let keygen_output = Command::new("openssl")
|
||||
.args([
|
||||
"req", "-x509", "-newkey", "rsa:2048", "-nodes", "-days", "36500",
|
||||
])
|
||||
.args(["-subj", "/CN=Neon Local CA"])
|
||||
.args(["-out", cert_path.to_str().unwrap()])
|
||||
.args(["-keyout", key_path.to_str().unwrap()])
|
||||
.output()
|
||||
.context("failed to generate CA certificate")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn generate_ssl_cert(
|
||||
cert_path: &Path,
|
||||
key_path: &Path,
|
||||
ca_cert_path: &Path,
|
||||
ca_key_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
// Generate Certificate Signing Request (CSR).
|
||||
let mut csr_path = cert_path.to_path_buf();
|
||||
csr_path.set_extension(".csr");
|
||||
|
||||
// openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr \
|
||||
// -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
|
||||
let keygen_output = Command::new("openssl")
|
||||
.args(["req", "-new", "-nodes"])
|
||||
.args(["-newkey", "rsa:2048"])
|
||||
.args(["-subj", "/CN=localhost"])
|
||||
.args(["-addext", "subjectAltName=DNS:localhost,IP:127.0.0.1"])
|
||||
.args(["-keyout", key_path.to_str().unwrap()])
|
||||
.args(["-out", csr_path.to_str().unwrap()])
|
||||
.output()
|
||||
.context("failed to generate CSR")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
// Sign CSR with CA key.
|
||||
//
|
||||
// openssl x509 -req -in server.csr -CA rootCA.crt -CAkey rootCA.key -CAcreateserial \
|
||||
// -out server.crt -days 36500 -copy_extensions copyall
|
||||
let keygen_output = Command::new("openssl")
|
||||
.args(["x509", "-req"])
|
||||
.args(["-in", csr_path.to_str().unwrap()])
|
||||
.args(["-CA", ca_cert_path.to_str().unwrap()])
|
||||
.args(["-CAkey", ca_key_path.to_str().unwrap()])
|
||||
.arg("-CAcreateserial")
|
||||
.args(["-out", cert_path.to_str().unwrap()])
|
||||
.args(["-days", "36500"])
|
||||
.args(["-copy_extensions", "copyall"])
|
||||
.output()
|
||||
.context("failed to sign CSR")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
// Remove CSR file as it's not needed anymore.
|
||||
fs::remove_file(csr_path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::{PgConnectionConfig, parse_host_port};
|
||||
use reqwest::Certificate;
|
||||
use utils::auth::{Claims, Scope};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -49,12 +50,29 @@ impl PageServerNode {
|
||||
let (host, port) =
|
||||
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
let port = port.unwrap_or(5432);
|
||||
|
||||
let ssl_ca_cert = env.ssl_ca_cert_path().map(|ssl_ca_file| {
|
||||
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
|
||||
Certificate::from_pem(&buf).expect("CA certificate should be valid")
|
||||
});
|
||||
|
||||
let endpoint = if env.storage_controller.use_https_pageserver_api {
|
||||
format!(
|
||||
"https://{}",
|
||||
conf.listen_https_addr.as_ref().expect(
|
||||
"listen https address should be specified if use_https_pageserver_api is on"
|
||||
)
|
||||
)
|
||||
} else {
|
||||
format!("http://{}", conf.listen_http_addr)
|
||||
};
|
||||
|
||||
Self {
|
||||
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
|
||||
conf: conf.clone(),
|
||||
env: env.clone(),
|
||||
http_client: mgmt_api::Client::new(
|
||||
format!("http://{}", conf.listen_http_addr),
|
||||
endpoint,
|
||||
{
|
||||
match conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
@@ -65,7 +83,9 @@ impl PageServerNode {
|
||||
}
|
||||
}
|
||||
.as_deref(),
|
||||
),
|
||||
ssl_ca_cert,
|
||||
)
|
||||
.expect("Client constructs with no errors"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,6 +240,13 @@ impl PageServerNode {
|
||||
.context("write identity toml")?;
|
||||
drop(identity_toml);
|
||||
|
||||
if self.env.generate_local_ssl_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
datadir.join("server.crt").as_path(),
|
||||
datadir.join("server.key").as_path(),
|
||||
)?;
|
||||
}
|
||||
|
||||
// TODO: invoke a TBD config-check command to validate that pageserver will start with the written config
|
||||
|
||||
// Write metadata file, used by pageserver on startup to register itself with
|
||||
@@ -230,6 +257,15 @@ impl PageServerNode {
|
||||
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
|
||||
let http_port = http_port.unwrap_or(9898);
|
||||
|
||||
let https_port = match self.conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
let (_https_host, https_port) =
|
||||
parse_host_port(https_addr).expect("Unable to parse listen_https_addr");
|
||||
Some(https_port.unwrap_or(9899))
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
@@ -240,6 +276,7 @@ impl PageServerNode {
|
||||
postgres_port: self.pg_connection_config.port(),
|
||||
http_host: "localhost".to_string(),
|
||||
http_port,
|
||||
https_port,
|
||||
other: HashMap::from([(
|
||||
"availability_zone_id".to_string(),
|
||||
serde_json::json!(az_id),
|
||||
|
||||
@@ -534,6 +534,14 @@ impl StorageController {
|
||||
args.push("--start-as-candidate".to_string());
|
||||
}
|
||||
|
||||
if self.config.use_https_pageserver_api {
|
||||
args.push("--use-https-pageserver-api".to_string());
|
||||
}
|
||||
|
||||
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
|
||||
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
|
||||
}
|
||||
|
||||
if let Some(private_key) = &self.private_key {
|
||||
let claims = Claims::new(None, Scope::PageServerApi);
|
||||
let jwt_token =
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -278,6 +279,10 @@ struct Cli {
|
||||
/// a token with both scopes to use with this tool.
|
||||
jwt: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
/// Trusted root CA certificate to use in https APIs.
|
||||
ssl_ca_file: Option<PathBuf>,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
@@ -388,9 +393,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let ssl_ca_cert = match &cli.ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Some(reqwest::Certificate::from_pem(&buf)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());
|
||||
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref(), ssl_ca_cert)?;
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hyper0.workspace = true
|
||||
itertools.workspace = true
|
||||
jemalloc_pprof.workspace = true
|
||||
@@ -21,6 +22,7 @@ serde_path_to_error.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-util.workspace = true
|
||||
url.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
@@ -3,9 +3,10 @@ pub mod error;
|
||||
pub mod failpoints;
|
||||
pub mod json;
|
||||
pub mod request;
|
||||
pub mod server;
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
|
||||
/// Current fast way to apply simple http routing in various Neon binaries.
|
||||
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
|
||||
pub use routerify::{RouterBuilder, RouterService, ext::RequestExt};
|
||||
pub use routerify::{RequestServiceBuilder, RouterBuilder, RouterService, ext::RequestExt};
|
||||
|
||||
155
libs/http-utils/src/server.rs
Normal file
155
libs/http-utils/src/server.rs
Normal file
@@ -0,0 +1,155 @@
|
||||
use std::{error::Error, sync::Arc};
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use hyper0::Body;
|
||||
use hyper0::server::conn::Http;
|
||||
use routerify::{RequestService, RequestServiceBuilder};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::error::ApiError;
|
||||
|
||||
/// A simple HTTP server over hyper library.
|
||||
/// You may want to use it instead of [`hyper0::server::Server`] because:
|
||||
/// 1. hyper0's Server was removed from hyper v1.
|
||||
/// It's recommended to replace hyepr0's Server with a manual loop, which is done here.
|
||||
/// 2. hyper0's Server doesn't support TLS out of the box, and there is no way
|
||||
/// to support it efficiently with the Accept trait that hyper0's Server uses.
|
||||
/// That's one of the reasons why it was removed from v1.
|
||||
/// <https://github.com/hyperium/hyper/blob/115339d3df50f20c8717680aa35f48858e9a6205/docs/ROADMAP.md#higher-level-client-and-server-problems>
|
||||
pub struct Server {
|
||||
request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
|
||||
listener: tokio::net::TcpListener,
|
||||
tls_acceptor: Option<TlsAcceptor>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new(
|
||||
request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
|
||||
listener: std::net::TcpListener,
|
||||
tls_acceptor: Option<TlsAcceptor>,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Note: caller of from_std is responsible for setting nonblocking mode.
|
||||
listener.set_nonblocking(true)?;
|
||||
let listener = tokio::net::TcpListener::from_std(listener)?;
|
||||
|
||||
Ok(Self {
|
||||
request_service,
|
||||
listener,
|
||||
tls_acceptor,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn serve(self, cancel: CancellationToken) -> anyhow::Result<()> {
|
||||
fn suppress_io_error(err: &std::io::Error) -> bool {
|
||||
use std::io::ErrorKind::*;
|
||||
matches!(err.kind(), ConnectionReset | ConnectionAborted | BrokenPipe)
|
||||
}
|
||||
fn suppress_hyper_error(err: &hyper0::Error) -> bool {
|
||||
if err.is_incomplete_message() || err.is_closed() || err.is_timeout() {
|
||||
return true;
|
||||
}
|
||||
if let Some(inner) = err.source() {
|
||||
if let Some(io) = inner.downcast_ref::<std::io::Error>() {
|
||||
return suppress_io_error(io);
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
let mut connections = FuturesUnordered::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
stream = self.listener.accept() => {
|
||||
let (tcp_stream, remote_addr) = match stream {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!("Failed to accept TCP connection: {err:#}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let service = self.request_service.build(remote_addr);
|
||||
let tls_acceptor = self.tls_acceptor.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
connections.push(tokio::spawn(
|
||||
async move {
|
||||
match tls_acceptor {
|
||||
Some(tls_acceptor) => {
|
||||
// Handle HTTPS connection.
|
||||
let tls_stream = tokio::select! {
|
||||
tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream,
|
||||
_ = cancel.cancelled() => return,
|
||||
};
|
||||
let tls_stream = match tls_stream {
|
||||
Ok(tls_stream) => tls_stream,
|
||||
Err(err) => {
|
||||
if !suppress_io_error(&err) {
|
||||
info!("Failed to accept TLS connection: {err:#}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTPS connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Handle HTTP connection.
|
||||
if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
|
||||
if !suppress_hyper_error(&err) {
|
||||
info!("Failed to serve HTTP connection: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}));
|
||||
}
|
||||
Some(conn) = connections.next() => {
|
||||
if let Err(err) = conn {
|
||||
error!("Connection panicked: {err:#}");
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
// Wait for graceful shutdown of all connections.
|
||||
while let Some(conn) = connections.next().await {
|
||||
if let Err(err) = conn {
|
||||
error!("Connection panicked: {err:#}");
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serves HTTP connection with graceful shutdown.
|
||||
async fn serve_connection<I>(
|
||||
io: I,
|
||||
service: RequestService<Body, ApiError>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), hyper0::Error>
|
||||
where
|
||||
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let mut conn = Http::new().serve_connection(io, service).with_upgrades();
|
||||
|
||||
tokio::select! {
|
||||
res = &mut conn => res,
|
||||
_ = cancel.cancelled() => {
|
||||
Pin::new(&mut conn).graceful_shutdown();
|
||||
// Note: connection should still be awaited for graceful shutdown to complete.
|
||||
conn.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,6 +35,7 @@ pub struct NodeMetadata {
|
||||
pub postgres_port: u16,
|
||||
pub http_host: String,
|
||||
pub http_port: u16,
|
||||
pub https_port: Option<u16>,
|
||||
|
||||
// Deployment tools may write fields to the metadata file beyond what we
|
||||
// use in this type: this type intentionally only names fields that require.
|
||||
@@ -57,6 +58,9 @@ pub struct ConfigToml {
|
||||
// types mapped 1:1 into the runtime PageServerConfig type
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub availability_zone: Option<String>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wait_lsn_timeout: Duration,
|
||||
@@ -421,6 +425,9 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Vanilla;
|
||||
|
||||
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
|
||||
pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -430,6 +437,9 @@ impl Default for ConfigToml {
|
||||
Self {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
listen_https_addr: (None),
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
availability_zone: (None),
|
||||
wait_lsn_timeout: (humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
|
||||
@@ -16,6 +16,30 @@ fn test_node_metadata_v1_backward_compatibilty() {
|
||||
postgres_port: 23,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: None,
|
||||
other: HashMap::new(),
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_node_metadata_v2_backward_compatibilty() {
|
||||
let v2 = serde_json::to_vec(&serde_json::json!({
|
||||
"host": "localhost",
|
||||
"port": 23,
|
||||
"http_host": "localhost",
|
||||
"http_port": 42,
|
||||
"https_port": 123,
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
serde_json::from_slice::<NodeMetadata>(&v2.unwrap()).unwrap(),
|
||||
NodeMetadata {
|
||||
postgres_host: "localhost".to_string(),
|
||||
postgres_port: 23,
|
||||
http_host: "localhost".to_string(),
|
||||
http_port: 42,
|
||||
https_port: Some(123),
|
||||
other: HashMap::new(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -48,6 +48,9 @@ pprof.workspace = true
|
||||
rand.workspace = true
|
||||
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
|
||||
regex.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
rustls-pki-types.workspace = true
|
||||
rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
send-future.workspace = true
|
||||
serde.workspace = true
|
||||
@@ -62,6 +65,7 @@ tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util"
|
||||
tokio-epoll-uring.workspace = true
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
|
||||
@@ -7,7 +7,7 @@ use http_utils::error::HttpErrorBody;
|
||||
use pageserver_api::models::*;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
pub use reqwest::Body as ReqwestBody;
|
||||
use reqwest::{IntoUrl, Method, StatusCode};
|
||||
use reqwest::{Certificate, IntoUrl, Method, StatusCode};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -38,6 +38,9 @@ pub enum Error {
|
||||
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error("create client: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
|
||||
CreateClient(reqwest::Error),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -69,8 +72,17 @@ pub enum ForceAwaitLogicalSize {
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
|
||||
Self::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt)
|
||||
pub fn new(
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<&str>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
) -> Result<Self> {
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
if let Some(ssl_ca_cert) = ssl_ca_cert {
|
||||
http_client = http_client.add_root_certificate(ssl_ca_cert);
|
||||
}
|
||||
let http_client = http_client.build().map_err(Error::CreateClient)?;
|
||||
Ok(Self::from_client(http_client, mgmt_api_endpoint, jwt))
|
||||
}
|
||||
|
||||
pub fn from_client(
|
||||
@@ -101,12 +113,10 @@ impl Client {
|
||||
debug_assert!(path.starts_with('/'));
|
||||
let uri = format!("{}{}", self.mgmt_api_endpoint, path);
|
||||
|
||||
let req = self.client.request(Method::GET, uri);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value)
|
||||
} else {
|
||||
req
|
||||
};
|
||||
let mut req = self.client.request(Method::GET, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -36,7 +36,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
None, // TODO: support ssl_ca_file for https APIs in pagebench.
|
||||
)?);
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
|
||||
@@ -77,7 +77,8 @@ async fn main_impl(
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
None, // TODO: support ssl_ca_file for https APIs in pagebench.
|
||||
)?);
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
|
||||
@@ -125,7 +125,8 @@ async fn main_impl(
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
None, // TODO: support ssl_ca_file for https APIs in pagebench.
|
||||
)?);
|
||||
|
||||
if let Some(engine_str) = &args.set_io_engine {
|
||||
mgmt_api_client.put_io_engine(engine_str).await?;
|
||||
|
||||
@@ -83,7 +83,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
None, // TODO: support ssl_ca_file for https APIs in pagebench.
|
||||
)?);
|
||||
|
||||
if let Some(engine_str) = &args.set_io_engine {
|
||||
mgmt_api_client.put_io_engine(engine_str).await?;
|
||||
|
||||
@@ -40,7 +40,8 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
None, // TODO: support ssl_ca_file for https APIs in pagebench.
|
||||
)?);
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
|
||||
@@ -25,11 +25,12 @@ use pageserver::task_mgr::{
|
||||
};
|
||||
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
|
||||
use pageserver::{
|
||||
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, http, page_cache, page_service,
|
||||
task_mgr, virtual_file,
|
||||
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http,
|
||||
page_cache, page_service, task_mgr, virtual_file,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -343,8 +344,15 @@ fn start_pageserver(
|
||||
info!("Starting pageserver http handler on {http_addr}");
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
let https_listener = match conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
info!("Starting pageserver https handler on {https_addr}");
|
||||
Some(tcp_listener::bind(https_addr)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
info!("Starting pageserver pg protocol handler on {pg_addr}");
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
|
||||
@@ -575,9 +583,8 @@ fn start_pageserver(
|
||||
|
||||
// Start up the service to handle HTTP mgmt API request. We created the
|
||||
// listener earlier already.
|
||||
let http_endpoint_listener = {
|
||||
let (http_endpoint_listener, https_endpoint_listener) = {
|
||||
let _rt_guard = MGMT_REQUEST_RUNTIME.enter(); // for hyper
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let router_state = Arc::new(
|
||||
http::routes::State::new(
|
||||
@@ -592,22 +599,51 @@ fn start_pageserver(
|
||||
)
|
||||
.context("Failed to initialize router state")?,
|
||||
);
|
||||
|
||||
let router = http::make_router(router_state, launch_ts, http_auth.clone())?
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let service = http_utils::RouterService::new(router).unwrap();
|
||||
let server = hyper0::Server::from_tcp(http_listener)?
|
||||
.serve(service)
|
||||
.with_graceful_shutdown({
|
||||
let cancel = cancel.clone();
|
||||
async move { cancel.clone().cancelled().await }
|
||||
});
|
||||
|
||||
let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"http endpoint listener",
|
||||
server,
|
||||
));
|
||||
HttpEndpointListener(CancellableTask { task, cancel })
|
||||
let service =
|
||||
Arc::new(http_utils::RequestServiceBuilder::new(router).map_err(|err| anyhow!(err))?);
|
||||
|
||||
let http_task = {
|
||||
let server =
|
||||
http_utils::server::Server::new(Arc::clone(&service), http_listener, None)?;
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"http endpoint listener",
|
||||
server.serve(cancel.clone()),
|
||||
));
|
||||
HttpEndpointListener(CancellableTask { task, cancel })
|
||||
};
|
||||
|
||||
let https_task = match https_listener {
|
||||
Some(https_listener) => {
|
||||
let certs = load_certs(&conf.ssl_cert_file)?;
|
||||
let key = load_private_key(&conf.ssl_key_file)?;
|
||||
|
||||
let server_config = rustls::ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)?;
|
||||
|
||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
|
||||
|
||||
let server =
|
||||
http_utils::server::Server::new(service, https_listener, Some(tls_acceptor))?;
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let task = MGMT_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"https endpoint listener",
|
||||
server.serve(cancel.clone()),
|
||||
));
|
||||
Some(HttpsEndpointListener(CancellableTask { task, cancel }))
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
(http_task, https_task)
|
||||
};
|
||||
|
||||
let consumption_metrics_tasks = {
|
||||
@@ -683,6 +719,7 @@ fn start_pageserver(
|
||||
shutdown_pageserver.cancel();
|
||||
pageserver::shutdown_pageserver(
|
||||
http_endpoint_listener,
|
||||
https_endpoint_listener,
|
||||
page_service,
|
||||
consumption_metrics_tasks,
|
||||
disk_usage_eviction_task,
|
||||
@@ -697,6 +734,25 @@ fn start_pageserver(
|
||||
})
|
||||
}
|
||||
|
||||
fn load_certs(filename: &Utf8Path) -> std::io::Result<Vec<CertificateDer<'static>>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
rustls_pemfile::certs(&mut reader).collect()
|
||||
}
|
||||
|
||||
fn load_private_key(filename: &Utf8Path) -> anyhow::Result<PrivateKeyDer<'static>> {
|
||||
let file = std::fs::File::open(filename)?;
|
||||
let mut reader = std::io::BufReader::new(file);
|
||||
|
||||
let key = rustls_pemfile::private_key(&mut reader)?;
|
||||
|
||||
key.ok_or(anyhow::anyhow!(
|
||||
"no private key found in {}",
|
||||
filename.as_str(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn create_remote_storage_client(
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
|
||||
@@ -53,6 +53,11 @@ pub struct PageServerConf {
|
||||
pub listen_pg_addr: String,
|
||||
/// Example (default): 127.0.0.1:9898
|
||||
pub listen_http_addr: String,
|
||||
/// Example: 127.0.0.1:9899
|
||||
pub listen_https_addr: Option<String>,
|
||||
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
@@ -317,6 +322,9 @@ impl PageServerConf {
|
||||
let pageserver_api::config::ConfigToml {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
availability_zone,
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
@@ -375,6 +383,9 @@ impl PageServerConf {
|
||||
// ------------------------------------------------------------
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
availability_zone,
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
|
||||
@@ -181,7 +181,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
listen_https_port: None, // TODO: Support https.
|
||||
listen_https_port: m.https_port,
|
||||
availability_zone_id: az_id.expect("Checked above"),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@ pub struct CancellableTask {
|
||||
pub cancel: CancellationToken,
|
||||
}
|
||||
pub struct HttpEndpointListener(pub CancellableTask);
|
||||
pub struct HttpsEndpointListener(pub CancellableTask);
|
||||
pub struct ConsumptionMetricsTasks(pub CancellableTask);
|
||||
pub struct DiskUsageEvictionTask(pub CancellableTask);
|
||||
impl CancellableTask {
|
||||
@@ -77,6 +78,7 @@ impl CancellableTask {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn shutdown_pageserver(
|
||||
http_listener: HttpEndpointListener,
|
||||
https_listener: Option<HttpsEndpointListener>,
|
||||
page_service: page_service::Listener,
|
||||
consumption_metrics_worker: ConsumptionMetricsTasks,
|
||||
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
|
||||
@@ -213,6 +215,15 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(https_listener) = https_listener {
|
||||
timed(
|
||||
https_listener.0.shutdown(),
|
||||
"shutdown https",
|
||||
Duration::from_secs(1),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Shut down the HTTP endpoint last, so that you can still check the server's
|
||||
// status while it's shutting down.
|
||||
// FIXME: We should probably stop accepting commands like attach/detach earlier.
|
||||
|
||||
@@ -178,6 +178,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
let mut heartbeat_futs = FuturesUnordered::new();
|
||||
for (node_id, node) in &*pageservers {
|
||||
heartbeat_futs.push({
|
||||
let ssl_ca_cert = self.ssl_ca_cert.clone();
|
||||
let jwt_token = self.jwt_token.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
@@ -193,6 +194,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_utilization().await },
|
||||
&jwt_token,
|
||||
&ssl_ca_cert,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(1),
|
||||
|
||||
@@ -657,7 +657,9 @@ async fn handle_tenant_timeline_passthrough(
|
||||
let client = mgmt_api::Client::new(
|
||||
node.base_url(),
|
||||
service.get_config().pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
service.get_config().ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
let resp = client.get_raw(path).await.map_err(|e|
|
||||
// We return 503 here because if we can't successfully send a request to the pageserver,
|
||||
// either we aren't available or the pageserver is unavailable.
|
||||
|
||||
@@ -7,7 +7,7 @@ use pageserver_api::controller_api::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::{Certificate, StatusCode};
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
@@ -276,10 +276,12 @@ impl Node {
|
||||
/// This will return None to indicate cancellation. Cancellation may happen from
|
||||
/// the cancellation token passed in, or from Self's cancellation token (i.e. node
|
||||
/// going offline).
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
&self,
|
||||
mut op: O,
|
||||
jwt: &Option<String>,
|
||||
ssl_ca_cert: &Option<Certificate>,
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
@@ -298,19 +300,26 @@ impl Node {
|
||||
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
|
||||
ApiError(_, _) => true,
|
||||
Cancelled => true,
|
||||
CreateClient(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: refactor PageserverClient and with_client_retires (#11113).
|
||||
let mut http_client = reqwest::ClientBuilder::new().timeout(timeout);
|
||||
if let Some(ssl_ca_cert) = ssl_ca_cert.as_ref() {
|
||||
http_client = http_client.add_root_certificate(ssl_ca_cert.clone())
|
||||
}
|
||||
|
||||
let http_client = match http_client.build() {
|
||||
Ok(http_client) => http_client,
|
||||
Err(err) => return Some(Err(mgmt_api::Error::CreateClient(err))),
|
||||
};
|
||||
|
||||
backoff::retry(
|
||||
|| {
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.timeout(timeout)
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
|
||||
let client = PageserverClient::from_client(
|
||||
self.get_id(),
|
||||
http_client,
|
||||
http_client.clone(),
|
||||
self.base_url(),
|
||||
jwt.as_deref(),
|
||||
);
|
||||
|
||||
@@ -8,7 +8,7 @@ use pageserver_api::models::{
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::BlockUnblock;
|
||||
use pageserver_client::mgmt_api::{Client, Result};
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::{Certificate, StatusCode};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
|
||||
@@ -46,11 +46,16 @@ macro_rules! measured_request {
|
||||
}
|
||||
|
||||
impl PageserverClient {
|
||||
pub(crate) fn new(node_id: NodeId, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
|
||||
Self {
|
||||
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
|
||||
pub(crate) fn new(
|
||||
node_id: NodeId,
|
||||
mgmt_api_endpoint: String,
|
||||
jwt: Option<&str>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Client::new(mgmt_api_endpoint, jwt, ssl_ca_cert)?,
|
||||
node_id_label: node_id.0.to_string(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn from_client(
|
||||
|
||||
@@ -299,6 +299,7 @@ impl Reconciler {
|
||||
.await
|
||||
},
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.ssl_ca_cert,
|
||||
1,
|
||||
3,
|
||||
timeout,
|
||||
@@ -420,7 +421,8 @@ impl Reconciler {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.service_config.ssl_ca_cert.clone(),
|
||||
)?;
|
||||
|
||||
client
|
||||
.wait_lsn(
|
||||
@@ -443,7 +445,8 @@ impl Reconciler {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.service_config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.service_config.ssl_ca_cert.clone(),
|
||||
)?;
|
||||
|
||||
let timelines = client.timeline_list(&tenant_shard_id).await?;
|
||||
Ok(timelines
|
||||
@@ -481,6 +484,7 @@ impl Reconciler {
|
||||
.await
|
||||
},
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.ssl_ca_cert,
|
||||
1,
|
||||
3,
|
||||
request_download_timeout * 2,
|
||||
@@ -775,6 +779,7 @@ impl Reconciler {
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.ssl_ca_cert,
|
||||
1,
|
||||
1,
|
||||
Duration::from_secs(5),
|
||||
@@ -1123,6 +1128,7 @@ impl Reconciler {
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_location_config(tenant_shard_id).await },
|
||||
&self.service_config.pageserver_jwt_token,
|
||||
&self.service_config.ssl_ca_cert,
|
||||
1,
|
||||
3,
|
||||
Duration::from_secs(5),
|
||||
|
||||
@@ -262,6 +262,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
|
||||
}
|
||||
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
|
||||
mgmt_api::Error::CreateClient(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -887,6 +888,7 @@ impl Service {
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
5,
|
||||
timeout,
|
||||
@@ -984,11 +986,20 @@ impl Service {
|
||||
break;
|
||||
}
|
||||
|
||||
let client = PageserverClient::new(
|
||||
let client = match PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
) {
|
||||
Ok(client) => client,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to create client to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match client
|
||||
.location_config(
|
||||
tenant_shard_id,
|
||||
@@ -1015,7 +1026,7 @@ impl Service {
|
||||
// Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
|
||||
// break anything.
|
||||
tracing::error!(
|
||||
"Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
|
||||
"Failed to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1924,6 +1935,7 @@ impl Service {
|
||||
.with_client_retries(
|
||||
|client| async move { client.list_location_config().await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -1982,6 +1994,7 @@ impl Service {
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
5,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -3125,7 +3138,9 @@ impl Service {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
|
||||
|
||||
@@ -3186,7 +3201,9 @@ impl Service {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
futs.push(async move {
|
||||
let result = client
|
||||
.tenant_secondary_download(tenant_shard_id, wait)
|
||||
@@ -3309,6 +3326,7 @@ impl Service {
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
3,
|
||||
RECONCILE_TIMEOUT,
|
||||
@@ -3464,6 +3482,7 @@ impl Service {
|
||||
tenant_shard_id: TenantShardId,
|
||||
locations: ShardMutationLocations,
|
||||
jwt: Option<String>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineInfo, ApiError> {
|
||||
let latest = locations.latest.node;
|
||||
@@ -3476,7 +3495,8 @@ impl Service {
|
||||
);
|
||||
|
||||
let client =
|
||||
PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref());
|
||||
PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref(), ssl_ca_cert.clone())
|
||||
.map_err(|e| passthrough_api_error(&latest, e))?;
|
||||
|
||||
let timeline_info = client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
@@ -3499,7 +3519,9 @@ impl Service {
|
||||
location.node.get_id(),
|
||||
location.node.base_url(),
|
||||
jwt.as_deref(),
|
||||
);
|
||||
ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(&location.node, e))?;
|
||||
|
||||
let res = client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
@@ -3528,6 +3550,7 @@ impl Service {
|
||||
shard_zero_tid,
|
||||
shard_zero_locations,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
create_req.clone(),
|
||||
)
|
||||
.await?;
|
||||
@@ -3557,6 +3580,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
mutation_locations,
|
||||
jwt.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
create_req,
|
||||
))
|
||||
},
|
||||
@@ -3598,13 +3622,15 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
req: TimelineArchivalConfigRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
tracing::info!(
|
||||
"Setting archival config of timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
client
|
||||
.timeline_archival_config(tenant_shard_id, timeline_id, &req)
|
||||
@@ -3627,6 +3653,7 @@ impl Service {
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
req.clone(),
|
||||
))
|
||||
})
|
||||
@@ -3663,12 +3690,14 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
|
||||
tracing::info!(
|
||||
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
client
|
||||
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
|
||||
@@ -3708,6 +3737,7 @@ impl Service {
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
@@ -3760,9 +3790,16 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
dir: BlockUnblock,
|
||||
) -> Result<(), ApiError> {
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
jwt.as_deref(),
|
||||
ssl_ca_cert,
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
client
|
||||
.timeline_block_unblock_gc(tenant_shard_id, timeline_id, dir)
|
||||
@@ -3782,6 +3819,7 @@ impl Service {
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
dir,
|
||||
))
|
||||
})
|
||||
@@ -3903,6 +3941,7 @@ impl Service {
|
||||
node.with_client_retries(
|
||||
|client| op(tenant_shard_id, client),
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
timeout,
|
||||
@@ -4126,12 +4165,14 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
ssl_ca_cert: Option<Certificate>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
tracing::info!(
|
||||
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref(), ssl_ca_cert)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
let res = client
|
||||
.timeline_delete(tenant_shard_id, timeline_id)
|
||||
.await;
|
||||
@@ -4158,6 +4199,7 @@ impl Service {
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
@@ -4180,6 +4222,7 @@ impl Service {
|
||||
timeline_id,
|
||||
shard_zero_locations.latest.node,
|
||||
self.config.pageserver_jwt_token.clone(),
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok(shard_zero_status)
|
||||
@@ -4611,6 +4654,7 @@ impl Service {
|
||||
client.location_config(child_id, config, None, false).await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
10,
|
||||
Duration::from_secs(5),
|
||||
@@ -5214,7 +5258,9 @@ impl Service {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(node, e))?;
|
||||
let response = client
|
||||
.tenant_shard_split(
|
||||
*parent_id,
|
||||
@@ -5698,7 +5744,9 @@ impl Service {
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
self.config.ssl_ca_cert.clone(),
|
||||
)
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
let scan_result = client
|
||||
.tenant_scan_remote_storage(tenant_id)
|
||||
@@ -7340,6 +7388,7 @@ impl Service {
|
||||
.with_client_retries(
|
||||
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -7376,6 +7425,7 @@ impl Service {
|
||||
.await
|
||||
},
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
@@ -7503,6 +7553,7 @@ impl Service {
|
||||
node.with_client_retries(
|
||||
|client| async move { client.top_tenant_shards(request.clone()).await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(5),
|
||||
@@ -7622,6 +7673,7 @@ impl Service {
|
||||
.with_client_retries(
|
||||
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
|
||||
&self.config.pageserver_jwt_token,
|
||||
&self.config.ssl_ca_cert,
|
||||
1,
|
||||
3,
|
||||
Duration::from_millis(250),
|
||||
|
||||
@@ -463,6 +463,10 @@ class NeonEnvBuilder:
|
||||
self.control_plane_compute_hook_api: str | None = None
|
||||
self.storage_controller_config: dict[Any, Any] | None = None
|
||||
|
||||
# Flag to enable https listener in pageserver, generate local ssl certs,
|
||||
# and force storage controller to use https for pageserver api.
|
||||
self.use_https_pageserver_api: bool = False
|
||||
|
||||
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
|
||||
self.pageserver_get_vectored_concurrent_io: str | None = (
|
||||
pageserver_get_vectored_concurrent_io
|
||||
@@ -1059,6 +1063,11 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.generate_local_ssl_certs = config.use_https_pageserver_api
|
||||
self.ssl_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
|
||||
)
|
||||
|
||||
neon_local_env_vars = {}
|
||||
if self.rust_log_override is not None:
|
||||
neon_local_env_vars["RUST_LOG"] = self.rust_log_override
|
||||
@@ -1122,6 +1131,7 @@ class NeonEnv:
|
||||
},
|
||||
"safekeepers": [],
|
||||
"pageservers": [],
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
}
|
||||
|
||||
if self.control_plane_api is not None:
|
||||
@@ -1130,8 +1140,14 @@ class NeonEnv:
|
||||
if self.control_plane_compute_hook_api is not None:
|
||||
cfg["control_plane_compute_hook_api"] = self.control_plane_compute_hook_api
|
||||
|
||||
if self.storage_controller_config is not None:
|
||||
cfg["storage_controller"] = self.storage_controller_config
|
||||
storage_controller_config = self.storage_controller_config
|
||||
|
||||
if config.use_https_pageserver_api:
|
||||
storage_controller_config = storage_controller_config or {}
|
||||
storage_controller_config["use_https_pageserver_api"] = True
|
||||
|
||||
if storage_controller_config is not None:
|
||||
cfg["storage_controller"] = storage_controller_config
|
||||
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
@@ -1142,6 +1158,7 @@ class NeonEnv:
|
||||
pageserver_port = PageserverPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
https=self.port_distributor.get_port() if config.use_https_pageserver_api else None,
|
||||
)
|
||||
|
||||
# Availabilty zones may also be configured manually with `NeonEnvBuilder.pageserver_config_override`
|
||||
@@ -1156,6 +1173,9 @@ class NeonEnv:
|
||||
"id": ps_id,
|
||||
"listen_pg_addr": f"localhost:{pageserver_port.pg}",
|
||||
"listen_http_addr": f"localhost:{pageserver_port.http}",
|
||||
"listen_https_addr": f"localhost:{pageserver_port.https}"
|
||||
if config.use_https_pageserver_api
|
||||
else None,
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
"availability_zone": availability_zone,
|
||||
|
||||
15
test_runner/regress/test_ssl.py
Normal file
15
test_runner/regress/test_ssl.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import requests
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test HTTPS pageserver management API.
|
||||
If NeonEnv starts with use_https_pageserver_api with no errors, it's already a success.
|
||||
Make /v1/status request to HTTPS API to ensure it's appropriately configured.
|
||||
"""
|
||||
neon_env_builder.use_https_pageserver_api = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
Reference in New Issue
Block a user