mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Allow to enable http/pg/pg tenant only auth separately in safekeeper.
The same option enables auth and specifies public key, so this allows to use different public keys as well. The motivation is to 1) Allow to e.g. change pageserver key/token without replacing all compute tokens. 2) Enable auth gradually.
This commit is contained in:
@@ -161,14 +161,23 @@ impl SafekeeperNode {
|
||||
|
||||
let key_path = self.env.base_data_dir.join("auth_public_key.pem");
|
||||
if self.conf.auth_enabled {
|
||||
let key_path_string = key_path
|
||||
.to_str()
|
||||
.with_context(|| {
|
||||
format!("Key path {key_path:?} cannot be represented as a unicode string")
|
||||
})?
|
||||
.to_owned();
|
||||
args.extend([
|
||||
"--auth-validation-public-key-path".to_owned(),
|
||||
key_path
|
||||
.to_str()
|
||||
.with_context(|| {
|
||||
format!("Key path {key_path:?} cannot be represented as a unicode string")
|
||||
})?
|
||||
.to_owned(),
|
||||
"--pg-auth-public-key-path".to_owned(),
|
||||
key_path_string.clone(),
|
||||
]);
|
||||
args.extend([
|
||||
"--pg-tenant-only-auth-public-key-path".to_owned(),
|
||||
key_path_string.clone(),
|
||||
]);
|
||||
args.extend([
|
||||
"--http-auth-public-key-path".to_owned(),
|
||||
key_path_string.clone(),
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
@@ -122,9 +122,21 @@ struct Args {
|
||||
/// WAL backup horizon.
|
||||
#[arg(long)]
|
||||
disable_wal_backup: bool,
|
||||
/// Path to a .pem public key which is used to check JWT tokens.
|
||||
#[arg(long)]
|
||||
auth_validation_public_key_path: Option<PathBuf>,
|
||||
/// If given, enables auth on incoming connections to WAL service endpoint
|
||||
/// (--listen-pg). Value specifies path to a .pem public key used for
|
||||
/// validations of JWT tokens.
|
||||
#[arg(long, verbatim_doc_comment)]
|
||||
pg_auth_public_key_path: Option<PathBuf>,
|
||||
/// If given, enables auth on incoming connections to tenant only WAL
|
||||
/// service endpoint (--listen-pg-tenant-only). Value specifies path to a
|
||||
/// .pem public key used for validations of JWT tokens.
|
||||
#[arg(long, verbatim_doc_comment)]
|
||||
pg_tenant_only_auth_public_key_path: Option<PathBuf>,
|
||||
/// If given, enables auth on incoming connections to http management
|
||||
/// service endpoint (--listen-http). Value specifies path to a .pem public
|
||||
/// key used for validations of JWT tokens.
|
||||
#[arg(long, verbatim_doc_comment)]
|
||||
http_auth_public_key_path: Option<PathBuf>,
|
||||
/// Format for logging, either 'plain' or 'json'.
|
||||
#[arg(long, default_value = "plain")]
|
||||
log_format: String,
|
||||
@@ -170,13 +182,37 @@ async fn main() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let auth = match args.auth_validation_public_key_path.as_ref() {
|
||||
let pg_auth = match args.pg_auth_public_key_path.as_ref() {
|
||||
None => {
|
||||
info!("auth is disabled");
|
||||
info!("pg auth is disabled");
|
||||
None
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading JWT auth key from {}", path.display());
|
||||
info!("loading pg auth JWT key from {}", path.display());
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
}
|
||||
};
|
||||
let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
|
||||
None => {
|
||||
info!("pg tenant only auth is disabled");
|
||||
None
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading pg tenant only auth JWT key from {}", path.display());
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
}
|
||||
};
|
||||
let http_auth = match args.http_auth_public_key_path.as_ref() {
|
||||
None => {
|
||||
info!("http auth is disabled");
|
||||
None
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading http auth JWT key from {}", path.display());
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
@@ -199,7 +235,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
wal_backup_enabled: !args.disable_wal_backup,
|
||||
backup_parallel_jobs: args.wal_backup_parallel_jobs,
|
||||
auth,
|
||||
pg_auth,
|
||||
pg_tenant_only_auth,
|
||||
http_auth,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
};
|
||||
|
||||
@@ -288,7 +326,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
.spawn(wal_service::task_main(
|
||||
conf_,
|
||||
pg_listener,
|
||||
Some(Scope::SafekeeperData),
|
||||
Scope::SafekeeperData,
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service main".to_owned(), res));
|
||||
@@ -302,7 +340,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
.spawn(wal_service::task_main(
|
||||
conf_,
|
||||
pg_listener_tenant_only,
|
||||
Some(Scope::Tenant),
|
||||
Scope::Tenant,
|
||||
))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service tenant only main".to_owned(), res));
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use anyhow::Context;
|
||||
use std::str::FromStr;
|
||||
use std::str::{self};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, info_span, Instrument};
|
||||
|
||||
@@ -20,7 +21,7 @@ use postgres_backend::{self, PostgresBackend};
|
||||
use postgres_ffi::PG_TLI;
|
||||
use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
|
||||
use regex::Regex;
|
||||
use utils::auth::{Claims, Scope};
|
||||
use utils::auth::{Claims, JwtAuth, Scope};
|
||||
use utils::{
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -36,8 +37,8 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub ttid: TenantTimelineId,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
/// Auth scope allowed on the connections. None if auth is not configured.
|
||||
allowed_auth_scope: Option<Scope>,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||
claims: Option<Claims>,
|
||||
io_metrics: Option<TrafficMetrics>,
|
||||
}
|
||||
@@ -154,18 +155,17 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
) -> Result<(), QueryError> {
|
||||
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
|
||||
// which requires auth to be present
|
||||
let data = self
|
||||
.conf
|
||||
let (allowed_auth_scope, auth) = self
|
||||
.auth
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
|
||||
.expect("auth_type is configured but .auth of handler is missing");
|
||||
let data =
|
||||
auth.decode(str::from_utf8(jwt_response).context("jwt response is not UTF-8")?)?;
|
||||
|
||||
let scope = self
|
||||
.allowed_auth_scope
|
||||
.expect("auth is enabled but scope is not configured");
|
||||
// The handler might be configured to allow only tenant scope tokens.
|
||||
if matches!(scope, Scope::Tenant) && !matches!(data.claims.scope, Scope::Tenant) {
|
||||
if matches!(allowed_auth_scope, Scope::Tenant)
|
||||
&& !matches!(data.claims.scope, Scope::Tenant)
|
||||
{
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"passed JWT token is for full access, but only tenant scope is allowed"
|
||||
)));
|
||||
@@ -244,7 +244,7 @@ impl SafekeeperPostgresHandler {
|
||||
conf: SafeKeeperConf,
|
||||
conn_id: u32,
|
||||
io_metrics: Option<TrafficMetrics>,
|
||||
allowed_auth_scope: Option<Scope>,
|
||||
auth: Option<(Scope, Arc<JwtAuth>)>,
|
||||
) -> Self {
|
||||
SafekeeperPostgresHandler {
|
||||
conf,
|
||||
@@ -254,7 +254,7 @@ impl SafekeeperPostgresHandler {
|
||||
ttid: TenantTimelineId::empty(),
|
||||
conn_id,
|
||||
claims: None,
|
||||
allowed_auth_scope,
|
||||
auth,
|
||||
io_metrics,
|
||||
}
|
||||
}
|
||||
@@ -262,7 +262,7 @@ impl SafekeeperPostgresHandler {
|
||||
// when accessing management api supply None as an argument
|
||||
// when using to authorize tenant pass corresponding tenant id
|
||||
fn check_permission(&self, tenant_id: Option<TenantId>) -> anyhow::Result<()> {
|
||||
if self.conf.auth.is_none() {
|
||||
if self.auth.is_none() {
|
||||
// auth is set to Trust, nothing to check so just return ok
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -359,7 +359,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
/// Safekeeper http router.
|
||||
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let mut router = endpoint::make_router();
|
||||
if conf.auth.is_some() {
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
|
||||
@@ -375,7 +375,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
|
||||
// NB: on any changes do not forget to update the OpenAPI spec
|
||||
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
|
||||
let auth = conf.auth.clone();
|
||||
let auth = conf.http_auth.clone();
|
||||
router
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
|
||||
@@ -65,7 +65,9 @@ pub struct SafeKeeperConf {
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
pub backup_parallel_jobs: usize,
|
||||
pub wal_backup_enabled: bool,
|
||||
pub auth: Option<Arc<JwtAuth>>,
|
||||
pub pg_auth: Option<Arc<JwtAuth>>,
|
||||
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
|
||||
pub http_auth: Option<Arc<JwtAuth>>,
|
||||
pub current_thread_runtime: bool,
|
||||
}
|
||||
|
||||
@@ -99,7 +101,9 @@ impl SafeKeeperConf {
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
wal_backup_enabled: true,
|
||||
backup_parallel_jobs: 1,
|
||||
auth: None,
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
|
||||
@@ -16,10 +16,13 @@ use crate::SafeKeeperConf;
|
||||
use postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
/// allowed_auth_scope is either SafekeeperData (wide JWT tokens giving access
|
||||
/// to any tenant are allowed) or Tenant (only tokens giving access to specific
|
||||
/// tenant are allowed). Doesn't matter if auth is disabled in conf.
|
||||
pub async fn task_main(
|
||||
conf: SafeKeeperConf,
|
||||
pg_listener: std::net::TcpListener,
|
||||
allowed_auth_scope: Option<Scope>,
|
||||
allowed_auth_scope: Scope,
|
||||
) -> anyhow::Result<()> {
|
||||
// Tokio's from_std won't do this for us, per its comment.
|
||||
pg_listener.set_nonblocking(true)?;
|
||||
@@ -50,7 +53,7 @@ async fn handle_socket(
|
||||
socket: TcpStream,
|
||||
conf: SafeKeeperConf,
|
||||
conn_id: ConnectionId,
|
||||
allowed_auth_scope: Option<Scope>,
|
||||
allowed_auth_scope: Scope,
|
||||
) -> Result<(), QueryError> {
|
||||
socket.set_nodelay(true)?;
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
@@ -82,16 +85,17 @@ async fn handle_socket(
|
||||
},
|
||||
);
|
||||
|
||||
let auth_type = match conf.auth {
|
||||
let auth_key = match allowed_auth_scope {
|
||||
Scope::Tenant => conf.pg_tenant_only_auth.clone(),
|
||||
_ => conf.pg_auth.clone(),
|
||||
};
|
||||
let auth_type = match auth_key {
|
||||
None => AuthType::Trust,
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
};
|
||||
let mut conn_handler = SafekeeperPostgresHandler::new(
|
||||
conf,
|
||||
conn_id,
|
||||
Some(traffic_metrics.clone()),
|
||||
allowed_auth_scope,
|
||||
);
|
||||
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
|
||||
let mut conn_handler =
|
||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
|
||||
Reference in New Issue
Block a user