diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index 0fb45e01c6..716984b64e 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -16,7 +16,7 @@ use crate::id::TenantId; /// Algorithm to use. We require EdDSA. const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA; -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] #[serde(rename_all = "lowercase")] pub enum Scope { // Provides access to all data for a specific tenant (specified in `struct Claims` below) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 0625538bf3..abede2e44d 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -37,7 +37,7 @@ use safekeeper::{http, WAL_REMOVER_RUNTIME}; use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME}; use safekeeper::{wal_backup, HTTP_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; -use utils::auth::JwtAuth; +use utils::auth::{JwtAuth, Scope}; use utils::{ id::NodeId, logging::{self, LogFormat}, @@ -72,6 +72,10 @@ struct Args { /// Listen endpoint for receiving/sending WAL in the form host:port. #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)] listen_pg: String, + /// Listen endpoint for receiving/sending WAL in the form host:port allowing + /// only tenant scoped auth tokens. Pointless if auth is disabled. + #[arg(long, default_value = None, verbatim_doc_comment)] + listen_pg_tenant_only: Option, /// Listen http endpoint for management and metrics in the form host:port. #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)] listen_http: String, @@ -94,7 +98,7 @@ struct Args { broker_keepalive_interval: Duration, /// Peer safekeeper is considered dead after not receiving heartbeats from /// it during this period passed as a human readable duration. - #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)] + #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)] heartbeat_timeout: Duration, /// Remote storage configuration for WAL backup (offloading to s3) as TOML /// inline table, e.g. @@ -179,6 +183,7 @@ async fn main() -> anyhow::Result<()> { workdir, my_id: id, listen_pg_addr: args.listen_pg, + listen_pg_addr_tenant_only: args.listen_pg_tenant_only, listen_http_addr: args.listen_http, availability_zone: args.availability_zone, no_sync: args.no_sync, @@ -222,6 +227,21 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { e })?; + let pg_listener_tenant_only = + if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only { + info!( + "starting safekeeper tenant scoped WAL service on {}", + listen_pg_addr_tenant_only + ); + let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); + e + })?; + Some(listener) + } else { + None + }; + info!( "starting safekeeper HTTP service on {}", conf.listen_http_addr @@ -253,14 +273,34 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let current_thread_rt = conf .current_thread_runtime .then(|| Handle::try_current().expect("no runtime in main")); + let wal_service_handle = current_thread_rt .as_ref() .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) - .spawn(wal_service::task_main(conf_, pg_listener)) + .spawn(wal_service::task_main( + conf_, + pg_listener, + Some(Scope::SafekeeperData), + )) // wrap with task name for error reporting .map(|res| ("WAL service main".to_owned(), res)); tasks_handles.push(Box::pin(wal_service_handle)); + if let Some(pg_listener_tenant_only) = pg_listener_tenant_only { + let conf_ = conf.clone(); + let wal_service_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) + .spawn(wal_service::task_main( + conf_, + pg_listener_tenant_only, + Some(Scope::Tenant), + )) + // wrap with task name for error reporting + .map(|res| ("WAL service tenant only main".to_owned(), res)); + tasks_handles.push(Box::pin(wal_service_handle)); + } + let conf_ = conf.clone(); let http_handle = current_thread_rt .as_ref() diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 1367d5eebb..5fe9db9628 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -34,6 +34,8 @@ pub struct SafekeeperPostgresHandler { pub ttid: TenantTimelineId, /// Unique connection id is logged in spans for observability. pub conn_id: ConnectionId, + /// Auth scope allowed on the connections. None if auth is not configured. + allowed_auth_scope: Option, claims: Option, io_metrics: Option, } @@ -147,6 +149,16 @@ impl postgres_backend::Handler .unwrap() .decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?; + let scope = self + .allowed_auth_scope + .expect("auth is enabled but scope is not configured"); + // The handler might be configured to allow only tenant scope tokens. + if matches!(scope, Scope::Tenant) && !matches!(data.claims.scope, Scope::Tenant) { + return Err(QueryError::Other(anyhow::anyhow!( + "passed JWT token is for full access, but only tenant scope is allowed" + ))); + } + if matches!(data.claims.scope, Scope::Tenant) && data.claims.tenant_id.is_none() { return Err(QueryError::Other(anyhow::anyhow!( "jwt token scope is Tenant, but tenant id is missing" @@ -215,7 +227,12 @@ impl postgres_backend::Handler } impl SafekeeperPostgresHandler { - pub fn new(conf: SafeKeeperConf, conn_id: u32, io_metrics: Option) -> Self { + pub fn new( + conf: SafeKeeperConf, + conn_id: u32, + io_metrics: Option, + allowed_auth_scope: Option, + ) -> Self { SafekeeperPostgresHandler { conf, appname: None, @@ -224,6 +241,7 @@ impl SafekeeperPostgresHandler { ttid: TenantTimelineId::empty(), conn_id, claims: None, + allowed_auth_scope, io_metrics, } } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index b8e1101369..1a1c0add67 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -53,6 +53,7 @@ pub struct SafeKeeperConf { pub workdir: PathBuf, pub my_id: NodeId, pub listen_pg_addr: String, + pub listen_pg_addr_tenant_only: Option, pub listen_http_addr: String, pub availability_zone: Option, pub no_sync: bool, @@ -85,6 +86,7 @@ impl SafeKeeperConf { workdir: PathBuf::from("./"), no_sync: false, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), + listen_pg_addr_tenant_only: None, listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), availability_zone: None, remote_storage: None, diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 406132b2b0..43e870e621 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -8,7 +8,7 @@ use std::{future, time::Duration}; use tokio::net::TcpStream; use tokio_io_timeout::TimeoutReader; use tracing::*; -use utils::measured_stream::MeasuredStream; +use utils::{auth::Scope, measured_stream::MeasuredStream}; use crate::handler::SafekeeperPostgresHandler; use crate::metrics::TrafficMetrics; @@ -19,6 +19,7 @@ use postgres_backend::{AuthType, PostgresBackend}; pub async fn task_main( conf: SafeKeeperConf, pg_listener: std::net::TcpListener, + allowed_auth_scope: Option, ) -> anyhow::Result<()> { // Tokio's from_std won't do this for us, per its comment. pg_listener.set_nonblocking(true)?; @@ -33,7 +34,7 @@ pub async fn task_main( let conn_id = issue_connection_id(&mut connection_count); tokio::spawn(async move { - if let Err(err) = handle_socket(socket, conf, conn_id) + if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope) .instrument(info_span!("", cid = %conn_id)) .await { @@ -49,6 +50,7 @@ async fn handle_socket( socket: TcpStream, conf: SafeKeeperConf, conn_id: ConnectionId, + allowed_auth_scope: Option, ) -> Result<(), QueryError> { socket.set_nodelay(true)?; let peer_addr = socket.peer_addr()?; @@ -84,8 +86,12 @@ async fn handle_socket( None => AuthType::Trust, Some(_) => AuthType::NeonJWT, }; - let mut conn_handler = - SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); + let mut conn_handler = SafekeeperPostgresHandler::new( + conf, + conn_id, + Some(traffic_metrics.clone()), + allowed_auth_scope, + ); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; // libpq protocol between safekeeper and walproposer / pageserver // We don't use shutdown.