Mirror of https://github.com/neondatabase/neon.git
Add endpoint and storcon cli cmd to set sk scheduling policy (#10400)
Implementing the last missing endpoint of #9981, this adds support for setting the scheduling policy of an individual safekeeper, as specified in the RFC. Unlike the RFC, we call the endpoint `scheduling_policy` rather than `status`. Closes #9981.

As for why we don't use the upsert endpoint for this: the safekeeper upsert endpoint is meant for testing and for deploying new safekeepers, not for changes to the scheduling policy. When marking a safekeeper as decommissioned, for example, we don't want to change any of the other fields, so we would first have to fetch them only to specify them again. One could of course design an endpoint where omitted fields are left unmodified, but it is still not great for observability to fold everything into one big "change something about this safekeeper" endpoint.
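For reference, a minimal sketch of driving the new endpoint directly over HTTP (not part of this change): the URL path, the required admin scope, the JSON body shape, and the 204 No Content reply come from the handler and test changes below, while the controller address, the token, and the helper name are illustrative assumptions.

import requests

STORCON_URL = "http://127.0.0.1:1234"  # assumed storage controller address
ADMIN_TOKEN = "<admin-scoped JWT>"     # the handler checks Scope::Admin


def set_sk_scheduling_policy(sk_id: int, policy: str) -> None:
    # `policy` is an SkSchedulingPolicy variant name as a string,
    # e.g. "Active", "Disabled" or "Decomissioned" (spelling as in the enum).
    resp = requests.post(
        f"{STORCON_URL}/control/v1/safekeeper/{sk_id}/scheduling_policy",
        headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
        json={"scheduling_policy": policy},
    )
    resp.raise_for_status()  # the handler replies 204 No Content on success


set_sk_scheduling_policy(1, "Disabled")

The new `SafekeeperScheduling` subcommand in storcon_cli issues the same POST through `dispatch`, and the `safekeeper_scheduling_policy` test-fixture helper added below performs the equivalent request with the admin token scope.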
@@ -9,8 +9,9 @@ use clap::{Parser, Subcommand};
 use pageserver_api::{
     controller_api::{
         AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
-        SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
-        TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
+        SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
+        ShardsPreferredAzsRequest, SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse,
+        TenantPolicyRequest,
     },
     models::{
         EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -231,6 +232,13 @@ enum Command {
     },
     /// List safekeepers known to the storage controller
     Safekeepers {},
+    /// Set the scheduling policy of the specified safekeeper
+    SafekeeperScheduling {
+        #[arg(long)]
+        node_id: NodeId,
+        #[arg(long)]
+        scheduling_policy: SkSchedulingPolicyArg,
+    },
 }

 #[derive(Parser)]
@@ -283,6 +291,24 @@ impl FromStr for PlacementPolicyArg {
     }
 }

+#[derive(Debug, Clone)]
+struct SkSchedulingPolicyArg(SkSchedulingPolicy);
+
+impl FromStr for SkSchedulingPolicyArg {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "active" => Ok(Self(SkSchedulingPolicy::Active)),
+            "disabled" => Ok(Self(SkSchedulingPolicy::Disabled)),
+            "decomissioned" => Ok(Self(SkSchedulingPolicy::Decomissioned)),
+            _ => Err(anyhow::anyhow!(
+                "Unknown scheduling policy '{s}', try active,disabled,decomissioned"
+            )),
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

@@ -1202,6 +1228,23 @@ async fn main() -> anyhow::Result<()> {
             }
             println!("{table}");
         }
+        Command::SafekeeperScheduling {
+            node_id,
+            scheduling_policy,
+        } => {
+            let scheduling_policy = scheduling_policy.0;
+            storcon_client
+                .dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
+                    Method::POST,
+                    format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
+                    Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
+                )
+                .await?;
+            println!(
+                "Scheduling policy of {node_id} set to {}",
+                String::from(scheduling_policy)
+            );
+        }
     }

     Ok(())
@@ -416,8 +416,6 @@ pub struct MetadataHealthListOutdatedResponse {
 }

 /// Publicly exposed safekeeper description
-///
-/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
 #[derive(Serialize, Deserialize, Clone)]
 pub struct SafekeeperDescribeResponse {
     pub id: NodeId,
@@ -433,6 +431,11 @@ pub struct SafekeeperDescribeResponse {
     pub scheduling_policy: SkSchedulingPolicy,
 }

+#[derive(Serialize, Deserialize, Clone)]
+pub struct SafekeeperSchedulingPolicyRequest {
+    pub scheduling_policy: SkSchedulingPolicy,
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -15,7 +15,7 @@ use metrics::{BuildInfo, NeonMetrics};
 use pageserver_api::controller_api::{
     MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
     MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
-    ShardsPreferredAzsRequest, TenantCreateRequest,
+    SafekeeperSchedulingPolicyRequest, ShardsPreferredAzsRequest, TenantCreateRequest,
 };
 use pageserver_api::models::{
     TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
@@ -1305,6 +1305,35 @@ async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Bod
         .unwrap())
 }

+/// Sets the scheduling policy of the specified safekeeper
+async fn handle_safekeeper_scheduling_policy(
+    mut req: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
+    let id = parse_request_param::<i64>(&req, "id")?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+
+    state
+        .service
+        .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
+        .await?;
+
+    Ok(Response::builder()
+        .status(StatusCode::NO_CONTENT)
+        .body(Body::empty())
+        .unwrap())
+}
+
 /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
 /// be allowed to run if Service has finished its initial reconciliation.
 async fn tenant_service_handler<R, H>(
@@ -1873,7 +1902,18 @@ pub fn make_router(
         })
         .post("/control/v1/safekeeper/:id", |r| {
             // id is in the body
-            named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
+            named_request_span(
+                r,
+                handle_upsert_safekeeper,
+                RequestName("v1_safekeeper_post"),
+            )
         })
+        .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
+            named_request_span(
+                r,
+                handle_safekeeper_scheduling_policy,
+                RequestName("v1_safekeeper_status"),
+            )
+        })
         // Tenant Shard operations
         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
@@ -1104,6 +1104,37 @@ impl Persistence {
         })
         .await
     }
+
+    pub(crate) async fn set_safekeeper_scheduling_policy(
+        &self,
+        id_: i64,
+        scheduling_policy_: SkSchedulingPolicy,
+    ) -> Result<(), DatabaseError> {
+        use crate::schema::safekeepers::dsl::*;
+
+        self.with_conn(move |conn| -> DatabaseResult<()> {
+            #[derive(Insertable, AsChangeset)]
+            #[diesel(table_name = crate::schema::safekeepers)]
+            struct UpdateSkSchedulingPolicy<'a> {
+                id: i64,
+                scheduling_policy: &'a str,
+            }
+            let scheduling_policy_ = String::from(scheduling_policy_);
+
+            let rows_affected = diesel::update(safekeepers.filter(id.eq(id_)))
+                .set(scheduling_policy.eq(scheduling_policy_))
+                .execute(conn)?;
+
+            if rows_affected != 1 {
+                return Err(DatabaseError::Logical(format!(
+                    "unexpected number of rows ({rows_affected})",
+                )));
+            }
+
+            Ok(())
+        })
+        .await
+    }
 }

 /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -47,7 +47,7 @@ use pageserver_api::{
         AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
         NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
         SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
-        ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
+        ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
         TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
         TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
         TenantShardMigrateResponse,
@@ -7651,6 +7651,16 @@ impl Service {
         self.persistence.safekeeper_upsert(record).await
     }

+    pub(crate) async fn set_safekeeper_scheduling_policy(
+        &self,
+        id: i64,
+        scheduling_policy: SkSchedulingPolicy,
+    ) -> Result<(), DatabaseError> {
+        self.persistence
+            .set_safekeeper_scheduling_policy(id, scheduling_policy)
+            .await
+    }
+
     pub(crate) async fn update_shards_preferred_azs(
         &self,
         req: ShardsPreferredAzsRequest,
@@ -2336,6 +2336,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             json=body,
         )

+    def safekeeper_scheduling_policy(self, id: int, scheduling_policy: str):
+        self.request(
+            "POST",
+            f"{self.api}/control/v1/safekeeper/{id}/scheduling_policy",
+            headers=self.headers(TokenScope.ADMIN),
+            json={"id": id, "scheduling_policy": scheduling_policy},
+        )
+
     def get_safekeeper(self, id: int) -> dict[str, Any] | None:
         try:
             response = self.request(
@@ -4135,7 +4143,7 @@ class Endpoint(PgProtocol, LogUtils):

     # Checkpoints running endpoint and returns pg_wal size in MB.
     def get_pg_wal_size(self):
-        log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
+        log.info(f"checkpointing at LSN {self.safe_psql('select pg_current_wal_lsn()')[0][0]}")
         self.safe_psql("checkpoint")
         assert self.pgdata_dir is not None # please mypy
         return get_dir_size(self.pgdata_dir / "pg_wal") / 1024 / 1024
@@ -4975,7 +4983,7 @@ def logical_replication_sync(
         if res:
             log.info(f"subscriber_lsn={res}")
             subscriber_lsn = Lsn(res)
-            log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
+            log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}")
             if subscriber_lsn >= publisher_lsn:
                 return subscriber_lsn
         time.sleep(0.5)
@@ -3208,6 +3208,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):

     assert eq_safekeeper_records(body, inserted_now)

+    # some small tests for the scheduling policy querying and returning APIs
+    newest_info = target.get_safekeeper(inserted["id"])
+    assert newest_info
+    assert newest_info["scheduling_policy"] == "Disabled"
+    target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+    newest_info = target.get_safekeeper(inserted["id"])
+    assert newest_info
+    assert newest_info["scheduling_policy"] == "Decomissioned"
+    # Ensure idempotency
+    target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+

 def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
     compared = [dict(a), dict(b)]