mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
Enable JWT auth in Hadron API endpoints accepting untrusted connections (#179)
This commit is contained in:
committed by
Vlad Lazar
parent
30e1213141
commit
9661022e34
@@ -1023,6 +1023,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
// User (likely interactive) did not provide a description of the environment, give them the default
|
||||
NeonLocalInitConf {
|
||||
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
|
||||
auth_token_type: AuthType::NeonJWT,
|
||||
broker: NeonBroker {
|
||||
listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()),
|
||||
listen_https_addr: None,
|
||||
@@ -1579,7 +1580,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
assert!(!pageservers.is_empty());
|
||||
|
||||
let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
|
||||
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
|
||||
let auth_token = if matches!(
|
||||
ps_conf.pg_auth_type,
|
||||
AuthType::NeonJWT | AuthType::HadronJWT
|
||||
) {
|
||||
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
|
||||
|
||||
Some(env.generate_auth_token(&claims)?)
|
||||
|
||||
@@ -18,7 +18,7 @@ use postgres_backend::AuthType;
|
||||
use reqwest::{Certificate, Url};
|
||||
use safekeeper_api::PgMajorVersion;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::auth::encode_from_key_file;
|
||||
use utils::auth::{encode_from_key_file, Claims, encode_hadron_token};
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::broker::StorageBroker;
|
||||
@@ -60,6 +60,9 @@ pub struct LocalEnv {
|
||||
// --tenant_id is not explicitly specified.
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
|
||||
// The type of tokens to use for authentication in the test environment. Determines
|
||||
// the type of key pairs and tokens generated in the test.
|
||||
pub token_auth_type: AuthType,
|
||||
// used to issue tokens during e.g pg start
|
||||
pub private_key_path: PathBuf,
|
||||
/// Path to environment's public key
|
||||
@@ -105,6 +108,7 @@ pub struct OnDiskConfig {
|
||||
pub pg_distrib_dir: PathBuf,
|
||||
pub neon_distrib_dir: PathBuf,
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
pub token_auth_type: Option<AuthType>,
|
||||
pub private_key_path: PathBuf,
|
||||
pub public_key_path: PathBuf,
|
||||
pub broker: NeonBroker,
|
||||
@@ -153,6 +157,7 @@ pub struct NeonLocalInitConf {
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub generate_local_ssl_certs: bool,
|
||||
pub auth_token_type: AuthType,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
@@ -374,7 +379,7 @@ pub struct SafekeeperConf {
|
||||
pub sync: bool,
|
||||
pub remote_storage: Option<String>,
|
||||
pub backup_threads: Option<u32>,
|
||||
pub auth_enabled: bool,
|
||||
pub auth_type: AuthType,
|
||||
pub listen_addr: Option<String>,
|
||||
}
|
||||
|
||||
@@ -389,7 +394,7 @@ impl Default for SafekeeperConf {
|
||||
sync: true,
|
||||
remote_storage: None,
|
||||
backup_threads: None,
|
||||
auth_enabled: false,
|
||||
auth_type: AuthType::Trust,
|
||||
listen_addr: None,
|
||||
}
|
||||
}
|
||||
@@ -663,6 +668,7 @@ impl LocalEnv {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
token_auth_type,
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
@@ -681,6 +687,7 @@ impl LocalEnv {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
token_auth_type: token_auth_type.unwrap_or(AuthType::NeonJWT),
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
@@ -796,6 +803,7 @@ impl LocalEnv {
|
||||
pg_distrib_dir: self.pg_distrib_dir.clone(),
|
||||
neon_distrib_dir: self.neon_distrib_dir.clone(),
|
||||
default_tenant_id: self.default_tenant_id,
|
||||
token_auth_type: Some(self.token_auth_type),
|
||||
private_key_path: self.private_key_path.clone(),
|
||||
public_key_path: self.public_key_path.clone(),
|
||||
broker: self.broker.clone(),
|
||||
@@ -825,8 +833,13 @@ impl LocalEnv {
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
|
||||
let key = self.read_private_key()?;
|
||||
encode_from_key_file(claims, &key)
|
||||
let private_key_path = self.get_private_key_path();
|
||||
let key_data = fs::read(private_key_path)?;
|
||||
match self.token_auth_type {
|
||||
AuthType::NeonJWT => encode_from_key_file(claims, &key_data),
|
||||
AuthType::HadronJWT => encode_hadron_token(claims, &key_data),
|
||||
_ => panic!("unsupported token auth type {:?}", self.token_auth_type),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the path to the private key.
|
||||
@@ -915,6 +928,7 @@ impl LocalEnv {
|
||||
generate_local_ssl_certs,
|
||||
control_plane_hooks_api,
|
||||
endpoint_storage,
|
||||
auth_token_type,
|
||||
} = conf;
|
||||
|
||||
// Find postgres binaries.
|
||||
@@ -943,6 +957,7 @@ impl LocalEnv {
|
||||
generate_auth_keys(
|
||||
base_path.join("auth_private_key.pem").as_path(),
|
||||
base_path.join("auth_public_key.pem").as_path(),
|
||||
auth_token_type,
|
||||
)
|
||||
.context("generate auth keys")?;
|
||||
let private_key_path = PathBuf::from("auth_private_key.pem");
|
||||
@@ -956,6 +971,7 @@ impl LocalEnv {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id: Some(default_tenant_id),
|
||||
token_auth_type: auth_token_type,
|
||||
private_key_path,
|
||||
public_key_path,
|
||||
broker,
|
||||
@@ -1035,39 +1051,63 @@ pub fn base_path() -> PathBuf {
|
||||
}
|
||||
|
||||
/// Generate a public/private key pair for JWT authentication
|
||||
fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
|
||||
// Generate the key pair
|
||||
//
|
||||
// openssl genpkey -algorithm ed25519 -out auth_private_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("genpkey")
|
||||
.args(["-algorithm", "ed25519"])
|
||||
.args(["-out", private_key_path.to_str().unwrap()])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.context("failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
// Extract the public key from the private key file
|
||||
//
|
||||
// openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("pkey")
|
||||
.args(["-in", private_key_path.to_str().unwrap()])
|
||||
.arg("-pubout")
|
||||
.args(["-out", public_key_path.to_str().unwrap()])
|
||||
.output()
|
||||
.context("failed to extract public key from private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
fn generate_auth_keys(
|
||||
private_key_path: &Path,
|
||||
public_key_path: &Path,
|
||||
auth_type: AuthType,
|
||||
) -> anyhow::Result<()> {
|
||||
if auth_type == AuthType::NeonJWT {
|
||||
// Generate the key pair
|
||||
//
|
||||
// openssl genpkey -algorithm ed25519 -out auth_private_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("genpkey")
|
||||
.args(["-algorithm", "ed25519"])
|
||||
.args(["-out", private_key_path.to_str().unwrap()])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.context("failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
// Extract the public key from the private key file
|
||||
//
|
||||
// openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("pkey")
|
||||
.args(["-in", private_key_path.to_str().unwrap()])
|
||||
.arg("-pubout")
|
||||
.args(["-out", public_key_path.to_str().unwrap()])
|
||||
.output()
|
||||
.context("failed to extract public key from private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
} else if auth_type == AuthType::HadronJWT {
|
||||
// Generate the RSA key pair. Note that the public key is embedded in an X509 certificate.
|
||||
//
|
||||
// openssl req -x509 -newkey rsa:4096 -keyout auth_private_key.pem -out auth_public_key.pem -nodes -subj "/CN=eng-brickstore@databricks.com"
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("req")
|
||||
.args(["-x509", "-newkey", "rsa:4096", "-sha256"])
|
||||
.args(["-keyout", private_key_path.to_str().unwrap()])
|
||||
.args(["-out", public_key_path.to_str().unwrap()])
|
||||
.args(["-nodes"])
|
||||
.args(["-subj", "/CN=eng-brickstore@databricks.com"])
|
||||
.output()
|
||||
.context("Failed to generate RSA key pair for Hadron token auth")?;
|
||||
if !keygen_output.status.success() {
|
||||
bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -73,7 +73,7 @@ impl PageServerNode {
|
||||
{
|
||||
match conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(
|
||||
AuthType::NeonJWT | AuthType::HadronJWT => Some(
|
||||
env.generate_auth_token(&Claims::new(None, Scope::PageServerApi))
|
||||
.unwrap(),
|
||||
),
|
||||
@@ -117,6 +117,9 @@ impl PageServerNode {
|
||||
|
||||
// Storage controller uses the same auth as pageserver: if JWT is enabled
|
||||
// for us, we will also need it to talk to them.
|
||||
// Note: In Hadron the "control plane" is HCC. HCC does not require a token on the trusted port PS connects
|
||||
// to, so we do not need to set any tokens when using HadronJWT. In the future we may consider using mTLS
|
||||
// instead of JWT for HTTP auth.
|
||||
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
|
||||
let jwt_token = self
|
||||
.env
|
||||
|
||||
@@ -13,12 +13,15 @@ use std::{io, result};
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use safekeeper_api::models::TimelineCreateRequest;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use thiserror::Error;
|
||||
use utils::auth::{Claims, Scope};
|
||||
use utils::id::NodeId;
|
||||
use utils::ip_address::HADRON_NODE_IP_ADDRESS;
|
||||
use utils::{http::error::HttpErrorBody, id::NodeId};
|
||||
|
||||
use crate::background_process;
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
@@ -156,7 +159,7 @@ impl SafekeeperNode {
|
||||
"--id".to_owned(),
|
||||
id_string,
|
||||
"--listen-pg".to_owned(),
|
||||
listen_pg,
|
||||
listen_pg.clone(),
|
||||
"--listen-http".to_owned(),
|
||||
listen_http,
|
||||
"--availability-zone".to_owned(),
|
||||
@@ -186,7 +189,11 @@ impl SafekeeperNode {
|
||||
}
|
||||
|
||||
let key_path = self.env.base_data_dir.join("auth_public_key.pem");
|
||||
if self.conf.auth_enabled {
|
||||
if self.conf.auth_type != AuthType::Trust {
|
||||
args.extend([
|
||||
"--token-auth-type".to_owned(),
|
||||
self.conf.auth_type.to_string(),
|
||||
]);
|
||||
let key_path_string = key_path
|
||||
.to_str()
|
||||
.with_context(|| {
|
||||
|
||||
@@ -36,8 +36,8 @@ use whoami::username;
|
||||
|
||||
pub struct StorageController {
|
||||
env: LocalEnv,
|
||||
private_key: Option<Pem>,
|
||||
public_key: Option<Pem>,
|
||||
private_key: Option<StorageControllerPrivateKey>,
|
||||
public_key: Option<StorageControllerPublicKey>,
|
||||
client: reqwest::Client,
|
||||
config: NeonStorageControllerConf,
|
||||
|
||||
@@ -106,6 +106,25 @@ pub struct InspectResponse {
|
||||
pub attachment: Option<(u32, NodeId)>,
|
||||
}
|
||||
|
||||
enum StorageControllerPublicKey {
|
||||
RawPublicKey(Pem),
|
||||
PublicKeyCertPath(Utf8PathBuf),
|
||||
}
|
||||
|
||||
enum StorageControllerPrivateKey {
|
||||
EdPrivateKey(Pem),
|
||||
HadronPrivateKey(Utf8PathBuf, Vec<u8>),
|
||||
}
|
||||
|
||||
impl StorageControllerPrivateKey {
|
||||
pub fn encode_token(&self, claims: &Claims) -> anyhow::Result<String> {
|
||||
match self {
|
||||
Self::EdPrivateKey(key_data) => encode_from_key_file(claims, key_data),
|
||||
Self::HadronPrivateKey(_, key_data) => encode_hadron_token(claims, key_data),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StorageController {
|
||||
pub fn from_env(env: &LocalEnv) -> Self {
|
||||
// Assume all pageservers have symmetric auth configuration: this service
|
||||
@@ -150,7 +169,30 @@ impl StorageController {
|
||||
)
|
||||
.expect("Failed to parse PEM file")
|
||||
};
|
||||
(Some(private_key), Some(public_key))
|
||||
(
|
||||
Some(StorageControllerPrivateKey::EdPrivateKey(private_key)),
|
||||
Some(StorageControllerPublicKey::RawPublicKey(public_key)),
|
||||
)
|
||||
}
|
||||
AuthType::HadronJWT => {
|
||||
let private_key_path = env.get_private_key_path();
|
||||
let private_key =
|
||||
fs::read(private_key_path.clone()).expect("failed to read private key");
|
||||
|
||||
// If pageserver auth is enabled, this implicitly enables auth for this service,
|
||||
// using the same credentials.
|
||||
let public_key_path =
|
||||
camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
|
||||
.unwrap();
|
||||
(
|
||||
Some(StorageControllerPrivateKey::HadronPrivateKey(
|
||||
camino::Utf8PathBuf::try_from(private_key_path).unwrap(),
|
||||
private_key,
|
||||
)),
|
||||
Some(StorageControllerPublicKey::PublicKeyCertPath(
|
||||
public_key_path,
|
||||
)),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -573,8 +615,14 @@ impl StorageController {
|
||||
|
||||
if let Some(private_key) = &self.private_key {
|
||||
let claims = Claims::new(None, Scope::PageServerApi);
|
||||
let jwt_token =
|
||||
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
|
||||
if let StorageControllerPrivateKey::HadronPrivateKey(key_path, _) = private_key {
|
||||
args.push(format!("--private-key-path={key_path}"));
|
||||
}
|
||||
// We are setting --jwt-token for Hadron as well in this test to avoid bifurcation between Neon and
|
||||
// Hadron test cases. In production we do not need to set this as HTTP auth is not enabled on the
|
||||
// pageserver. We use network segmentation to ensure that only trusted components can talk to
|
||||
// pageserver's http port
|
||||
let jwt_token = private_key.encode_token(&claims)?;
|
||||
args.push(format!("--jwt-token={jwt_token}"));
|
||||
|
||||
let peer_claims = Claims::new(None, Scope::Admin);
|
||||
@@ -589,7 +637,14 @@ impl StorageController {
|
||||
}
|
||||
|
||||
if let Some(public_key) = &self.public_key {
|
||||
args.push(format!("--public-key=\"{public_key}\""));
|
||||
match public_key {
|
||||
StorageControllerPublicKey::RawPublicKey(public_key) => {
|
||||
args.push(format!("--public-key=\"{public_key}\""));
|
||||
}
|
||||
StorageControllerPublicKey::PublicKeyCertPath(public_key_path) => {
|
||||
args.push(format!("--public-key-cert-path={public_key_path}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(control_plane_hooks_api) = &self.env.control_plane_hooks_api {
|
||||
@@ -840,8 +895,8 @@ impl StorageController {
|
||||
if let Some(private_key) = &self.private_key {
|
||||
println!("Getting claims for path {path}");
|
||||
if let Some(required_claims) = Self::get_claims_for_path(&path)? {
|
||||
println!("Got claims {required_claims:?} for path {path}");
|
||||
let jwt_token = encode_from_key_file(&required_claims, private_key)?;
|
||||
println!("Got claims {:?} for path {}", required_claims, path);
|
||||
let jwt_token = private_key.encode_token(&required_claims)?;
|
||||
builder = builder.header(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
format!("Bearer {jwt_token}"),
|
||||
|
||||
@@ -705,8 +705,10 @@ pub fn check_permission_with(
|
||||
check_permission: impl Fn(&Claims) -> Result<(), AuthError>,
|
||||
) -> Result<(), ApiError> {
|
||||
match req.context::<Claims>() {
|
||||
Some(claims) => Ok(check_permission(&claims)
|
||||
.map_err(|_err| ApiError::Forbidden("JWT authentication error".to_string()))?),
|
||||
Some(claims) => Ok(check_permission(&claims).map_err(|err| {
|
||||
tracing::info!("Authorization error: {err}");
|
||||
ApiError::Forbidden("JWT authentication error".to_string())
|
||||
})?),
|
||||
None => Ok(()), // claims is None because auth is disabled
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,6 +194,10 @@ pub enum AuthType {
|
||||
Trust,
|
||||
// This mimics postgres's AuthenticationCleartextPassword but instead of password expects JWT
|
||||
NeonJWT,
|
||||
// Similar to above but uses Hadron JWT. Hadron JWTs are slightly different in that:
|
||||
// 1. Decoding keys are loaded from PEM-encoded X509 certificates instead of plain key files.
|
||||
// 2. Signature algorithm is RSA-based (may change in the future).
|
||||
HadronJWT,
|
||||
}
|
||||
|
||||
impl FromStr for AuthType {
|
||||
@@ -203,6 +207,7 @@ impl FromStr for AuthType {
|
||||
match s {
|
||||
"Trust" => Ok(Self::Trust),
|
||||
"NeonJWT" => Ok(Self::NeonJWT),
|
||||
"HadronJWT" => Ok(Self::HadronJWT),
|
||||
_ => anyhow::bail!("invalid value \"{s}\" for auth type"),
|
||||
}
|
||||
}
|
||||
@@ -213,6 +218,7 @@ impl fmt::Display for AuthType {
|
||||
f.write_str(match self {
|
||||
AuthType::Trust => "Trust",
|
||||
AuthType::NeonJWT => "NeonJWT",
|
||||
AuthType::HadronJWT => "HadronJWT",
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -613,7 +619,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
if self.state == ProtoState::Authentication {
|
||||
match self.framed.read_message().await? {
|
||||
Some(FeMessage::PasswordMessage(m)) => {
|
||||
assert!(self.auth_type == AuthType::NeonJWT);
|
||||
assert!(matches!(
|
||||
self.auth_type,
|
||||
AuthType::NeonJWT | AuthType::HadronJWT
|
||||
));
|
||||
|
||||
let (_, jwt_response) = m.split_last().context("protocol violation")?;
|
||||
|
||||
@@ -712,7 +721,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
.await?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
AuthType::NeonJWT => {
|
||||
AuthType::NeonJWT | AuthType::HadronJWT => {
|
||||
self.write_message(&BeMessage::AuthenticationCleartextPassword)
|
||||
.await?;
|
||||
self.state = ProtoState::Authentication;
|
||||
|
||||
@@ -94,6 +94,13 @@ impl Claims {
|
||||
scope,
|
||||
endpoint_id: None,
|
||||
}
|
||||
|
||||
pub fn new_for_endpoint(endpoint_id: Uuid) -> Self {
|
||||
Self {
|
||||
tenant_id: None,
|
||||
endpoint_id: Some(endpoint_id),
|
||||
scope: Scope::TenantEndpoint,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,6 +224,25 @@ pub fn encode_from_key_file<S: Serialize>(claims: &S, pem: &Pem) -> Result<Strin
|
||||
Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?)
|
||||
}
|
||||
|
||||
/// Encode (i.e., sign) a Hadron auth token with the given claims and RSA private key. This is used
|
||||
/// by HCC to sign tokens when deploying compute or returning the compute spec. The resulting token
|
||||
/// is used by the compute node to authenticate with HCC and PS/SK.
|
||||
pub fn encode_hadron_token(claims: &Claims, key_data: &[u8]) -> Result<String> {
|
||||
let key = EncodingKey::from_rsa_pem(key_data)?;
|
||||
encode_hadron_token_with_encoding_key(claims, &key)
|
||||
}
|
||||
|
||||
pub fn encode_hadron_token_with_encoding_key(
|
||||
claims: &Claims,
|
||||
encoding_key: &EncodingKey,
|
||||
) -> Result<String> {
|
||||
Ok(encode(
|
||||
&Header::new(HADRON_STORAGE_TOKEN_ALGORITHM),
|
||||
claims,
|
||||
encoding_key,
|
||||
)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
@@ -243,6 +269,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
fn test_decode() {
|
||||
let expected_claims = Claims {
|
||||
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
|
||||
endpoint_id: None,
|
||||
scope: Scope::Tenant,
|
||||
endpoint_id: None,
|
||||
};
|
||||
@@ -272,6 +299,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
fn test_encode() {
|
||||
let claims = Claims {
|
||||
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
|
||||
endpoint_id: None,
|
||||
scope: Scope::Tenant,
|
||||
endpoint_id: None,
|
||||
};
|
||||
@@ -287,4 +315,69 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
|
||||
assert_eq!(decoded.claims, claims);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_with_key_from_certificate() {
|
||||
// Tests that we can sign (encode) a token with a RSA private key and verify (decode) it with the
|
||||
// corresponding public key extracted from a certificate.
|
||||
|
||||
// Generate two RSA key pairs and create self-signed certificates with it.
|
||||
let key_pair_1 = rcgen::KeyPair::generate_for(&rcgen::PKCS_RSA_SHA256).unwrap();
|
||||
let key_pair_2 = rcgen::KeyPair::generate_for(&rcgen::PKCS_RSA_SHA256).unwrap();
|
||||
let mut params = rcgen::CertificateParams::default();
|
||||
params
|
||||
.distinguished_name
|
||||
.push(rcgen::DnType::CommonName, "eng-brickstore@databricks.com");
|
||||
let cert_1 = params.clone().self_signed(&key_pair_1).unwrap();
|
||||
let cert_2 = params.self_signed(&key_pair_2).unwrap();
|
||||
|
||||
// Write the certificates and keys to a temporary dir.
|
||||
let dir = camino_tempfile::tempdir().unwrap();
|
||||
{
|
||||
fs::File::create(dir.path().join("cert_1.pem"))
|
||||
.unwrap()
|
||||
.write_all(cert_1.pem().as_bytes())
|
||||
.unwrap();
|
||||
fs::File::create(dir.path().join("key_1.pem"))
|
||||
.unwrap()
|
||||
.write_all(key_pair_1.serialize_pem().as_bytes())
|
||||
.unwrap();
|
||||
fs::File::create(dir.path().join("cert_2.pem"))
|
||||
.unwrap()
|
||||
.write_all(cert_2.pem().as_bytes())
|
||||
.unwrap();
|
||||
fs::File::create(dir.path().join("key_2.pem"))
|
||||
.unwrap()
|
||||
.write_all(key_pair_2.serialize_pem().as_bytes())
|
||||
.unwrap();
|
||||
}
|
||||
// Instantiate a `JwtAuth` with the certificate path. The resulting `JwtAuth` should extract the RSA public
|
||||
// keys out of the X509 certificates and use them as the decoding keys. Since we specified a directory, both
|
||||
// X509 certificates will be loaded, but the private key files are skipped.
|
||||
let auth = JwtAuth::from_cert_path(dir.path()).unwrap();
|
||||
assert_eq!(auth.decoding_keys.len(), 2);
|
||||
|
||||
// Also create a `JwtAuth`, specifying a single certificate file for it to get the decoding key from.
|
||||
let auth_cert_1 = JwtAuth::from_cert_path(&dir.path().join("cert_1.pem")).unwrap();
|
||||
assert_eq!(auth_cert_1.decoding_keys.len(), 1);
|
||||
|
||||
// Encode tokens with some claims.
|
||||
let claims = Claims {
|
||||
tenant_id: Some(TenantId::generate()),
|
||||
endpoint_id: None,
|
||||
scope: Scope::Tenant,
|
||||
};
|
||||
let encoded_1 =
|
||||
encode_hadron_token(&claims, key_pair_1.serialize_pem().as_bytes()).unwrap();
|
||||
let encoded_2 =
|
||||
encode_hadron_token(&claims, key_pair_2.serialize_pem().as_bytes()).unwrap();
|
||||
|
||||
// Verify that we can decode the token with matching decoding keys (decoding also verifies the signature).
|
||||
assert_eq!(auth.decode(&encoded_1).unwrap().claims, claims);
|
||||
assert_eq!(auth.decode(&encoded_2).unwrap().claims, claims);
|
||||
assert_eq!(auth_cert_1.decode(&encoded_1).unwrap().claims, claims);
|
||||
|
||||
// Verify that the token cannot be decoded with a mismatched decode key.
|
||||
assert!(auth_cert_1.decode(&encoded_2).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -459,17 +459,25 @@ fn start_pageserver(
|
||||
let http_auth;
|
||||
let pg_auth;
|
||||
let grpc_auth;
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].iter().any(|auth_type| auth_type == AuthType::NeonJWT || auth_type == HadronJWT) {
|
||||
// unwrap is ok because check is performed when creating config, so path is set and exists
|
||||
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
|
||||
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
|
||||
|
||||
let jwt_auth = JwtAuth::from_key_path(key_path)?;
|
||||
let use_hadron_jwt =
|
||||
conf.http_auth_type == AuthType::HadronJWT || conf.pg_auth_type == AuthType::HadronJWT;
|
||||
|
||||
let jwt_auth = if use_hadron_jwt {
|
||||
// To validate Hadron JWTs we need to extract decoding keys from X509 certificates.
|
||||
JwtAuth::from_cert_path(key_path)?
|
||||
} else {
|
||||
JwtAuth::from_key_path(key_path)?
|
||||
};
|
||||
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
|
||||
|
||||
http_auth = match conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
AuthType::NeonJWT | AuthType::HadronJWT => Some(auth.clone()),
|
||||
};
|
||||
pg_auth = match conf.pg_auth_type {
|
||||
AuthType::Trust => None,
|
||||
@@ -477,7 +485,7 @@ fn start_pageserver(
|
||||
};
|
||||
grpc_auth = match conf.grpc_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth),
|
||||
AuthType::NeonJWT | AuthType::HadronJWT => Some(auth),
|
||||
};
|
||||
} else {
|
||||
http_auth = None;
|
||||
|
||||
@@ -50,6 +50,7 @@ use scopeguard::defer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tenant_size_model::svg::SvgBranchKind;
|
||||
use postgres_backend::AuthType;
|
||||
use tenant_size_model::{SizeResult, StorageModel};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::io::StreamReader;
|
||||
@@ -557,6 +558,10 @@ async fn reload_auth_validation_keys_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
// Note to Bricksters: This API returns 400 if HTTP auth is not enabled. This is because `state.auth` is only
|
||||
// determined by HTTP auth.
|
||||
// TODO(william.huang): In practice both HTTP and PG auth point to the same SwappableJwtAuth object. Refactor
|
||||
// this code so that we can swap out the underlying shared auth object even if HTTP auth is None.
|
||||
check_permission(&request, None)?;
|
||||
let config = get_config(&request);
|
||||
let state = get_state(&request);
|
||||
@@ -567,7 +572,12 @@ async fn reload_auth_validation_keys_handler(
|
||||
let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
|
||||
info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
|
||||
|
||||
match utils::auth::JwtAuth::from_key_path(key_path) {
|
||||
let new_jwt_auth = if config.http_auth_type == AuthType::HadronJWT {
|
||||
JwtAuth::from_cert_path(key_path)
|
||||
} else {
|
||||
JwtAuth::from_key_path(key_path)
|
||||
};
|
||||
match new_jwt_auth {
|
||||
Ok(new_auth) => {
|
||||
shared_auth.swap(new_auth);
|
||||
json_response(StatusCode::OK, ())
|
||||
|
||||
@@ -14,6 +14,7 @@ use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use postgres_backend::AuthType;
|
||||
use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use safekeeper::defaults::{
|
||||
@@ -109,10 +110,15 @@ struct Args {
|
||||
/// Listen https endpoint for management and metrics in the form host:port.
|
||||
#[arg(long, default_value = None)]
|
||||
listen_https: Option<String>,
|
||||
/// Advertised endpoint for receiving/sending WAL in the form host:port. If not
|
||||
/// Advertised endpoint to PS for receiving/sending WAL in the form host:port. If not
|
||||
/// specified, listen_pg is used to advertise instead.
|
||||
#[arg(long, default_value = None)]
|
||||
advertise_pg: Option<String>,
|
||||
/// Advertised endpoint to compute for receiving/sending WAL in the form host:port.
|
||||
/// Required if --hcc-base-url is specified.
|
||||
// TODO(vlad): pull in hcc-base-url too
|
||||
#[arg(long, default_value = None)]
|
||||
advertise_pg_tenant_only: Option<String>,
|
||||
/// Availability zone of the safekeeper.
|
||||
#[arg(long)]
|
||||
availability_zone: Option<String>,
|
||||
@@ -164,6 +170,12 @@ struct Args {
|
||||
/// WAL backup horizon.
|
||||
#[arg(long)]
|
||||
disable_wal_backup: bool,
|
||||
/// Token authentication type. Allowed values are "NeonJWT" and "HadronJWT". Any specified value only takes effect if
|
||||
/// --pg-auth-public-key-path, --pg-tenant-only-auth-public-key-path, or --http-auth-public-key-path is specified.
|
||||
/// NeonJWT: Decoding keys are loaded from plain public key files in the specified key path.
|
||||
/// HadronJWT: Decoding keys are loaded from X509 certificates in the specified key path.
|
||||
#[arg(long, verbatim_doc_comment, default_value = "NeonJWT")]
|
||||
token_auth_type: AuthType,
|
||||
/// If given, enables auth on incoming connections to WAL service endpoint
|
||||
/// (--listen-pg). Value specifies path to a .pem public key used for
|
||||
/// validations of JWT tokens. Empty string is allowed and means disabling
|
||||
@@ -361,9 +373,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading pg auth JWT key from {path}");
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
match args.token_auth_type {
|
||||
AuthType::NeonJWT => {
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
}
|
||||
AuthType::HadronJWT => {
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_cert_path(path).context("failed to load auth keys from certificates")?,
|
||||
))
|
||||
}
|
||||
_ => panic!("AuthType {auth_type} is not allowed when --pg-auth-public-key-path is specified", auth_type = args.token_auth_type),
|
||||
}
|
||||
}
|
||||
};
|
||||
let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
|
||||
@@ -373,9 +395,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading pg tenant only auth JWT key from {path}");
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
match args.token_auth_type {
|
||||
AuthType::NeonJWT => {
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
))
|
||||
}
|
||||
AuthType::HadronJWT => {
|
||||
Some(Arc::new(
|
||||
JwtAuth::from_cert_path(path).context("failed to load auth keys from certificates")?,
|
||||
))
|
||||
}
|
||||
_ => panic!("AuthType {auth_type} is not allowed when --pg-tenant-only-auth-public-key-path is specified", auth_type = args.token_auth_type),
|
||||
}
|
||||
}
|
||||
};
|
||||
let http_auth = match args.http_auth_public_key_path.as_ref() {
|
||||
@@ -385,7 +417,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
Some(path) => {
|
||||
info!("loading http auth JWT key(s) from {path}");
|
||||
let jwt_auth = JwtAuth::from_key_path(path).context("failed to load the auth key")?;
|
||||
let jwt_auth = match args.token_auth_type {
|
||||
AuthType::NeonJWT => JwtAuth::from_key_path(path).context("failed to load the auth key")?,
|
||||
AuthType::HadronJWT => JwtAuth::from_cert_path(path).context("failed to load auth keys from certificates")?,
|
||||
_ => panic!("AuthType {auth_type} is not allowed when --http-auth-public-key-path is specified", auth_type = args.token_auth_type),
|
||||
};
|
||||
Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
|
||||
}
|
||||
};
|
||||
@@ -420,6 +456,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
listen_http_addr: args.listen_http,
|
||||
listen_https_addr: args.listen_https,
|
||||
advertise_pg_addr: args.advertise_pg,
|
||||
advertise_pg_addr_tenant_only: args.advertise_pg_tenant_only,
|
||||
availability_zone: args.availability_zone,
|
||||
no_sync: args.no_sync,
|
||||
broker_endpoint: args.broker_endpoint,
|
||||
@@ -434,6 +471,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
/* END_HADRON */
|
||||
wal_backup_enabled: !args.disable_wal_backup,
|
||||
backup_parallel_jobs: args.wal_backup_parallel_jobs,
|
||||
auth_type: args.token_auth_type,
|
||||
pg_auth,
|
||||
pg_tenant_only_auth,
|
||||
http_auth,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
extern crate hyper0 as hyper;
|
||||
use postgres_backend::AuthType;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -105,6 +106,7 @@ pub struct SafeKeeperConf {
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub advertise_pg_addr: Option<String>,
|
||||
pub advertise_pg_addr_tenant_only: Option<String>,
|
||||
pub availability_zone: Option<String>,
|
||||
pub no_sync: bool,
|
||||
/* BEGIN_HADRON */
|
||||
@@ -128,6 +130,7 @@ pub struct SafeKeeperConf {
|
||||
/* END_HADRON */
|
||||
pub backup_parallel_jobs: usize,
|
||||
pub wal_backup_enabled: bool,
|
||||
pub auth_type: AuthType,
|
||||
pub pg_auth: Option<Arc<JwtAuth>>,
|
||||
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
|
||||
pub http_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
@@ -163,6 +166,7 @@ impl SafeKeeperConf {
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
listen_https_addr: None,
|
||||
advertise_pg_addr: None,
|
||||
advertise_pg_addr_tenant_only: None,
|
||||
availability_zone: None,
|
||||
remote_storage: None,
|
||||
my_id: NodeId(0),
|
||||
@@ -173,6 +177,7 @@ impl SafeKeeperConf {
|
||||
peer_recovery_enabled: true,
|
||||
wal_backup_enabled: true,
|
||||
backup_parallel_jobs: 1,
|
||||
auth_type: AuthType::HadronJWT,
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
|
||||
@@ -103,7 +103,7 @@ async fn handle_socket(
|
||||
};
|
||||
let auth_type = match auth_key {
|
||||
None => AuthType::Trust,
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
Some(_) => conf.auth_type,
|
||||
};
|
||||
let auth_pair = auth_key.map(|key| (allowed_auth_scope, key));
|
||||
let mut conn_handler = SafekeeperPostgresHandler::new(
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::time::Duration;
|
||||
use anyhow::{Result, bail};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_backend::AuthType;
|
||||
use desim::executor::{self, PollSome};
|
||||
use desim::network::TCP;
|
||||
use desim::node_os::NodeOs;
|
||||
@@ -166,9 +167,11 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
wal_backup_enabled: false,
|
||||
listen_pg_addr_tenant_only: None,
|
||||
advertise_pg_addr: None,
|
||||
advertise_pg_addr_tenant_only: None,
|
||||
availability_zone: None,
|
||||
peer_recovery_enabled: false,
|
||||
backup_parallel_jobs: 0,
|
||||
auth_type: AuthType::NeonJWT,
|
||||
pg_auth: None,
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
|
||||
@@ -31,6 +31,7 @@ humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
json-structural-diff.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
@@ -74,4 +75,4 @@ http-utils = { path = "../libs/http-utils/" }
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -9,7 +9,6 @@ pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), Au
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn check_endpoint_permission(claims: &Claims, endpoint_id: Uuid) -> Result<(), AuthError> {
|
||||
if claims.scope != Scope::TenantEndpoint {
|
||||
return Err(AuthError("Scope mismatch. Permission denied".into()));
|
||||
|
||||
51
storage_controller/src/hadron_token.rs
Normal file
51
storage_controller/src/hadron_token.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use anyhow::Result;
|
||||
use camino::Utf8Path;
|
||||
use jsonwebtoken::EncodingKey;
|
||||
use std::fs;
|
||||
use utils::{
|
||||
auth::{encode_hadron_token_with_encoding_key, Claims, Scope},
|
||||
id::TenantId,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct HadronTokenGenerator {
|
||||
encoding_key: EncodingKey,
|
||||
}
|
||||
|
||||
impl HadronTokenGenerator {
|
||||
pub fn new(path: &Utf8Path) -> Self {
|
||||
let key_data = fs::read(path).unwrap_or_else(|e| {
|
||||
panic!("Error reading private key file {:?}. Error: {:?}", path, e)
|
||||
});
|
||||
let encoding_key = EncodingKey::from_rsa_pem(&key_data).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Error reading private key file {:?} as RSA private key. Error: {:?}",
|
||||
path, e
|
||||
)
|
||||
});
|
||||
Self { encoding_key }
|
||||
}
|
||||
|
||||
pub fn generate_tenant_scope_token(&self, tenant_id: TenantId) -> Result<String> {
|
||||
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
||||
self.internal_encode_token(&claims)
|
||||
}
|
||||
|
||||
pub fn generate_tenant_endpoint_scope_token(&self, endpoint_id: Uuid) -> Result<String> {
|
||||
let claims = Claims::new_for_endpoint(endpoint_id);
|
||||
self.internal_encode_token(&claims)
|
||||
}
|
||||
|
||||
pub fn generate_ps_sk_auth_token(&self) -> Result<String> {
|
||||
let claims = Claims {
|
||||
tenant_id: None,
|
||||
endpoint_id: None,
|
||||
scope: Scope::SafekeeperData,
|
||||
};
|
||||
self.internal_encode_token(&claims)
|
||||
}
|
||||
|
||||
fn internal_encode_token(&self, claims: &Claims) -> Result<String> {
|
||||
encode_hadron_token_with_encoding_key(claims, &self.encoding_key)
|
||||
}
|
||||
}
|
||||
@@ -1801,6 +1801,22 @@ fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(
|
||||
}
|
||||
})
|
||||
}
|
||||
/// Similar to `check_permissions()` above, but checks for TenantEndpoint scope specifically. Used by the compute spec-fetch API.
|
||||
/// Access by Admin-scope tokens is also permitted.
|
||||
/// TODO(william.huang): Merge with the previous function by refactoring `Scope` to make it carry the dependent arguments.
|
||||
/// E.g., `Scope::TenantEndpoint(EndpointId)`, `Scope::Tenant(TenantId)`, etc.
|
||||
fn check_endpoint_permission(request: &Request<Body>, endpoint_id: Uuid) -> Result<(), ApiError> {
|
||||
check_permission_with(
|
||||
request,
|
||||
|claims| match crate::auth::check_endpoint_permission(claims, endpoint_id) {
|
||||
Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => Err(e),
|
||||
},
|
||||
Ok(()) => Ok(()),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct RequestMeta {
|
||||
|
||||
@@ -6,6 +6,7 @@ extern crate hyper0 as hyper;
|
||||
mod auth;
|
||||
mod background_node_operations;
|
||||
mod compute_hook;
|
||||
pub mod hadron_token;
|
||||
pub mod hadron_utils;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
|
||||
@@ -14,6 +14,7 @@ use metrics::BuildInfo;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_controller::hadron_token::HadronTokenGenerator;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
@@ -70,10 +71,26 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
listen_https: Option<std::net::SocketAddr>,
|
||||
|
||||
/// Public key for JWT authentication of clients
|
||||
/// PEM-encoded public key string for JWT authentication of clients.
|
||||
#[arg(long)]
|
||||
public_key: Option<String>,
|
||||
|
||||
/// Path to public key certificates used for JWT authentiation of clients.
|
||||
/// Only one of `public_key` and `public_key_cert_path` should be set.
|
||||
/// `public_key` or `public_key_cert_path` can point to either a file or a directory.
|
||||
/// When pointed to a directory, public keys in all files in the first level of
|
||||
/// the directory (i.e., no subdirectories) will be loaded.
|
||||
#[arg(long)]
|
||||
public_key_cert_path: Option<Utf8PathBuf>,
|
||||
|
||||
/// Path to the file containing the private key used to generate JWTs for client
|
||||
/// authentication. The file should contain a single PEM-encoded private key.
|
||||
/// The HCC uses this key to sign JWTs handed out to other components.
|
||||
/// Note that unlike the `public_key` and `public_key_cert_path` args above,
|
||||
/// `private_key_path` must specify a file path, not a directory.
|
||||
#[arg(long)]
|
||||
private_key_path: Option<Utf8PathBuf>,
|
||||
|
||||
/// Token for authenticating this service with the pageservers it controls
|
||||
#[arg(long)]
|
||||
jwt_token: Option<String>,
|
||||
@@ -252,6 +269,7 @@ struct Secrets {
|
||||
safekeeper_jwt_token: Option<String>,
|
||||
control_plane_jwt_token: Option<String>,
|
||||
peer_jwt_token: Option<String>,
|
||||
token_generator: Option<HadronTokenGenerator>,
|
||||
}
|
||||
|
||||
const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG";
|
||||
@@ -277,7 +295,16 @@ impl Secrets {
|
||||
|
||||
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV) {
|
||||
Some(v) => Some(JwtAuth::from_key(v).context("Loading public key")?),
|
||||
None => None,
|
||||
None => {
|
||||
if let Some(path) = args.public_key_cert_path.as_ref() {
|
||||
Some(
|
||||
JwtAuth::from_cert_path(path)
|
||||
.context("Loading public key from certificates")?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let this = Self {
|
||||
@@ -296,6 +323,10 @@ impl Secrets {
|
||||
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
|
||||
),
|
||||
peer_jwt_token: Self::load_secret(&args.peer_jwt_token, Self::PEER_JWT_TOKEN_ENV),
|
||||
token_generator: args
|
||||
.private_key_path
|
||||
.as_ref()
|
||||
.map(|path| HadronTokenGenerator::new(path)),
|
||||
};
|
||||
|
||||
Ok(this)
|
||||
@@ -484,9 +515,14 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
let service = Service::spawn(
|
||||
config,
|
||||
persistence.clone(),
|
||||
secrets.token_generator,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let auth = secrets
|
||||
let jwt_auth = secrets
|
||||
.public_key
|
||||
.map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth)));
|
||||
let router = make_router(service.clone(), auth, build_info)
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::hadron_token::HadronTokenGenerator;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::storage_controller::{
|
||||
@@ -515,6 +516,10 @@ pub struct Service {
|
||||
inner: Arc<std::sync::RwLock<ServiceState>>,
|
||||
config: Config,
|
||||
persistence: Arc<Persistence>,
|
||||
|
||||
// HadronTokenGenerator to generate (sign) JWTs during compute deployment and compute-spec generation.
|
||||
token_generator: Option<HadronTokenGenerator>,
|
||||
|
||||
compute_hook: Arc<ComputeHook>,
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
|
||||
|
||||
@@ -1656,7 +1661,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
|
||||
pub async fn spawn(config: Config, persistence: Arc<Persistence>, token_generator: Option<HadronTokenGenerator>) -> anyhow::Result<Arc<Self>> {
|
||||
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
@@ -1913,6 +1918,7 @@ impl Service {
|
||||
))),
|
||||
config: config.clone(),
|
||||
persistence,
|
||||
token_generator,
|
||||
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
|
||||
result_tx,
|
||||
heartbeater_ps,
|
||||
|
||||
@@ -13,10 +13,11 @@ if TYPE_CHECKING:
|
||||
@dataclass
|
||||
class AuthKeys:
|
||||
priv: str
|
||||
algorithm: str
|
||||
|
||||
def generate_token(self, *, scope: TokenScope, **token_data: Any) -> str:
|
||||
token_data = {key: str(val) for key, val in token_data.items()}
|
||||
token = jwt.encode({"scope": scope, **token_data}, self.priv, algorithm="EdDSA")
|
||||
token = jwt.encode({"scope": scope, **token_data}, self.priv, algorithm=self.algorithm)
|
||||
# cast(Any, self.priv)
|
||||
|
||||
# jwt.encode can return 'bytes' or 'str', depending on Python version or type
|
||||
@@ -46,3 +47,4 @@ class TokenScope(StrEnum):
|
||||
TENANT = "tenant"
|
||||
SCRUBBER = "scrubber"
|
||||
INFRA = "infra"
|
||||
TENANT_ENDPOINT = "tenantendpoint"
|
||||
|
||||
@@ -34,6 +34,9 @@ import pytest
|
||||
import requests
|
||||
import toml
|
||||
from jwcrypto import jwk
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -403,6 +406,15 @@ class PageserverImportConfig:
|
||||
return ("timeline_import_config", value)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HadronTokenDecoder:
|
||||
public_key: str
|
||||
algorithm: str
|
||||
|
||||
def decode_token(self, token: str) -> Dict[str, Any]:
|
||||
return jwt.decode(token, self.public_key, algorithms=[self.algorithm])
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -473,6 +485,7 @@ class NeonEnvBuilder:
|
||||
self.safekeepers_id_start = safekeepers_id_start
|
||||
self.safekeepers_enable_fsync = safekeepers_enable_fsync
|
||||
self.auth_enabled = auth_enabled
|
||||
self.use_hadron_auth_tokens = False
|
||||
self.default_branch_name = default_branch_name
|
||||
self.env: NeonEnv | None = None
|
||||
self.keep_remote_storage_contents: bool = True
|
||||
@@ -1119,8 +1132,12 @@ class NeonEnv:
|
||||
or config.use_https_storage_broker_api
|
||||
)
|
||||
self.ssl_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
|
||||
)
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None)
|
||||
|
||||
# The auth token type used in the test environment. neon_local is instruted to generate key pairs
|
||||
# according to the auth token type. The keys are always generated but are only used if
|
||||
# config.auth_enabled == True.
|
||||
self.auth_token_type: str = "HadronJWT" if config.use_hadron_auth_tokens else "NeonJWT"
|
||||
|
||||
neon_local_env_vars = {}
|
||||
if self.rust_log_override is not None:
|
||||
@@ -1199,6 +1216,7 @@ class NeonEnv:
|
||||
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
|
||||
},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
"auth_token_type": self.auth_token_type,
|
||||
}
|
||||
|
||||
if config.use_https_storage_broker_api:
|
||||
@@ -1246,9 +1264,9 @@ class NeonEnv:
|
||||
)
|
||||
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
http_auth_type = self.auth_token_type if config.auth_enabled else "Trust"
|
||||
pg_auth_type = self.auth_token_type if config.auth_enabled else "Trust"
|
||||
grpc_auth_type = self.auth_token_type if config.auth_enabled else "Trust"
|
||||
for ps_id in range(
|
||||
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
|
||||
):
|
||||
@@ -1386,9 +1404,8 @@ class NeonEnv:
|
||||
"https_port": port.https,
|
||||
"sync": config.safekeepers_enable_fsync,
|
||||
"use_https_safekeeper_api": config.use_https_safekeeper_api,
|
||||
"auth_type": self.auth_token_type if config.auth_enabled else "Trust",
|
||||
}
|
||||
if config.auth_enabled:
|
||||
sk_cfg["auth_enabled"] = True
|
||||
if self.safekeepers_remote_storage is not None:
|
||||
sk_cfg["remote_storage"] = (
|
||||
self.safekeepers_remote_storage.to_toml_inline_table().strip()
|
||||
@@ -1568,29 +1585,66 @@ class NeonEnv:
|
||||
@cached_property
|
||||
def auth_keys(self) -> AuthKeys:
|
||||
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
|
||||
return AuthKeys(priv=priv)
|
||||
algorithm = "EdDSA" if self.auth_token_type == "NeonJWT" else "RS256"
|
||||
return AuthKeys(priv=priv, algorithm=algorithm)
|
||||
|
||||
@cached_property
|
||||
def hadron_token_decoder(self) -> HadronTokenDecoder:
|
||||
cert = (Path(self.repo_dir) / "auth_public_key.pem").read_text()
|
||||
x509_cert = x509.load_pem_x509_certificate(cert.encode(), default_backend())
|
||||
pem_public_key = (
|
||||
x509_cert.public_key()
|
||||
.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
)
|
||||
.decode()
|
||||
)
|
||||
return HadronTokenDecoder(public_key=pem_public_key, algorithm="RS256")
|
||||
|
||||
def regenerate_keys_at(self, privkey_path: Path, pubkey_path: Path):
|
||||
# compare generate_auth_keys() in local_env.rs
|
||||
subprocess.run(
|
||||
["openssl", "genpkey", "-algorithm", "ed25519", "-out", privkey_path],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
if self.auth_token_type == "NeonJWT":
|
||||
# compare generate_auth_keys() in local_env.rs
|
||||
subprocess.run(
|
||||
["openssl", "genpkey", "-algorithm", "ed25519", "-out", privkey_path],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"pkey",
|
||||
"-in",
|
||||
privkey_path,
|
||||
"-pubout",
|
||||
"-out",
|
||||
pubkey_path,
|
||||
],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"pkey",
|
||||
"-in",
|
||||
privkey_path,
|
||||
"-pubout",
|
||||
"-out",
|
||||
pubkey_path,
|
||||
],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
elif self.auth_token_type == "HadronJWT":
|
||||
# compare generate_auth_keys() in local_env.rs
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"req",
|
||||
"-x509",
|
||||
"-newkey",
|
||||
"rsa:4096",
|
||||
"-sha256",
|
||||
"-keyout",
|
||||
privkey_path,
|
||||
"-out",
|
||||
pubkey_path,
|
||||
"-nodes",
|
||||
"-subj",
|
||||
"/CN=eng-brickstore@databricks.com",
|
||||
],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
del self.auth_keys
|
||||
|
||||
def generate_endpoint_id(self) -> str:
|
||||
@@ -2008,10 +2062,10 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
|
||||
return resp
|
||||
|
||||
def headers(self, scope: TokenScope | None) -> dict[str, str]:
|
||||
def headers(self, scope: TokenScope | None, **token_data: Any) -> dict[str, str]:
|
||||
headers = {}
|
||||
if self.auth_enabled and scope is not None:
|
||||
jwt_token = self.env.auth_keys.generate_token(scope=scope)
|
||||
jwt_token = self.env.auth_keys.generate_token(scope=scope, **token_data)
|
||||
headers["Authorization"] = f"Bearer {jwt_token}"
|
||||
|
||||
return headers
|
||||
|
||||
@@ -1403,6 +1403,12 @@ def test_storage_controller_s3_time_travel_recovery(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="""
|
||||
[BRC-1269, BRC-1270] Hadron currently uses network segmentation to prevent all storage controller (non-HCC) HTTP APIs from being
|
||||
accessed from untrusted networks, so auth is currently permenantly disabled for all of these APIs in storage controller code.
|
||||
"""
|
||||
)
|
||||
def test_storage_controller_auth(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
Reference in New Issue
Block a user