storage_broker: https handler (#11603)

## Problem
Broker supports only HTTP, no HTTPS
- Closes: https://github.com/neondatabase/cloud/issues/27492

## Summary of changes
- Add `listen_https_addr`, `ssl_key_file`, `ssl_cert_file`,
`ssl_cert_reload_period` arguments to storage broker
- Make `listen_addr` argument optional
- Listen https in storage broker
- Support https for storage broker request in neon_local
- Add `use_https_storage_broker_api` option to NeonEnvBuilder
This commit is contained in:
Dmitrii Kovalkov
2025-04-25 18:28:56 +04:00
committed by GitHub
parent 6f0046b688
commit ef53a76434
12 changed files with 278 additions and 119 deletions

3
Cargo.lock generated
View File

@@ -6616,12 +6616,14 @@ dependencies = [
"anyhow",
"async-stream",
"bytes",
"camino",
"clap",
"const_format",
"futures",
"futures-core",
"futures-util",
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper-util",
@@ -6631,6 +6633,7 @@ dependencies = [
"prost 0.13.3",
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tonic",
"tonic-build",
"tracing",

View File

@@ -17,8 +17,10 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
@@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{FlockArg, flock};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
@@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
NeonLocalInitConf {
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
listen_https_addr: None,
},
safekeepers: vec![SafekeeperConf {
id: DEFAULT_SAFEKEEPER_ID,
@@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage(
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.start(&args.start_timeout).await {
eprintln!("broker start failed: {e}");
exit(1);
}
@@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local
StorageBrokerCmd::Stop(_args) => {
// FIXME: stop_mode unused
if let Err(e) = broker::stop_broker_process(env) {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("broker stop failed: {e}");
exit(1);
}
@@ -1835,8 +1839,11 @@ async fn handle_start_all_impl(
#[allow(clippy::redundant_closure_call)]
(|| {
js.spawn(async move {
let retry_timeout = retry_timeout;
broker::start_broker_process(env, &retry_timeout).await
let storage_broker = StorageBroker::from_env(env);
storage_broker
.start(&retry_timeout)
.await
.map_err(|e| e.context("start storage_broker"))
});
js.spawn(async move {
@@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
if let Err(e) = broker::stop_broker_process(env) {
let storage_broker = StorageBroker::from_env(env);
if let Err(e) = storage_broker.stop() {
eprintln!("neon broker stop failed: {e:#}");
}

View File

@@ -3,44 +3,67 @@
//! In the local test environment, the storage broker stores its data directly in
//!
//! ```text
//! .neon
//! .neon/storage_broker
//! ```
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};
use crate::{background_process, local_env::LocalEnv};
pub async fn start_broker_process(
env: &local_env::LocalEnv,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
let broker = &env.broker;
let listen_addr = &broker.listen_addr;
pub struct StorageBroker {
env: LocalEnv,
}
print!("Starting neon broker at {}", listen_addr);
impl StorageBroker {
/// Create a new `StorageBroker` instance from the environment.
pub fn from_env(env: &LocalEnv) -> Self {
Self { env: env.clone() }
}
let args = [format!("--listen-addr={listen_addr}")];
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
self.env.generate_ssl_cert(
&self.env.storage_broker_data_dir().join("server.crt"),
&self.env.storage_broker_data_dir().join("server.key"),
)?;
}
Ok(())
}
let client = reqwest::Client::new();
/// Start the storage broker process.
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();
if let Some(addr) = &broker.listen_addr {
args.push(format!("--listen-addr={addr}"));
}
if let Some(addr) = &broker.listen_https_addr {
args.push(format!("--listen-https-addr={addr}"));
}
let client = self.env.create_http_client();
background_process::start_process(
"storage_broker",
&env.base_data_dir,
&env.storage_broker_bin(),
&self.env.storage_broker_data_dir(),
&self.env.storage_broker_bin(),
args,
[],
background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
background_process::InitialPidFile::Create(self.pid_file_path()),
retry_timeout,
|| async {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
})?;
let request = client
.get(status_url)
.build()
.with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
let request = client.get(status_url).build().with_context(|| {
format!("Failed to construct request to broker endpoint {url}")
})?;
match client.execute(request).await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
@@ -50,13 +73,16 @@ pub async fn start_broker_process(
.await
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
}
pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
}
/// Stop the storage broker process.
pub fn stop(&self) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &self.pid_file_path())
}
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
/// Get the path to the PID file for the storage broker.
fn pid_file_path(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
}
}

View File

@@ -4,7 +4,7 @@
//! script which will use local paths.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
@@ -14,11 +14,12 @@ use anyhow::{Context, bail};
use clap::ValueEnum;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::Url;
use reqwest::{Certificate, Url};
use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::broker::StorageBroker;
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -157,11 +158,16 @@ pub struct EndpointStorageConf {
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
#[serde(default)]
pub struct NeonBroker {
/// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
pub listen_addr: SocketAddr,
/// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
pub listen_addr: Option<SocketAddr>,
/// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'.
/// At least one of listen_addr or listen_https_addr must be set.
/// listen_https_addr is preferred over listen_addr in neon_local.
pub listen_https_addr: Option<SocketAddr>,
}
/// A part of storage controller's config the neon_local knows about.
@@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf {
}
}
// Dummy Default impl to satisfy Deserialize derive.
impl Default for NeonBroker {
fn default() -> Self {
NeonBroker {
listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
}
}
}
impl NeonBroker {
pub fn client_url(&self) -> Url {
Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
let url = if let Some(addr) = self.listen_https_addr {
format!("https://{}", addr)
} else {
format!(
"http://{}",
self.listen_addr
.expect("at least one address should be set")
)
};
Url::parse(&url).expect("failed to construct url")
}
}
@@ -441,6 +448,10 @@ impl LocalEnv {
self.base_data_dir.join("endpoints")
}
pub fn storage_broker_data_dir(&self) -> PathBuf {
self.base_data_dir.join("storage_broker")
}
pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
self.base_data_dir
.join(format!("pageserver_{pageserver_id}"))
@@ -503,6 +514,23 @@ impl LocalEnv {
)
}
/// Creates HTTP client with local SSL CA certificates.
pub fn create_http_client(&self) -> reqwest::Client {
let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
http_client
.build()
.expect("HTTP client should construct with no error")
}
/// Inspect the base data directory and extract the instance id and instance directory path
/// for all storage controller instances
pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
@@ -911,6 +939,12 @@ impl LocalEnv {
// create endpoints dir
fs::create_dir_all(env.endpoints_path())?;
// create storage broker dir
fs::create_dir_all(env.storage_broker_data_dir())?;
StorageBroker::from_env(&env)
.initialize()
.context("storage broker init failed")?;
// create safekeeper dirs
for safekeeper in &env.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?;

View File

@@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use reqwest::Certificate;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -51,19 +50,6 @@ impl PageServerNode {
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("Client constructs with no errors");
let endpoint = if env.storage_controller.use_https_pageserver_api {
format!(
"https://{}",
@@ -80,7 +66,7 @@ impl PageServerNode {
conf: conf.clone(),
env: env.clone(),
http_client: mgmt_api::Client::new(
http_client,
env.create_http_client(),
endpoint,
{
match conf.http_auth_type {

View File

@@ -87,7 +87,7 @@ impl SafekeeperNode {
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(),
http_client: reqwest::Client::new(),
http_client: env.create_http_client(),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
listen_addr,
}

View File

@@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Certificate, Method};
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -153,24 +153,11 @@ impl StorageController {
}
};
let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| {
let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist");
Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid")
});
let mut http_client = reqwest::Client::builder();
for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.expect("HTTP client should construct with no error");
Self {
env: env.clone(),
private_key,
public_key,
client: http_client,
client: env.create_http_client(),
config: env.storage_controller.clone(),
listen_port: OnceLock::default(),
}

View File

@@ -11,6 +11,7 @@ bench = []
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
futures.workspace = true
@@ -19,12 +20,14 @@ futures-util.workspace = true
humantime.workspace = true
hyper = { workspace = true, features = ["full"] }
http-body-util.workspace = true
http-utils.workspace = true
hyper-util = "0.1"
once_cell.workspace = true
parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true
utils.workspace = true

View File

@@ -17,10 +17,13 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use clap::{Parser, command};
use futures::future::OptionFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::Full;
use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
use hyper::service::service_fn;
@@ -38,7 +41,7 @@ use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid};
use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid};
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
@@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG);
const DEFAULT_CHAN_SIZE: usize = 32;
const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
const DEFAULT_SSL_KEY_FILE: &str = "server.key";
const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s";
#[derive(Parser, Debug)]
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
#[clap(group(
clap::ArgGroup::new("listen-addresses")
.required(true)
.multiple(true)
.args(&["listen_addr", "listen_https_addr"]),
))]
struct Args {
/// Endpoint to listen on.
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
listen_addr: SocketAddr,
/// Endpoint to listen HTTP on.
#[arg(short, long)]
listen_addr: Option<SocketAddr>,
/// Endpoint to listen HTTPS on.
#[arg(long)]
listen_https_addr: Option<SocketAddr>,
/// Size of the queue to the per timeline subscriber.
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
timeline_chan_size: usize,
@@ -72,11 +88,20 @@ struct Args {
#[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
all_keys_chan_size: usize,
/// HTTP/2 keepalive interval.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
http2_keepalive_interval: Duration,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
/// Path to a file with certificate's private key for https API.
#[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API.
#[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
}
/// Id of publisher for registering in maps
@@ -674,12 +699,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
let http_listener = match &args.listen_addr {
Some(addr) => {
info!("listening HTTP on {}", addr);
Some(TcpListener::bind(addr).await?)
}
None => None,
};
let (https_listener, tls_acceptor) = match &args.listen_https_addr {
Some(addr) => {
let listener = TcpListener::bind(addr).await?;
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&args.ssl_key_file,
&args.ssl_cert_file,
args.ssl_cert_reload_period,
)
.await?;
let mut tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
// Tonic is HTTP/2 only and it negotiates it with ALPN.
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
info!("listening HTTPS on {}", addr);
(Some(listener), Some(acceptor))
}
None => (None, None),
};
// grpc is served along with http1 for metrics on a single port, hence we
// don't use tonic's Server.
let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
info!("listening on {}", &args.listen_addr);
loop {
let (stream, addr) = match tcp_listener.accept().await {
let (conn, is_https) = tokio::select! {
Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false),
Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true),
};
let (tcp_stream, addr) = match conn {
Ok(v) => v,
Err(e) => {
info!("couldn't accept connection: {e}");
@@ -734,13 +797,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
.await;
let tls_acceptor = tls_acceptor.clone();
tokio::task::spawn(async move {
let res = builder
.serve_connection(TokioIo::new(stream), service_fn_)
.await;
let res = if is_https {
let tls_acceptor =
tls_acceptor.expect("tls_acceptor is set together with https_listener");
let tls_stream = match tls_acceptor.accept(tcp_stream).await {
Ok(tls_stream) => tls_stream,
Err(e) => {
info!("error accepting TLS connection from {addr}: {e}");
return;
}
};
builder
.serve_connection(TokioIo::new(tls_stream), service_fn_)
.await
} else {
builder
.serve_connection(TokioIo::new(tcp_stream), service_fn_)
.await
};
if let Err(e) = res {
info!("error serving connection from {addr}: {e}");
info!(%is_https, "error serving connection from {addr}: {e}");
}
});
}

View File

@@ -196,7 +196,7 @@ struct Cli {
ssl_cert_reload_period: humantime::Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<PathBuf>,
ssl_ca_file: Option<Utf8PathBuf>,
/// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver
/// the compute notification directly (instead of via control plane).

View File

@@ -501,6 +501,9 @@ class NeonEnvBuilder:
# Flag to use https listener in storage controller, generate local ssl certs,
# and force pageservers and neon_local to use https for storage controller api.
self.use_https_storage_controller_api: bool = False
# Flag to use https listener in storage broker, generate local ssl certs,
# and force pageservers and safekeepers to use https for storage broker api.
self.use_https_storage_broker_api: bool = False
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
@@ -1086,7 +1089,7 @@ class NeonEnv:
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self)
self.broker = NeonBroker(self, config.use_https_storage_broker_api)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
self.pg_version = config.pg_version
@@ -1106,6 +1109,7 @@ class NeonEnv:
config.use_https_pageserver_api
or config.use_https_safekeeper_api
or config.use_https_storage_controller_api
or config.use_https_storage_broker_api
)
self.ssl_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
@@ -1178,15 +1182,18 @@ class NeonEnv:
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
"default_tenant_id": str(self.initial_tenant),
"broker": {
"listen_addr": self.broker.listen_addr(),
},
"broker": {},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
if config.use_https_storage_broker_api:
cfg["broker"]["listen_https_addr"] = self.broker.listen_addr()
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
@@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils):
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""
def __init__(self, env: NeonEnv):
super().__init__(logfile=env.repo_dir / "storage_broker.log")
def __init__(self, env: NeonEnv, use_https: bool):
super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log")
self.env = env
self.scheme = "https" if use_https else "http"
self.port: int = self.env.port_distributor.get_port()
self.running = False
@@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://{self.listen_addr()}"
return f"{self.scheme}://{self.listen_addr()}"
def assert_no_errors(self):
assert_no_errors(self.logfile, "storage_controller", [])

View File

@@ -6,6 +6,7 @@ import pytest
import requests
from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException
from fixtures.utils import wait_until
from fixtures.workload import Workload
def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
@@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
assert reload_error_cnt > 0
wait_until(reload_failed)
def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
"""
Test HTTPS storage broker API.
1. Make /status request to HTTPS API to ensure it's appropriately configured.
2. Generate simple workload to ensure that SK -> broker -> PS communication works well.
"""
neon_env_builder.use_https_storage_broker_api = True
env = neon_env_builder.init_start()
# 1. Simple check that HTTPS is enabled and works.
url = env.broker.client_url() + "/status"
assert url.startswith("https://")
requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
# 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()
workload.write_rows(10)
workload.validate()