Add scheduling_policy column to safekeepers table (#10205)

Add a `scheduling_policy` column to the safekeepers table of the storage
controller.

Part of #9981
This commit is contained in:
Arpad Müller
2025-01-09 16:55:02 +01:00
committed by GitHub
parent ad51622568
commit bebc46e713
9 changed files with 106 additions and 27 deletions

View File

@@ -1035,7 +1035,15 @@ async fn main() -> anyhow::Result<()> {
resp.sort_by(|a, b| a.id.cmp(&b.id));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
table.set_header([
"Id",
"Version",
"Host",
"Port",
"Http Port",
"AZ Id",
"Scheduling",
]);
for sk in resp {
table.add_row([
format!("{}", sk.id),
@@ -1043,7 +1051,8 @@ async fn main() -> anyhow::Result<()> {
sk.host,
format!("{}", sk.port),
format!("{}", sk.http_port),
sk.availability_zone_id.to_string(),
sk.availability_zone_id.clone(),
String::from(sk.scheduling_policy),
]);
}
println!("{table}");

View File

@@ -320,6 +320,38 @@ impl From<NodeSchedulingPolicy> for String {
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum SkSchedulingPolicy {
Active,
Disabled,
Decomissioned,
}
impl FromStr for SkSchedulingPolicy {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"active" => Self::Active,
"disabled" => Self::Disabled,
"decomissioned" => Self::Decomissioned,
_ => return Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
})
}
}
impl From<SkSchedulingPolicy> for String {
fn from(value: SkSchedulingPolicy) -> String {
use SkSchedulingPolicy::*;
match value {
Active => "active",
Disabled => "disabled",
Decomissioned => "decomissioned",
}
.to_string()
}
}
/// Controls how tenant shards are mapped to locations on pageservers, e.g. whether
/// to create secondary locations.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
@@ -387,6 +419,7 @@ pub struct SafekeeperDescribeResponse {
pub port: i32,
pub http_port: i32,
pub availability_zone_id: String,
pub scheduling_policy: SkSchedulingPolicy,
}
#[cfg(test)]

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers DROP scheduling_policy;

View File

@@ -0,0 +1 @@
ALTER TABLE safekeepers ADD scheduling_policy VARCHAR NOT NULL DEFAULT 'disabled';

View File

@@ -3,7 +3,7 @@ use crate::metrics::{
HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
METRICS_REGISTRY,
};
use crate::persistence::SafekeeperPersistence;
use crate::persistence::SafekeeperUpsert;
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
@@ -1249,7 +1249,7 @@ async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, Api
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Infra)?;
let body = json_request::<SafekeeperPersistence>(&mut req).await?;
let body = json_request::<SafekeeperUpsert>(&mut req).await?;
let id = parse_request_param::<i64>(&req, "id")?;
if id != body.id {

View File

@@ -13,6 +13,7 @@ use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::SkSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
@@ -1075,12 +1076,14 @@ impl Persistence {
pub(crate) async fn safekeeper_upsert(
&self,
record: SafekeeperPersistence,
record: SafekeeperUpsert,
) -> Result<(), DatabaseError> {
use crate::schema::safekeepers::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
let bind = record.as_insert_or_update();
let bind = record
.as_insert_or_update()
.map_err(|e| DatabaseError::Logical(format!("{e}")))?;
let inserted_updated = diesel::insert_into(safekeepers)
.values(&bind)
@@ -1243,6 +1246,7 @@ pub(crate) struct ControllerPersistence {
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}
// What we store in the database
#[derive(Serialize, Deserialize, Queryable, Selectable, Eq, PartialEq, Debug, Clone)]
#[diesel(table_name = crate::schema::safekeepers)]
pub(crate) struct SafekeeperPersistence {
@@ -1257,11 +1261,51 @@ pub(crate) struct SafekeeperPersistence {
pub(crate) active: bool,
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
pub(crate) scheduling_policy: String,
}
impl SafekeeperPersistence {
fn as_insert_or_update(&self) -> InsertUpdateSafekeeper<'_> {
InsertUpdateSafekeeper {
pub(crate) fn as_describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let scheduling_policy =
SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
DatabaseError::Logical(format!("can't construct SkSchedulingPolicy: {e:?}"))
})?;
// omit the `active` flag on purpose: it is deprecated.
Ok(SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
version: self.version,
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
availability_zone_id: self.availability_zone_id.clone(),
scheduling_policy,
})
}
}
/// What we expect from the upsert http api
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)]
pub(crate) struct SafekeeperUpsert {
pub(crate) id: i64,
pub(crate) region_id: String,
/// 1 is special, it means just created (not currently posted to storcon).
/// Zero or negative is not really expected.
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
pub(crate) version: i64,
pub(crate) host: String,
pub(crate) port: i32,
pub(crate) active: bool,
pub(crate) http_port: i32,
pub(crate) availability_zone_id: String,
}
impl SafekeeperUpsert {
fn as_insert_or_update(&self) -> anyhow::Result<InsertUpdateSafekeeper<'_>> {
if self.version < 0 {
anyhow::bail!("negative version: {}", self.version);
}
Ok(InsertUpdateSafekeeper {
id: self.id,
region_id: &self.region_id,
version: self.version,
@@ -1270,19 +1314,9 @@ impl SafekeeperPersistence {
active: self.active,
http_port: self.http_port,
availability_zone_id: &self.availability_zone_id,
}
}
pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
// omit the `active` flag on purpose: it is deprecated.
SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
version: self.version,
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
availability_zone_id: self.availability_zone_id.clone(),
}
// None means a wish to not update this column. We expose abilities to update it via other means.
scheduling_policy: None,
})
}
}
@@ -1297,4 +1331,5 @@ struct InsertUpdateSafekeeper<'a> {
active: bool,
http_port: i32,
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}

View File

@@ -39,6 +39,7 @@ diesel::table! {
active -> Bool,
http_port -> Int4,
availability_zone_id -> Text,
scheduling_policy -> Varchar,
}
}

View File

@@ -7350,13 +7350,12 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
Ok(self
.persistence
self.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Vec<_>>())
.collect::<Result<Vec<_>, _>>()
}
pub(crate) async fn get_safekeeper(
@@ -7366,12 +7365,12 @@ impl Service {
self.persistence
.safekeeper_get(id)
.await
.map(|v| v.as_describe_response())
.and_then(|v| v.as_describe_response())
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperPersistence,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), DatabaseError> {
self.persistence.safekeeper_upsert(record).await
}

View File

@@ -3019,7 +3019,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]
masked_keys = ["created_at", "updated_at", "active"]
masked_keys = ["created_at", "updated_at", "active", "scheduling_policy"]
for d in compared:
# keep deleting these in case we are comparing the body as it will be uploaded by real scripts