From f7b878611a8709954b0a61bc1656f34f2054c4ea Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Wed, 1 Jun 2022 23:55:22 +0300 Subject: [PATCH] Implement JWT authentication in Safekeeper HTTP API (#1753) * `control_plane` crate (used by `neon_local`) now parses an `auth_enabled` bool for each Safekeeper * If auth is enabled, a Safekeeper is passed a path to a public key via a new command line argument * Added TODO comments to other places needing auth --- control_plane/src/local_env.rs | 2 ++ control_plane/src/safekeeper.rs | 5 +++++ safekeeper/src/bin/safekeeper.rs | 32 +++++++++++++++++++++++++++-- safekeeper/src/http/routes.rs | 35 +++++++++++++++++++++++++++----- safekeeper/src/lib.rs | 2 ++ 5 files changed, 69 insertions(+), 7 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index f7bb890893..28541c2ece 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -177,6 +177,7 @@ pub struct SafekeeperConf { pub sync: bool, pub remote_storage: Option, pub backup_threads: Option, + pub auth_enabled: bool, } impl Default for SafekeeperConf { @@ -188,6 +189,7 @@ impl Default for SafekeeperConf { sync: true, remote_storage: None, backup_threads: None, + auth_enabled: false, } } } diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 972b6d48ae..c90f36d104 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -149,6 +149,11 @@ impl SafekeeperNode { if let Some(ref remote_storage) = self.conf.remote_storage { cmd.args(&["--remote-storage", remote_storage]); } + if self.conf.auth_enabled { + cmd.arg("--auth-validation-public-key-path"); + // PathBuf is better be passed as is, not via `String`. + cmd.arg(self.env.base_data_dir.join("auth_public_key.pem")); + } fill_aws_secrets_vars(&mut cmd); diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 5ce2591ff3..6c9c59c76b 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -10,6 +10,7 @@ use remote_storage::RemoteStorageConfig; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::thread; use tokio::sync::mpsc; use toml_edit::Document; @@ -27,6 +28,7 @@ use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; +use utils::auth::JwtAuth; use utils::{ http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, zid::NodeId, @@ -132,6 +134,12 @@ fn main() -> anyhow::Result<()> { .default_missing_value("true") .help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."), ) + .arg( + Arg::new("auth-validation-public-key-path") + .long("auth-validation-public-key-path") + .takes_value(true) + .help("Path to an RSA .pem public key which is used to check JWT tokens") + ) .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { @@ -204,6 +212,10 @@ fn main() -> anyhow::Result<()> { .parse() .context("failed to parse bool enable-s3-offload bool")?; + conf.auth_validation_public_key_path = arg_matches + .value_of("auth-validation-public-key-path") + .map(PathBuf::from); + start_safekeeper(conf, given_id, arg_matches.is_present("init")) } @@ -239,6 +251,19 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo e })?; + let auth = match conf.auth_validation_public_key_path.as_ref() { + None => { + info!("Auth is disabled"); + None + } + Some(path) => { + info!("Loading JWT auth key from {}", path.display()); + Some(Arc::new( + JwtAuth::from_key_path(path).context("failed to load the auth key")?, + )) + } + }; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -280,8 +305,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("http_endpoint_thread".into()) .spawn(|| { - // TODO authentication - let router = http::make_router(conf_); + let router = http::make_router(conf_, auth); endpoint::serve_thread_main( router, http_listener, @@ -295,6 +319,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) .spawn(|| { + // TODO: add auth if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener) { info!("safekeeper thread terminated: {e}"); } @@ -309,6 +334,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("broker thread".into()) .spawn(|| { + // TODO: add auth? broker::thread_main(conf_); })?, ); @@ -321,6 +347,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("WAL removal thread".into()) .spawn(|| { + // TODO: add auth? remove_wal::thread_main(conf_); })?, ); @@ -330,6 +357,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo thread::Builder::new() .name("wal backup launcher thread".into()) .spawn(move || { + // TODO: add auth? wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx); })?, ); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 73b9024c7d..ca43039d3b 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,8 +1,9 @@ -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Body, Request, Response, StatusCode, Uri}; +use once_cell::sync::Lazy; use serde::Serialize; use serde::Serializer; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::sync::Arc; @@ -12,8 +13,9 @@ use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult}; use crate::SafeKeeperConf; use etcd_broker::subscription_value::SkTimelineInfo; use utils::{ + auth::JwtAuth, http::{ - endpoint, + endpoint::{self, auth_middleware, check_permission}, error::ApiError, json::{json_request, json_response}, request::{ensure_no_body, parse_request_param}, @@ -32,6 +34,7 @@ struct SafekeeperStatus { /// Healthcheck handler. async fn status_handler(request: Request) -> Result, ApiError> { + check_permission(&request, None)?; let conf = get_conf(&request); let status = SafekeeperStatus { id: conf.my_id }; json_response(StatusCode::OK, status) @@ -91,6 +94,7 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, ) -> Result, ApiError> { let tenant_id = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; ensure_no_body(&mut request).await?; json_response( StatusCode::OK, @@ -178,6 +185,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result) -> Result RouterBuilder { - let router = endpoint::make_router(); +pub fn make_router( + conf: SafeKeeperConf, + auth: Option>, +) -> RouterBuilder { + let mut router = endpoint::make_router(); + if auth.is_some() { + router = router.middleware(auth_middleware(|request| { + #[allow(clippy::mutable_key_type)] + static ALLOWLIST_ROUTES: Lazy> = + Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect()); + if ALLOWLIST_ROUTES.contains(request.uri()) { + None + } else { + // Option> is always provided as data below, hence unwrap(). + request.data::>>().unwrap().as_deref() + } + })) + } router .data(Arc::new(conf)) + .data(auth) .get("/v1/status", status_handler) .get( "/v1/timeline/:tenant_id/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index f328d2e85a..0335d61d3f 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -57,6 +57,7 @@ pub struct SafeKeeperConf { pub my_id: NodeId, pub broker_endpoints: Vec, pub broker_etcd_prefix: String, + pub auth_validation_public_key_path: Option, } impl SafeKeeperConf { @@ -88,6 +89,7 @@ impl Default for SafeKeeperConf { broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, wal_backup_enabled: true, + auth_validation_public_key_path: None, } } }