From 8f627ea0abbe8079052061bd25f9cec321a775bd Mon Sep 17 00:00:00 2001
From: HaoyuHuang
Date: Thu, 17 Jul 2025 16:17:01 -0700
Subject: [PATCH] A few more SC changes (#12649)

## Problem

## Summary of changes

- Add a `hadron_safekeepers` / `hadron_timeline_safekeepers` migration and register the new tables in `schema.rs`.
- Add a `hadron_utils` module with a random password generator and a `TenantShardSizeMap` wrapper type.
- Add a `reset_alert_gauges` pageserver management API client method and expose it, together with `tenant_timeline_compact` and `list_tenant_visible_size`, through the storage controller's `PageserverClient` (the new wrappers have no callers yet).
- Prefix the reconciler's observed-configuration log lines with `[Attached]`/`[Secondary]`, and log when a compute notification is skipped because the shard has no attached location.
- Report reconciles that succeed without notifying compute as `SuccessNoNotify`.
---
 pageserver/client/src/mgmt_api.rs           | 16 +++++++
 .../down.sql                                |  2 +
 .../up.sql                                  | 17 +++++++
 storage_controller/src/hadron_utils.rs      | 44 +++++++++++++++++
 storage_controller/src/lib.rs               |  1 +
 storage_controller/src/pageserver_client.rs | 48 +++++++++++++++++++
 storage_controller/src/reconciler.rs        | 13 +++--
 storage_controller/src/schema.rs            | 20 ++++++++
 storage_controller/src/tenant_shard.rs      |  8 +++-
 9 files changed, 163 insertions(+), 6 deletions(-)
 create mode 100644 storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/down.sql
 create mode 100644 storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/up.sql
 create mode 100644 storage_controller/src/hadron_utils.rs

diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index fe1ddc2e7d..3867e536f4 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -873,6 +873,22 @@ impl Client {
         .map_err(Error::ReceiveBody)
     }
 
+    pub async fn reset_alert_gauges(&self) -> Result<()> {
+        let uri = format!(
+            "{}/hadron-internal/reset_alert_gauges",
+            self.mgmt_api_endpoint
+        );
+        self.start_request(Method::POST, uri)
+            .send()
+            .await
+            .map_err(Error::SendRequest)?
+            .error_from_body()
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
+
     pub async fn wait_lsn(
         &self,
         tenant_shard_id: TenantShardId,
diff --git a/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/down.sql b/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/down.sql
new file mode 100644
index 0000000000..b45b45e438
--- /dev/null
+++ b/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/down.sql
@@ -0,0 +1,2 @@
+DROP TABLE hadron_safekeepers;
+DROP TABLE hadron_timeline_safekeepers;
diff --git a/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/up.sql b/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/up.sql
new file mode 100644
index 0000000000..6cee981efc
--- /dev/null
+++ b/storage_controller/migrations/2025-07-17-000001_hadron_safekeepers/up.sql
@@ -0,0 +1,17 @@
+-- hadron_safekeepers keep track of all Safe Keeper nodes that exist in the system.
+-- Upon startup, each Safe Keeper reaches out to the hadron cluster coordinator to register its node ID and listen addresses.
+
+CREATE TABLE hadron_safekeepers (
+    sk_node_id BIGINT PRIMARY KEY NOT NULL,
+    listen_http_addr VARCHAR NOT NULL,
+    listen_http_port INTEGER NOT NULL,
+    listen_pg_addr VARCHAR NOT NULL,
+    listen_pg_port INTEGER NOT NULL
+);
+
+CREATE TABLE hadron_timeline_safekeepers (
+    timeline_id VARCHAR NOT NULL,
+    sk_node_id BIGINT NOT NULL,
+    legacy_endpoint_id UUID DEFAULT NULL,
+    PRIMARY KEY(timeline_id, sk_node_id)
+);
diff --git a/storage_controller/src/hadron_utils.rs b/storage_controller/src/hadron_utils.rs
new file mode 100644
index 0000000000..871e21c367
--- /dev/null
+++ b/storage_controller/src/hadron_utils.rs
@@ -0,0 +1,44 @@
+use std::collections::BTreeMap;
+
+use rand::Rng;
+use utils::shard::TenantShardId;
+
+static CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
+
+/// Generate a random string of `length` that can be used as a password. The generated string
+/// contains alphanumeric characters and special characters (!@#$%^&*())
+pub fn generate_random_password(length: usize) -> String {
+    let mut rng = rand::thread_rng();
+    (0..length)
+        .map(|_| {
+            let idx = rng.gen_range(0..CHARSET.len());
+            CHARSET[idx] as char
+        })
+        .collect()
+}
+
+pub(crate) struct TenantShardSizeMap {
+    #[expect(dead_code)]
+    pub map: BTreeMap<TenantShardId, u64>,
+}
+
+impl TenantShardSizeMap {
+    pub fn new(map: BTreeMap<TenantShardId, u64>) -> Self {
+        Self { map }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_generate_random_password() {
+        let pwd1 = generate_random_password(10);
+        assert_eq!(pwd1.len(), 10);
+        let pwd2 = generate_random_password(10);
+        assert_ne!(pwd1, pwd2);
+        assert!(pwd1.chars().all(|c| CHARSET.contains(&(c as u8))));
+        assert!(pwd2.chars().all(|c| CHARSET.contains(&(c as u8))));
+    }
+}
diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs
index 36e3c5dc6c..24b06da83a 100644
--- a/storage_controller/src/lib.rs
+++ b/storage_controller/src/lib.rs
@@ -6,6 +6,7 @@ extern crate hyper0 as hyper;
 mod auth;
 mod background_node_operations;
 mod compute_hook;
+pub mod hadron_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs
index da0687895a..9e829e252d 100644
--- a/storage_controller/src/pageserver_client.rs
+++ b/storage_controller/src/pageserver_client.rs
@@ -14,6 +14,8 @@ use reqwest::StatusCode;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
 
+use crate::hadron_utils::TenantShardSizeMap;
+
 /// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
 /// controller to collect metrics in a non-intrusive manner.
 #[derive(Debug, Clone)]
@@ -86,6 +88,31 @@ impl PageserverClient {
         )
     }
 
+    #[expect(dead_code)]
+    pub(crate) async fn tenant_timeline_compact(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        force_image_layer_creation: bool,
+        wait_until_done: bool,
+    ) -> Result<()> {
+        measured_request!(
+            "tenant_timeline_compact",
+            crate::metrics::Method::Put,
+            &self.node_id_label,
+            self.inner
+                .tenant_timeline_compact(
+                    tenant_shard_id,
+                    timeline_id,
+                    force_image_layer_creation,
+                    true,
+                    false,
+                    wait_until_done,
+                )
+                .await
+        )
+    }
+
     /* BEGIN_HADRON */
     pub(crate) async fn tenant_timeline_describe(
         &self,
@@ -101,6 +128,17 @@ impl PageserverClient {
             .await
         )
     }
+
+    #[expect(dead_code)]
+    pub(crate) async fn list_tenant_visible_size(&self) -> Result<TenantShardSizeMap> {
+        measured_request!(
+            "list_tenant_visible_size",
+            crate::metrics::Method::Get,
+            &self.node_id_label,
+            self.inner.list_tenant_visible_size().await
+        )
+        .map(TenantShardSizeMap::new)
+    }
     /* END_HADRON */
 
     pub(crate) async fn tenant_scan_remote_storage(
@@ -365,6 +403,16 @@ impl PageserverClient {
         )
     }
 
+    #[expect(dead_code)]
+    pub(crate) async fn reset_alert_gauges(&self) -> Result<()> {
+        measured_request!(
+            "reset_alert_gauges",
+            crate::metrics::Method::Post,
+            &self.node_id_label,
+            self.inner.reset_alert_gauges().await
+        )
+    }
+
     pub(crate) async fn wait_lsn(
         &self,
         tenant_shard_id: TenantShardId,
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index a2fba0fa56..d1590ec75e 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -862,11 +862,11 @@ impl Reconciler {
             Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                 if refreshed {
                     tracing::info!(
-                        node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
+                        node_id=%node.get_id(), "[Attached] Observed configuration correct after refresh. Notifying compute.");
                     self.compute_notify().await?;
                 } else {
                     // Nothing to do
-                    tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
+                    tracing::info!(node_id=%node.get_id(), "[Attached] Observed configuration already correct.");
                 }
             }
             observed => {
@@ -945,17 +945,17 @@ impl Reconciler {
         match self.observed.locations.get(&node.get_id()) {
             Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                 // Nothing to do
-                tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
+                tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration already correct.")
             }
             _ => {
                 // Only try and configure secondary locations on nodes that are available. This
                 // allows the reconciler to "succeed" while some secondaries are offline (e.g. after
                 // a node failure, where the failed node will have a secondary intent)
                 if node.is_available() {
-                    tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
+                    tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration requires update.");
                     changes.push((node.clone(), wanted_conf))
                 } else {
-                    tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
+                    tracing::info!(node_id=%node.get_id(), "[Secondary] Skipping configuration as secondary, node is unavailable");
                     self.observed
                         .locations
                         .insert(node.get_id(), ObservedStateLocation { conf: None });
@@ -1066,6 +1066,9 @@ impl Reconciler {
             }
             result
         } else {
+            tracing::info!(
+                "Compute notification is skipped because the tenant shard does not have an attached (primary) location"
+            );
             Ok(())
         }
     }
diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs
index 312f7e0b0e..f3dcdaf798 100644
--- a/storage_controller/src/schema.rs
+++ b/storage_controller/src/schema.rs
@@ -13,6 +13,24 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    hadron_safekeepers (sk_node_id) {
+        sk_node_id -> Int8,
+        listen_http_addr -> Varchar,
+        listen_http_port -> Int4,
+        listen_pg_addr -> Varchar,
+        listen_pg_port -> Int4,
+    }
+}
+
+diesel::table! {
+    hadron_timeline_safekeepers (timeline_id, sk_node_id) {
+        timeline_id -> Varchar,
+        sk_node_id -> Int8,
+        legacy_endpoint_id -> Nullable<Uuid>,
+    }
+}
+
 diesel::table! {
     metadata_health (tenant_id, shard_number, shard_count) {
         tenant_id -> Varchar,
@@ -105,6 +123,8 @@ diesel::table! {
 
 diesel::allow_tables_to_appear_in_same_query!(
     controllers,
+    hadron_safekeepers,
+    hadron_timeline_safekeepers,
     metadata_health,
     nodes,
     safekeeper_timeline_pending_ops,
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 05de155963..f60378470e 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -1611,7 +1611,13 @@ impl TenantShard {
 
         // Update result counter
         let outcome_label = match &result {
-            Ok(_) => ReconcileOutcome::Success,
+            Ok(_) => {
+                if reconciler.compute_notify_failure {
+                    ReconcileOutcome::SuccessNoNotify
+                } else {
+                    ReconcileOutcome::Success
+                }
+            }
             Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
             Err(_) => ReconcileOutcome::Error,
         };
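
The new `PageserverClient` wrappers (`tenant_timeline_compact`, `list_tenant_visible_size`, `reset_alert_gauges`) are all marked `#[expect(dead_code)]`, so this patch adds no caller for them. The sketch below is only an illustration of how a future storage controller task might drive two of them; the `inspect_and_reset` helper, its `anyhow` error handling, and the assumption that the wrapped map goes from tenant shard ID to a size in bytes are not part of this change.

```rust
// Hypothetical usage sketch (not in this patch): drive the new PageserverClient
// wrappers from some future storage controller task.
use crate::pageserver_client::PageserverClient;

async fn inspect_and_reset(client: &PageserverClient) -> anyhow::Result<()> {
    // list_tenant_visible_size() wraps the per-shard size map returned by the
    // pageserver in TenantShardSizeMap (assumed: TenantShardId -> size in bytes).
    let sizes = client.list_tenant_visible_size().await?;
    for (tenant_shard_id, size_bytes) in &sizes.map {
        tracing::info!(%tenant_shard_id, size_bytes = *size_bytes, "tenant shard visible size");
    }

    // Issues a POST to the pageserver's /hadron-internal/reset_alert_gauges endpoint.
    client.reset_alert_gauges().await?;
    Ok(())
}
```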