diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 6ee1044c18..617b2cd1ba 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -1035,7 +1035,15 @@ async fn main() -> anyhow::Result<()> { resp.sort_by(|a, b| a.id.cmp(&b.id)); let mut table = comfy_table::Table::new(); - table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]); + table.set_header([ + "Id", + "Version", + "Host", + "Port", + "Http Port", + "AZ Id", + "Scheduling", + ]); for sk in resp { table.add_row([ format!("{}", sk.id), @@ -1043,7 +1051,8 @@ async fn main() -> anyhow::Result<()> { sk.host, format!("{}", sk.port), format!("{}", sk.http_port), - sk.availability_zone_id.to_string(), + sk.availability_zone_id.clone(), + String::from(sk.scheduling_policy), ]); } println!("{table}"); diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index faf11e487c..7eb3547183 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -320,6 +320,38 @@ impl From for String { } } +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)] +pub enum SkSchedulingPolicy { + Active, + Disabled, + Decomissioned, +} + +impl FromStr for SkSchedulingPolicy { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + Ok(match s { + "active" => Self::Active, + "disabled" => Self::Disabled, + "decomissioned" => Self::Decomissioned, + _ => return Err(anyhow::anyhow!("Unknown scheduling state '{s}'")), + }) + } +} + +impl From<SkSchedulingPolicy> for String { + fn from(value: SkSchedulingPolicy) -> String { + use SkSchedulingPolicy::*; + match value { + Active => "active", + Disabled => "disabled", + Decomissioned => "decomissioned", + } + .to_string() + } +} + /// Controls how tenant shards are mapped to locations on pageservers, e.g. whether /// to create secondary locations. 
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -387,6 +419,7 @@ pub struct SafekeeperDescribeResponse { pub port: i32, pub http_port: i32, pub availability_zone_id: String, + pub scheduling_policy: SkSchedulingPolicy, } #[cfg(test)] diff --git a/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/down.sql b/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/down.sql new file mode 100644 index 0000000000..e26bff798f --- /dev/null +++ b/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/down.sql @@ -0,0 +1 @@ +ALTER TABLE safekeepers DROP scheduling_policy; diff --git a/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/up.sql b/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/up.sql new file mode 100644 index 0000000000..d83cc6cc46 --- /dev/null +++ b/storage_controller/migrations/2024-12-12-212515_safekeepers_scheduling_policy/up.sql @@ -0,0 +1 @@ +ALTER TABLE safekeepers ADD scheduling_policy VARCHAR NOT NULL DEFAULT 'disabled'; diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 24fd4c341a..5385e4ee0b 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -3,7 +3,7 @@ use crate::metrics::{ HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup, METRICS_REGISTRY, }; -use crate::persistence::SafekeeperPersistence; +use crate::persistence::SafekeeperUpsert; use crate::reconciler::ReconcileError; use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT}; use anyhow::Context; @@ -1249,7 +1249,7 @@ async fn handle_get_safekeeper(req: Request) -> Result, Api async fn handle_upsert_safekeeper(mut req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Infra)?; - let body = json_request::<SafekeeperPersistence>(&mut req).await?; + let body = json_request::<SafekeeperUpsert>(&mut req).await?; let id = 
parse_request_param::<i64>(&req, "id")?; if id != body.id { diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index c5eb106f24..cebf3e9594 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -13,6 +13,7 @@ use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; use pageserver_api::controller_api::SafekeeperDescribeResponse; use pageserver_api::controller_api::ShardSchedulingPolicy; +use pageserver_api::controller_api::SkSchedulingPolicy; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; use pageserver_api::models::TenantConfig; use pageserver_api::shard::ShardConfigError; @@ -1075,12 +1076,14 @@ impl Persistence { pub(crate) async fn safekeeper_upsert( &self, - record: SafekeeperPersistence, + record: SafekeeperUpsert, ) -> Result<(), DatabaseError> { use crate::schema::safekeepers::dsl::*; self.with_conn(move |conn| -> DatabaseResult<()> { - let bind = record.as_insert_or_update(); + let bind = record + .as_insert_or_update() + .map_err(|e| DatabaseError::Logical(format!("{e}")))?; let inserted_updated = diesel::insert_into(safekeepers) .values(&bind) @@ -1243,6 +1246,7 @@ pub(crate) struct ControllerPersistence { pub(crate) started_at: chrono::DateTime, } +// What we store in the database #[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)] #[diesel(table_name = crate::schema::safekeepers)] pub(crate) struct SafekeeperPersistence { @@ -1257,11 +1261,51 @@ pub(crate) struct SafekeeperPersistence { pub(crate) active: bool, pub(crate) http_port: i32, pub(crate) availability_zone_id: String, + pub(crate) scheduling_policy: String, } impl SafekeeperPersistence { - fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> { - InsertUpdateSafekeeper { + pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> { + let scheduling_policy = 
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| { + DatabaseError::Logical(format!("can't construct SkSchedulingPolicy: {e:?}")) + })?; + // omit the `active` flag on purpose: it is deprecated. + Ok(SafekeeperDescribeResponse { + id: NodeId(self.id as u64), + region_id: self.region_id.clone(), + version: self.version, + host: self.host.clone(), + port: self.port, + http_port: self.http_port, + availability_zone_id: self.availability_zone_id.clone(), + scheduling_policy, + }) + } +} + +/// What we expect from the upsert http api +#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)] +pub(crate) struct SafekeeperUpsert { + pub(crate) id: i64, + pub(crate) region_id: String, + /// 1 is special, it means just created (not currently posted to storcon). + /// Zero or negative is not really expected. + /// Otherwise the number from `release-$(number_of_commits_on_branch)` tag. + pub(crate) version: i64, + pub(crate) host: String, + pub(crate) port: i32, + pub(crate) active: bool, + pub(crate) http_port: i32, + pub(crate) availability_zone_id: String, +} + +impl SafekeeperUpsert { + fn as_insert_or_update(&self) -> anyhow::Result<InsertUpdateSafekeeper<'_>> { + if self.version < 0 { + anyhow::bail!("negative version: {}", self.version); + } + Ok(InsertUpdateSafekeeper { id: self.id, region_id: &self.region_id, version: self.version, @@ -1270,19 +1314,9 @@ impl SafekeeperPersistence { active: self.active, http_port: self.http_port, availability_zone_id: &self.availability_zone_id, - } - } - pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse { - // omit the `active` flag on purpose: it is deprecated. - SafekeeperDescribeResponse { - id: NodeId(self.id as u64), - region_id: self.region_id.clone(), - version: self.version, - host: self.host.clone(), - port: self.port, - http_port: self.http_port, - availability_zone_id: self.availability_zone_id.clone(), - } + // None means a wish to not update this column. We expose abilities to update it via other means. 
+ scheduling_policy: None, + }) } } @@ -1297,4 +1331,5 @@ struct InsertUpdateSafekeeper<'a> { active: bool, http_port: i32, availability_zone_id: &'a str, + scheduling_policy: Option<&'a str>, } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 9e005ab932..44c91619ab 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -39,6 +39,7 @@ diesel::table! { active -> Bool, http_port -> Int4, availability_zone_id -> Text, + scheduling_policy -> Varchar, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index cf2d1ef97b..265b2798d2 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -7350,13 +7350,12 @@ impl Service { pub(crate) async fn safekeepers_list( &self, ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> { - Ok(self - .persistence + self.persistence .list_safekeepers() .await? .into_iter() .map(|v| v.as_describe_response()) - .collect::<Vec<_>>()) + .collect::<Result<Vec<_>, _>>() } pub(crate) async fn get_safekeeper( @@ -7366,12 +7365,12 @@ impl Service { self.persistence .safekeeper_get(id) .await - .map(|v| v.as_describe_response()) + .and_then(|v| v.as_describe_response()) } pub(crate) async fn upsert_safekeeper( &self, - record: crate::persistence::SafekeeperPersistence, + record: crate::persistence::SafekeeperUpsert, ) -> Result<(), DatabaseError> { self.persistence.safekeeper_upsert(record).await } diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 207f55a214..da6d5b8622 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3019,7 +3019,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder): def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool: compared = [dict(a), dict(b)] - masked_keys = ["created_at", "updated_at", "active"] + masked_keys = ["created_at", "updated_at", "active", 
"scheduling_policy"] for d in compared: # keep deleting these in case we are comparing the body as it will be uploaded by real scripts