mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
Implement JWT authentication in Safekeeper HTTP API (#1753)
* `control_plane` crate (used by `neon_local`) now parses an `auth_enabled` bool for each Safekeeper * If auth is enabled, a Safekeeper is passed a path to a public key via a new command line argument * Added TODO comments to other places needing auth
This commit is contained in:
committed by
Egor Suvorov
parent
a51b2dac9a
commit
f7b878611a
@@ -177,6 +177,7 @@ pub struct SafekeeperConf {
|
||||
pub sync: bool,
|
||||
pub remote_storage: Option<String>,
|
||||
pub backup_threads: Option<u32>,
|
||||
pub auth_enabled: bool,
|
||||
}
|
||||
|
||||
impl Default for SafekeeperConf {
|
||||
@@ -188,6 +189,7 @@ impl Default for SafekeeperConf {
|
||||
sync: true,
|
||||
remote_storage: None,
|
||||
backup_threads: None,
|
||||
auth_enabled: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,6 +149,11 @@ impl SafekeeperNode {
|
||||
if let Some(ref remote_storage) = self.conf.remote_storage {
|
||||
cmd.args(&["--remote-storage", remote_storage]);
|
||||
}
|
||||
if self.conf.auth_enabled {
|
||||
cmd.arg("--auth-validation-public-key-path");
|
||||
// PathBuf is better be passed as is, not via `String`.
|
||||
cmd.arg(self.env.base_data_dir.join("auth_public_key.pem"));
|
||||
}
|
||||
|
||||
fill_aws_secrets_vars(&mut cmd);
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use remote_storage::RemoteStorageConfig;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use tokio::sync::mpsc;
|
||||
use toml_edit::Document;
|
||||
@@ -27,6 +28,7 @@ use safekeeper::timeline::GlobalTimelines;
|
||||
use safekeeper::wal_backup;
|
||||
use safekeeper::wal_service;
|
||||
use safekeeper::SafeKeeperConf;
|
||||
use utils::auth::JwtAuth;
|
||||
use utils::{
|
||||
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
|
||||
zid::NodeId,
|
||||
@@ -132,6 +134,12 @@ fn main() -> anyhow::Result<()> {
|
||||
.default_missing_value("true")
|
||||
.help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("auth-validation-public-key-path")
|
||||
.long("auth-validation-public-key-path")
|
||||
.takes_value(true)
|
||||
.help("Path to an RSA .pem public key which is used to check JWT tokens")
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("dump-control-file") {
|
||||
@@ -204,6 +212,10 @@ fn main() -> anyhow::Result<()> {
|
||||
.parse()
|
||||
.context("failed to parse bool enable-s3-offload bool")?;
|
||||
|
||||
conf.auth_validation_public_key_path = arg_matches
|
||||
.value_of("auth-validation-public-key-path")
|
||||
.map(PathBuf::from);
|
||||
|
||||
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
|
||||
}
|
||||
|
||||
@@ -239,6 +251,19 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
e
|
||||
})?;
|
||||
|
||||
let auth = match conf.auth_validation_public_key_path.as_ref() {
|
||||
None => {
|
||||
info!("Auth is disabled");
|
||||
None
|
||||
}
|
||||
Some(path) => {
|
||||
info!("Loading JWT auth key from {}", path.display());
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// XXX: Don't spawn any threads before daemonizing!
|
||||
if conf.daemonize {
|
||||
info!("daemonizing...");
|
||||
@@ -280,8 +305,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
thread::Builder::new()
|
||||
.name("http_endpoint_thread".into())
|
||||
.spawn(|| {
|
||||
// TODO authentication
|
||||
let router = http::make_router(conf_);
|
||||
let router = http::make_router(conf_, auth);
|
||||
endpoint::serve_thread_main(
|
||||
router,
|
||||
http_listener,
|
||||
@@ -295,6 +319,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
let safekeeper_thread = thread::Builder::new()
|
||||
.name("Safekeeper thread".into())
|
||||
.spawn(|| {
|
||||
// TODO: add auth
|
||||
if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) {
|
||||
info!("safekeeper thread terminated: {e}");
|
||||
}
|
||||
@@ -309,6 +334,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
thread::Builder::new()
|
||||
.name("broker thread".into())
|
||||
.spawn(|| {
|
||||
// TODO: add auth?
|
||||
broker::thread_main(conf_);
|
||||
})?,
|
||||
);
|
||||
@@ -321,6 +347,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
thread::Builder::new()
|
||||
.name("WAL removal thread".into())
|
||||
.spawn(|| {
|
||||
// TODO: add auth?
|
||||
remove_wal::thread_main(conf_);
|
||||
})?,
|
||||
);
|
||||
@@ -330,6 +357,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
|
||||
thread::Builder::new()
|
||||
.name("wal backup launcher thread".into())
|
||||
.spawn(move || {
|
||||
// TODO: add auth?
|
||||
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
|
||||
})?,
|
||||
);
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use hyper::{Body, Request, Response, StatusCode, Uri};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use serde::Serializer;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -12,8 +13,9 @@ use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
|
||||
use crate::SafeKeeperConf;
|
||||
use etcd_broker::subscription_value::SkTimelineInfo;
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
http::{
|
||||
endpoint,
|
||||
endpoint::{self, auth_middleware, check_permission},
|
||||
error::ApiError,
|
||||
json::{json_request, json_response},
|
||||
request::{ensure_no_body, parse_request_param},
|
||||
@@ -32,6 +34,7 @@ struct SafekeeperStatus {
|
||||
|
||||
/// Healthcheck handler.
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
let conf = get_conf(&request);
|
||||
let status = SafekeeperStatus { id: conf.my_id };
|
||||
json_response(StatusCode::OK, status)
|
||||
@@ -91,6 +94,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
||||
let (inmem, state) = tli.get_state();
|
||||
@@ -125,6 +129,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
tenant_id: request_data.tenant_id,
|
||||
timeline_id: request_data.timeline_id,
|
||||
};
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
|
||||
.map_err(ApiError::from_err)?;
|
||||
|
||||
@@ -145,6 +150,7 @@ async fn timeline_delete_force_handler(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
@@ -160,6 +166,7 @@ async fn tenant_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
@@ -178,6 +185,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
|
||||
|
||||
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
||||
@@ -188,10 +196,27 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
}
|
||||
|
||||
/// Safekeeper http router.
|
||||
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let router = endpoint::make_router();
|
||||
pub fn make_router(
|
||||
conf: SafeKeeperConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let mut router = endpoint::make_router();
|
||||
if auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
|
||||
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
|
||||
if ALLOWLIST_ROUTES.contains(request.uri()) {
|
||||
None
|
||||
} else {
|
||||
// Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
|
||||
request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
|
||||
}
|
||||
}))
|
||||
}
|
||||
router
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/v1/status", status_handler)
|
||||
.get(
|
||||
"/v1/timeline/:tenant_id/:timeline_id",
|
||||
|
||||
@@ -57,6 +57,7 @@ pub struct SafeKeeperConf {
|
||||
pub my_id: NodeId,
|
||||
pub broker_endpoints: Vec<Url>,
|
||||
pub broker_etcd_prefix: String,
|
||||
pub auth_validation_public_key_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -88,6 +89,7 @@ impl Default for SafeKeeperConf {
|
||||
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
|
||||
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
|
||||
wal_backup_enabled: true,
|
||||
auth_validation_public_key_path: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user