diff --git a/Cargo.lock b/Cargo.lock index e5f39658a7..215b3360bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1872,6 +1872,7 @@ dependencies = [ "diesel_derives", "itoa", "serde_json", + "uuid", ] [[package]] @@ -6933,6 +6934,7 @@ dependencies = [ "tokio-util", "tracing", "utils", + "uuid", "workspace_hack", ] @@ -8206,6 +8208,7 @@ dependencies = [ "tracing-error", "tracing-subscriber", "tracing-utils", + "uuid", "walkdir", ] @@ -8807,7 +8810,6 @@ dependencies = [ "tracing-log", "tracing-subscriber", "url", - "uuid", "zeroize", "zstd", "zstd-safe", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 7b1dc56071..4b326949d7 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -47,6 +47,7 @@ tracing-subscriber = { workspace = true, features = ["json", "registry"] } tracing-utils.workspace = true rand.workspace = true scopeguard.workspace = true +uuid.workspace = true strum.workspace = true strum_macros.workspace = true walkdir.workspace = true diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index de3a964d23..b2aade15de 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -12,7 +12,8 @@ use jsonwebtoken::{ Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode, }; use pem::Pem; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use serde::{Deserialize, Deserializer, Serialize, de::DeserializeOwned}; +use uuid::Uuid; use crate::id::TenantId; @@ -25,6 +26,11 @@ pub enum Scope { /// Provides access to all data for a specific tenant (specified in `struct Claims` below) // TODO: join these two? Tenant, + /// Provides access to all data for a specific tenant, but based on endpoint ID. This token scope + /// is only used by compute to fetch the spec for a specific endpoint. 
The spec contains a Tenant-scoped + /// token authorizing access to all data of a tenant, so the spec-fetch API requires a TenantEndpoint + /// scope token to ensure that untrusted compute nodes can't fetch the spec for arbitrary endpoints. + TenantEndpoint, /// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs. /// Should only be used e.g. for status check/tenant creation/list. PageServerApi, @@ -51,17 +57,43 @@ pub enum Scope { ControllerPeer, } +fn deserialize_empty_string_as_none_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error> +where + D: Deserializer<'de>, +{ + let opt = Option::<String>::deserialize(deserializer)?; + match opt.as_deref() { + Some("") => Ok(None), + Some(s) => Uuid::parse_str(s) + .map(Some) + .map_err(serde::de::Error::custom), + None => Ok(None), + } +} + /// JWT payload. See docs/authentication.md for the format #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Claims { #[serde(default)] pub tenant_id: Option<TenantId>, + #[serde( + default, + skip_serializing_if = "Option::is_none", + // Neon control plane includes this field as empty in the claims. + // Consider it None in those cases.
+ deserialize_with = "deserialize_empty_string_as_none_uuid" + )] + pub endpoint_id: Option<Uuid>, pub scope: Scope, } impl Claims { pub fn new(tenant_id: Option<TenantId>, scope: Scope) -> Self { - Self { tenant_id, scope } + Self { + tenant_id, + scope, + endpoint_id: None, + } } } @@ -212,6 +244,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH let expected_claims = Claims { tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()), scope: Scope::Tenant, + endpoint_id: None, }; // A test token containing the following payload, signed using TEST_PRIV_KEY_ED25519: @@ -240,6 +273,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH let claims = Claims { tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()), scope: Scope::Tenant, + endpoint_id: None, }; let pem = pem::parse(TEST_PRIV_KEY_ED25519).unwrap(); diff --git a/pageserver/src/auth.rs b/pageserver/src/auth.rs index 4075427ab4..9e97fdaba8 100644 --- a/pageserver/src/auth.rs +++ b/pageserver/src/auth.rs @@ -20,7 +20,8 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result< | Scope::GenerationsApi | Scope::Infra | Scope::Scrubber - | Scope::ControllerPeer, + | Scope::ControllerPeer + | Scope::TenantEndpoint, _, ) => Err(AuthError( format!( diff --git a/safekeeper/src/auth.rs b/safekeeper/src/auth.rs index 81c79fae30..008f903a89 100644 --- a/safekeeper/src/auth.rs +++ b/safekeeper/src/auth.rs @@ -21,7 +21,8 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result< | Scope::GenerationsApi | Scope::Infra | Scope::Scrubber - | Scope::ControllerPeer, + | Scope::ControllerPeer + | Scope::TenantEndpoint, _, ) => Err(AuthError( format!( diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 143f4241f4..d67be6d469 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -52,6 +52,7 @@ tokio-rustls.workspace = true tokio-util.workspace = true tokio.workspace = true
tracing.workspace = true +uuid.workspace = true measured.workspace = true rustls.workspace = true scopeguard.workspace = true @@ -63,6 +64,7 @@ tokio-postgres-rustls.workspace = true diesel = { version = "2.2.6", features = [ "serde_json", "chrono", + "uuid", ] } diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] } diesel_migrations = { version = "2.2.0" } diff --git a/storage_controller/src/auth.rs b/storage_controller/src/auth.rs index ef47abf8c7..8f15f0f072 100644 --- a/storage_controller/src/auth.rs +++ b/storage_controller/src/auth.rs @@ -1,4 +1,5 @@ use utils::auth::{AuthError, Claims, Scope}; +use uuid::Uuid; pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), AuthError> { if claims.scope != required_scope { @@ -7,3 +8,14 @@ pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), Au Ok(()) } + +#[allow(dead_code)] +pub fn check_endpoint_permission(claims: &Claims, endpoint_id: Uuid) -> Result<(), AuthError> { + if claims.scope != Scope::TenantEndpoint { + return Err(AuthError("Scope mismatch. Permission denied".into())); + } + if claims.endpoint_id != Some(endpoint_id) { + return Err(AuthError("Endpoint id mismatch. Permission denied".into())); + } + Ok(()) +} diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs index ab37a207e4..fb03412f3c 100644 --- a/storage_controller/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -810,6 +810,7 @@ impl ComputeHook { let send_locked = tokio::select! 
{ guard = send_lock.lock_owned() => {guard}, _ = cancel.cancelled() => { + tracing::info!("Notification cancelled while waiting for lock"); return Err(NotifyError::ShuttingDown) } }; @@ -851,11 +852,32 @@ impl ComputeHook { let notify_url = compute_hook_url.as_ref().unwrap(); self.do_notify(notify_url, &request, cancel).await } else { - self.do_notify_local::(&request).await.map_err(|e| { + match self.do_notify_local::(&request).await.map_err(|e| { // This path is for testing only, so munge the error into our prod-style error type. - tracing::error!("neon_local notification hook failed: {e}"); - NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR) - }) + if e.to_string().contains("refresh-configuration-pending") { + // If the error message mentions "refresh-configuration-pending", it means the compute node + // rejected our notification request because it is already trying to reconfigure itself. We + // can proceed with the rest of the reconciliation process as the compute node has already + // discovered the need to reconfigure and will eventually update its configuration once + // we update the pageserver mappings. In fact, it is important that we continue with + // reconciliation to make sure we update the pageserver mappings to unblock the compute node. + tracing::info!("neon_local notification hook failed: {e}"); + tracing::info!("Notification failed likely due to compute node self-reconfiguration, will retry."); + Ok(()) + } else { + tracing::error!("neon_local notification hook failed: {e}"); + Err(NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)) + } + }) { + // Compute node accepted the notification request. Ok to proceed. + Ok(_) => Ok(()), + // Compute node rejected our request but it is already self-reconfiguring. Ok to proceed. + Err(Ok(_)) => Ok(()), + // Fail the reconciliation attempt in all other cases. Recall that this whole code path involving + // neon_local is for testing only.
In production we always retry failed reconciliations so we + // don't have any dead ends here. + Err(Err(e)) => Err(e), + } }; match result { diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index 8738386968..0c923e742e 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -151,6 +151,29 @@ pub(crate) struct StorageControllerMetricGroup { /// Indicator of completed safekeeper reconciles, broken down by safekeeper. pub(crate) storage_controller_safekeeper_reconciles_complete: measured::CounterVec, + + /* BEGIN HADRON */ + /// Hadron `config_watcher` reconciliation runs completed, broken down by success/failure. + pub(crate) storage_controller_config_watcher_complete: + measured::CounterVec<ConfigWatcherCompleteLabelGroupSet>, + + /// Hadron long waits for node state changes during drain and fill. + pub(crate) storage_controller_drain_and_fill_long_waits: measured::Counter, + + /// Set to 1 if we detect any page server pods with pending node pool rotation annotations. + /// Requires manual reset after oncall investigation. + pub(crate) storage_controller_ps_node_pool_rotation_pending: measured::Gauge, + + /// Hadron storage scrubber status.
+ pub(crate) storage_controller_storage_scrub_status: + measured::CounterVec<StorageScrubberLabelGroupSet>, + + /// Desired number of pageservers managed by the storage controller + pub(crate) storage_controller_num_pageservers_desired: measured::Gauge, + + /// Desired number of safekeepers managed by the storage controller + pub(crate) storage_controller_num_safekeeper_desired: measured::Gauge, + /* END HADRON */ } impl StorageControllerMetrics { @@ -173,6 +196,10 @@ impl Default for StorageControllerMetrics { .storage_controller_reconcile_complete .init_all_dense(); + metrics_group + .storage_controller_config_watcher_complete + .init_all_dense(); + Self { metrics_group, encoder: Mutex::new(measured::text::BufferedTextEncoder::new()), @@ -262,11 +289,48 @@ pub(crate) struct ReconcileLongRunningLabelGroup<'a> { pub(crate) sequence: &'a str, } +#[derive(measured::LabelGroup, Clone)] +#[label(set = StorageScrubberLabelGroupSet)] +pub(crate) struct StorageScrubberLabelGroup<'a> { + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) tenant_id: &'a str, + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) shard_number: &'a str, + #[label(dynamic_with = lasso::ThreadedRodeo, default)] + pub(crate) timeline_id: &'a str, + pub(crate) outcome: StorageScrubberOutcome, +} + +#[derive(FixedCardinalityLabel, Clone, Copy)] +pub(crate) enum StorageScrubberOutcome { + PSOk, + PSWarning, + PSError, + PSOrphan, + SKOk, + SKError, +} + +#[derive(measured::LabelGroup)] +#[label(set = ConfigWatcherCompleteLabelGroupSet)] +pub(crate) struct ConfigWatcherCompleteLabelGroup { + // Reuse the ReconcileOutcome from the SC's reconciliation metrics. + pub(crate) status: ReconcileOutcome, +} + #[derive(FixedCardinalityLabel, Clone, Copy)] pub(crate) enum ReconcileOutcome { + // Successfully reconciled everything. #[label(rename = "ok")] Success, + // Used by tenant-shard reconciler only. Reconciled pageserver state successfully, + // but failed to deliver the compute notification.
This error is typically transient + // but if its occurrence keeps increasing, it should be investigated. + #[label(rename = "ok_no_notify")] + SuccessNoNotify, + // We failed to reconcile some state and the reconciliation will be retried. Error, + // Reconciliation was cancelled. Cancel, } diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 6642c72f3c..63c82b5682 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -51,6 +51,39 @@ pub(crate) struct Node { cancel: CancellationToken, } +#[allow(dead_code)] +const ONE_MILLION: i64 = 1000000; + +/// Converts a pool ID to a large number that can be used to assign unique IDs to pods in StatefulSets. +/// For example, if pool_id is 1, then the pods have NodeIds 1000000, 1000001, 1000002, etc. +/// If pool_id is None, then the pods have NodeIds 0, 1, 2, etc. +#[allow(dead_code)] +pub fn transform_pool_id(pool_id: Option<i32>) -> i64 { + match pool_id { + Some(id) => (id as i64) * ONE_MILLION, + None => 0, + } +} + +#[allow(dead_code)] +pub fn get_pool_id_from_node_id(node_id: i64) -> i32 { + (node_id / ONE_MILLION) as i32 +} + +/// Example pod name: page-server-0-1, safe-keeper-1-0 +#[allow(dead_code)] +pub fn get_node_id_from_pod_name(pod_name: &str) -> anyhow::Result<NodeId> { + let parts: Vec<&str> = pod_name.split('-').collect(); + if parts.len() != 4 { + return Err(anyhow::anyhow!("Invalid pod name: {}", pod_name)); + } + let pool_id = parts[2].parse::<i32>()?; + let node_offset = parts[3].parse::<i64>()?; + let node_id = transform_pool_id(Some(pool_id)) + node_offset; + + Ok(NodeId(node_id as u64)) +} + /// When updating [`Node::availability`] we use this type to indicate to the caller /// whether/how they changed it.
pub(crate) enum AvailabilityTransition { @@ -403,3 +436,25 @@ impl std::fmt::Debug for Node { write!(f, "{} ({})", self.id, self.listen_http_addr) } } + +#[cfg(test)] +mod tests { + use utils::id::NodeId; + + use crate::node::get_node_id_from_pod_name; + + #[test] + fn test_get_node_id_from_pod_name() { + let pod_name = "page-server-3-12"; + let node_id = get_node_id_from_pod_name(pod_name).unwrap(); + assert_eq!(node_id, NodeId(3000012)); + + let pod_name = "safe-keeper-1-0"; + let node_id = get_node_id_from_pod_name(pod_name).unwrap(); + assert_eq!(node_id, NodeId(1000000)); + + let pod_name = "invalid-pod-name"; + let result = get_node_id_from_pod_name(pod_name); + assert!(result.is_err()); + } +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index c61598cdf6..d6d64a2045 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -107,7 +107,6 @@ tracing-core = { version = "0.1" } tracing-log = { version = "0.2" } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } url = { version = "2", features = ["serde"] } -uuid = { version = "1", features = ["serde", "v4", "v7"] } zeroize = { version = "1", features = ["derive", "serde"] } zstd = { version = "0.13" } zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }