mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
safekeeper: add enable_tls_wal_service_api (#11520)
## Problem

Safekeeper doesn't use TLS in the WAL service.

- Closes: https://github.com/neondatabase/cloud/issues/27302

## Summary of changes

- Add the `enable_tls_wal_service_api` option to safekeeper's command-line arguments.
- Propagate `tls_server_config` to `wal_service` if the option is enabled.
- Create `BACKGROUND_RUNTIME` for small background tasks and offload the SSL certificate reloader to it.

No integration tests for now because support from the compute side is required: https://github.com/neondatabase/cloud/issues/25823
This commit is contained in:
@@ -14,6 +14,7 @@ use clap::{ArgAction, Parser};
|
|||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
|
use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||||
use metrics::set_build_info_metric;
|
use metrics::set_build_info_metric;
|
||||||
use remote_storage::RemoteStorageConfig;
|
use remote_storage::RemoteStorageConfig;
|
||||||
use safekeeper::defaults::{
|
use safekeeper::defaults::{
|
||||||
@@ -23,8 +24,8 @@ use safekeeper::defaults::{
|
|||||||
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
|
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
|
||||||
};
|
};
|
||||||
use safekeeper::{
|
use safekeeper::{
|
||||||
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
|
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
|
||||||
control_file, http, wal_backup, wal_service,
|
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
|
||||||
};
|
};
|
||||||
use sd_notify::NotifyState;
|
use sd_notify::NotifyState;
|
||||||
use storage_broker::{DEFAULT_ENDPOINT, Uri};
|
use storage_broker::{DEFAULT_ENDPOINT, Uri};
|
||||||
@@ -215,16 +216,21 @@ struct Args {
|
|||||||
ssl_cert_file: Utf8PathBuf,
|
ssl_cert_file: Utf8PathBuf,
|
||||||
/// Period to reload certificate and private key from files.
|
/// Period to reload certificate and private key from files.
|
||||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
|
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
|
||||||
pub ssl_cert_reload_period: Duration,
|
ssl_cert_reload_period: Duration,
|
||||||
/// Trusted root CA certificates to use in https APIs.
|
/// Trusted root CA certificates to use in https APIs.
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
pub ssl_ca_file: Option<Utf8PathBuf>,
|
ssl_ca_file: Option<Utf8PathBuf>,
|
||||||
/// Flag to use https for requests to peer's safekeeper API.
|
/// Flag to use https for requests to peer's safekeeper API.
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
pub use_https_safekeeper_api: bool,
|
use_https_safekeeper_api: bool,
|
||||||
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
auth_token_path: Option<Utf8PathBuf>,
|
auth_token_path: Option<Utf8PathBuf>,
|
||||||
|
/// Enable TLS in WAL service API.
|
||||||
|
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||||
|
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||||
|
#[arg(long)]
|
||||||
|
enable_tls_wal_service_api: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Like PathBufValueParser, but allows empty string.
|
// Like PathBufValueParser, but allows empty string.
|
||||||
@@ -418,6 +424,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
ssl_cert_reload_period: args.ssl_cert_reload_period,
|
||||||
ssl_ca_certs,
|
ssl_ca_certs,
|
||||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||||
|
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
|
||||||
});
|
});
|
||||||
|
|
||||||
// initialize sentry if SENTRY_DSN is provided
|
// initialize sentry if SENTRY_DSN is provided
|
||||||
@@ -517,6 +524,36 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
|||||||
info!("running in current thread runtime");
|
info!("running in current thread runtime");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
|
||||||
|
let ssl_key_file = conf.ssl_key_file.clone();
|
||||||
|
let ssl_cert_file = conf.ssl_cert_file.clone();
|
||||||
|
let ssl_cert_reload_period = conf.ssl_cert_reload_period;
|
||||||
|
|
||||||
|
// Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
|
||||||
|
// task is run in this runtime.
|
||||||
|
let cert_resolver = current_thread_rt
|
||||||
|
.as_ref()
|
||||||
|
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
|
||||||
|
.spawn(async move {
|
||||||
|
ReloadingCertificateResolver::new(
|
||||||
|
"main",
|
||||||
|
&ssl_key_file,
|
||||||
|
&ssl_cert_file,
|
||||||
|
ssl_cert_reload_period,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
})
|
||||||
|
.await??;
|
||||||
|
|
||||||
|
let config = rustls::ServerConfig::builder()
|
||||||
|
.with_no_client_auth()
|
||||||
|
.with_cert_resolver(cert_resolver);
|
||||||
|
|
||||||
|
Some(Arc::new(config))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let wal_service_handle = current_thread_rt
|
let wal_service_handle = current_thread_rt
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||||
@@ -524,6 +561,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
|||||||
conf.clone(),
|
conf.clone(),
|
||||||
pg_listener,
|
pg_listener,
|
||||||
Scope::SafekeeperData,
|
Scope::SafekeeperData,
|
||||||
|
conf.enable_tls_wal_service_api
|
||||||
|
.then(|| tls_server_config.clone())
|
||||||
|
.flatten(),
|
||||||
global_timelines.clone(),
|
global_timelines.clone(),
|
||||||
))
|
))
|
||||||
// wrap with task name for error reporting
|
// wrap with task name for error reporting
|
||||||
@@ -552,6 +592,9 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
|||||||
conf.clone(),
|
conf.clone(),
|
||||||
pg_listener_tenant_only,
|
pg_listener_tenant_only,
|
||||||
Scope::Tenant,
|
Scope::Tenant,
|
||||||
|
conf.enable_tls_wal_service_api
|
||||||
|
.then(|| tls_server_config.clone())
|
||||||
|
.flatten(),
|
||||||
global_timelines.clone(),
|
global_timelines.clone(),
|
||||||
))
|
))
|
||||||
// wrap with task name for error reporting
|
// wrap with task name for error reporting
|
||||||
@@ -577,6 +620,7 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
|||||||
.spawn(http::task_main_https(
|
.spawn(http::task_main_https(
|
||||||
conf.clone(),
|
conf.clone(),
|
||||||
https_listener,
|
https_listener,
|
||||||
|
tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
|
||||||
global_timelines.clone(),
|
global_timelines.clone(),
|
||||||
))
|
))
|
||||||
.map(|res| ("HTTPS service main".to_owned(), res));
|
.map(|res| ("HTTPS service main".to_owned(), res));
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
pub mod routes;
|
pub mod routes;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use http_utils::tls_certs::ReloadingCertificateResolver;
|
|
||||||
pub use routes::make_router;
|
pub use routes::make_router;
|
||||||
pub use safekeeper_api::models;
|
pub use safekeeper_api::models;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
@@ -28,21 +27,10 @@ pub async fn task_main_http(
|
|||||||
pub async fn task_main_https(
|
pub async fn task_main_https(
|
||||||
conf: Arc<SafeKeeperConf>,
|
conf: Arc<SafeKeeperConf>,
|
||||||
https_listener: std::net::TcpListener,
|
https_listener: std::net::TcpListener,
|
||||||
|
tls_config: Arc<rustls::ServerConfig>,
|
||||||
global_timelines: Arc<GlobalTimelines>,
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let cert_resolver = ReloadingCertificateResolver::new(
|
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
|
||||||
"main",
|
|
||||||
&conf.ssl_key_file,
|
|
||||||
&conf.ssl_cert_file,
|
|
||||||
conf.ssl_cert_reload_period,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let server_config = rustls::ServerConfig::builder()
|
|
||||||
.with_no_client_auth()
|
|
||||||
.with_cert_resolver(cert_resolver);
|
|
||||||
|
|
||||||
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
|
|
||||||
|
|
||||||
let router = make_router(conf, global_timelines)
|
let router = make_router(conf, global_timelines)
|
||||||
.build()
|
.build()
|
||||||
|
|||||||
@@ -122,6 +122,7 @@ pub struct SafeKeeperConf {
|
|||||||
pub ssl_cert_reload_period: Duration,
|
pub ssl_cert_reload_period: Duration,
|
||||||
pub ssl_ca_certs: Vec<Pem>,
|
pub ssl_ca_certs: Vec<Pem>,
|
||||||
pub use_https_safekeeper_api: bool,
|
pub use_https_safekeeper_api: bool,
|
||||||
|
pub enable_tls_wal_service_api: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SafeKeeperConf {
|
impl SafeKeeperConf {
|
||||||
@@ -172,6 +173,7 @@ impl SafeKeeperConf {
|
|||||||
ssl_cert_reload_period: Duration::from_secs(60),
|
ssl_cert_reload_period: Duration::from_secs(60),
|
||||||
ssl_ca_certs: Vec::new(),
|
ssl_ca_certs: Vec::new(),
|
||||||
use_https_safekeeper_api: false,
|
use_https_safekeeper_api: false,
|
||||||
|
enable_tls_wal_service_api: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -209,3 +211,12 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|||||||
.build()
|
.build()
|
||||||
.expect("Failed to create WAL backup runtime")
|
.expect("Failed to create WAL backup runtime")
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.thread_name("background worker")
|
||||||
|
.worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.expect("Failed to create background runtime")
|
||||||
|
});
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ pub async fn task_main(
|
|||||||
conf: Arc<SafeKeeperConf>,
|
conf: Arc<SafeKeeperConf>,
|
||||||
pg_listener: std::net::TcpListener,
|
pg_listener: std::net::TcpListener,
|
||||||
allowed_auth_scope: Scope,
|
allowed_auth_scope: Scope,
|
||||||
|
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||||
global_timelines: Arc<GlobalTimelines>,
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// Tokio's from_std won't do this for us, per its comment.
|
// Tokio's from_std won't do this for us, per its comment.
|
||||||
@@ -43,9 +44,10 @@ pub async fn task_main(
|
|||||||
let conf = conf.clone();
|
let conf = conf.clone();
|
||||||
let conn_id = issue_connection_id(&mut connection_count);
|
let conn_id = issue_connection_id(&mut connection_count);
|
||||||
let global_timelines = global_timelines.clone();
|
let global_timelines = global_timelines.clone();
|
||||||
|
let tls_config = tls_config.clone();
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
|
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await {
|
||||||
error!("connection handler exited: {}", err);
|
error!("connection handler exited: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -61,6 +63,7 @@ async fn handle_socket(
|
|||||||
conf: Arc<SafeKeeperConf>,
|
conf: Arc<SafeKeeperConf>,
|
||||||
conn_id: ConnectionId,
|
conn_id: ConnectionId,
|
||||||
allowed_auth_scope: Scope,
|
allowed_auth_scope: Scope,
|
||||||
|
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||||
global_timelines: Arc<GlobalTimelines>,
|
global_timelines: Arc<GlobalTimelines>,
|
||||||
) -> Result<(), QueryError> {
|
) -> Result<(), QueryError> {
|
||||||
socket.set_nodelay(true)?;
|
socket.set_nodelay(true)?;
|
||||||
@@ -110,7 +113,8 @@ async fn handle_socket(
|
|||||||
auth_pair,
|
auth_pair,
|
||||||
global_timelines,
|
global_timelines,
|
||||||
);
|
);
|
||||||
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
|
let pgbackend =
|
||||||
|
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
|
||||||
// libpq protocol between safekeeper and walproposer / pageserver
|
// libpq protocol between safekeeper and walproposer / pageserver
|
||||||
// We don't use shutdown.
|
// We don't use shutdown.
|
||||||
pgbackend
|
pgbackend
|
||||||
|
|||||||
@@ -185,6 +185,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
|||||||
ssl_cert_reload_period: Duration::ZERO,
|
ssl_cert_reload_period: Duration::ZERO,
|
||||||
ssl_ca_certs: Vec::new(),
|
ssl_ca_certs: Vec::new(),
|
||||||
use_https_safekeeper_api: false,
|
use_https_safekeeper_api: false,
|
||||||
|
enable_tls_wal_service_api: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||||
|
|||||||
Reference in New Issue
Block a user