storage controller: tenant scheduling policy (#7262)

## Problem

In the event of bugs in scheduling or reconciliation, we need to be
able to switch those behaviours off at per-tenant granularity.

This is intended to mitigate the risk of issues with
https://github.com/neondatabase/neon/pull/7181, which makes scheduling
more involved.

Closes: #7103

## Summary of changes

- Introduce a per-tenant scheduling policy, with an API to set it (see the example request below)
- Refactor the persistence.rs helpers for updating tenants to be more general
- Add tests
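
As a rough illustration of the new per-tenant policy API (the endpoint path, request body shape, and admin-scope requirement come from the diff below; the controller address, auth header format, and tenant id here are placeholders):

```python
import requests

# Placeholder values: substitute a real storage controller address,
# an admin-scope token, and a real tenant id.
CONTROLLER = "http://127.0.0.1:1234"
ADMIN_TOKEN = "<admin-scope-jwt>"
tenant_id = "0123456789abcdef0123456789abcdef"

# PUT /control/v1/tenant/:tenant_id/policy takes a TenantPolicyRequest.
# Both fields are optional, so scheduling and placement can be updated
# independently; here we only pause scheduling for this tenant.
resp = requests.put(
    f"{CONTROLLER}/control/v1/tenant/{tenant_id}/policy",
    json={"scheduling": "Pause"},
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},  # assumed bearer-token auth
)
resp.raise_for_status()
```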
Author: John Spray
Committed: 2024-03-28 14:19:25 +00:00 (committed by GitHub)
Commit: 6633332e67 (parent: 5928f6709c)
10 changed files with 448 additions and 79 deletions

Migration: down.sql

@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+ALTER TABLE tenant_shards drop scheduling_policy;

Migration: up.sql

@@ -0,0 +1,2 @@
+ALTER TABLE tenant_shards add scheduling_policy VARCHAR NOT NULL DEFAULT '"Active"';
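
The nested quotes in that default are intentional: the column stores the serde_json encoding of `ShardSchedulingPolicy`, and a unit variant such as `Active` encodes to the JSON string `"Active"`, quotes included. A quick illustration using only the Python standard library (not project code):

```python
import json

# The scheduling_policy column holds a JSON document rather than a bare keyword.
# For a unit enum variant, that document is just a quoted string, which is why
# the SQL default above is written as '"Active"'.
assert json.dumps("Active") == '"Active"'
```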

Storage controller HTTP API (http.rs)

@@ -34,7 +34,8 @@ use utils::{
 };
 use pageserver_api::controller_api::{
-    NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
+    NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
+    TenantShardMigrateRequest,
 };
 use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -478,6 +479,22 @@ async fn handle_tenant_shard_migrate(
     )
 }

+async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
+    let state = get_state(&req);
+
+    json_response(
+        StatusCode::OK,
+        state
+            .service
+            .tenant_update_policy(tenant_id, update_req)
+            .await?,
+    )
+}
+
 async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     check_permissions(&req, Scope::PageServerApi)?;
@@ -509,6 +526,14 @@ async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>,
     json_response(StatusCode::OK, state.service.consistency_check().await?)
 }

+async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let state = get_state(&req);
+    json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
+}
+
 /// Status endpoint is just used for checking that our HTTP listener is up
 async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, ())
@@ -726,6 +751,9 @@ pub fn make_router(
                     RequestName("debug_v1_consistency_check"),
                 )
             })
+            .post("/debug/v1/reconcile_all", |r| {
+                request_span(r, handle_reconcile_all)
+            })
             .put("/debug/v1/failpoints", |r| {
                 request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
             })
@@ -765,6 +793,13 @@ pub fn make_router(
                     RequestName("control_v1_tenant_describe"),
                 )
             })
+            .put("/control/v1/tenant/:tenant_id/policy", |r| {
+                named_request_span(
+                    r,
+                    handle_tenant_update_policy,
+                    RequestName("control_v1_tenant_policy"),
+                )
+            })
             // Tenant operations
             // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
             // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.

Storage controller persistence layer (persistence.rs)

@@ -9,6 +9,7 @@ use camino::Utf8PathBuf;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::Connection;
+use pageserver_api::controller_api::ShardSchedulingPolicy;
 use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
 use pageserver_api::models::TenantConfig;
 use pageserver_api::shard::ShardConfigError;
@@ -107,6 +108,12 @@ pub(crate) enum AbortShardSplitStatus {
 pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

+/// Some methods can operate on either a whole tenant or a single shard
+pub(crate) enum TenantFilter {
+    Tenant(TenantId),
+    Shard(TenantShardId),
+}
+
 impl Persistence {
     // The default postgres connection limit is 100.  We use up to 99, to leave one free for a human admin under
     // normal circumstances.  This assumes we have exclusive use of the database cluster to which we connect.
@@ -140,7 +147,7 @@ impl Persistence {
     /// Wraps `with_conn` in order to collect latency and error metrics
     async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
     where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
         R: Send + 'static,
     {
         let latency = &METRICS_REGISTRY
@@ -168,7 +175,7 @@ impl Persistence {
     /// Call the provided function in a tokio blocking thread, with a Diesel database connection.
     async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
     where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
         R: Send + 'static,
     {
         let mut conn = self.connection_pool.get()?;
@@ -275,6 +282,11 @@ impl Persistence {
                 // Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
                 shard.placement_policy = "{\"Attached\":0}".to_string();
             }
+
+            if shard.scheduling_policy.is_empty() {
+                shard.scheduling_policy =
+                    serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
+            }
         }

         let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
@@ -465,59 +477,45 @@ impl Persistence {
     /// that we only do the first time a tenant is set to an attached policy via /location_config.
     pub(crate) async fn update_tenant_shard(
         &self,
-        tenant_shard_id: TenantShardId,
-        input_placement_policy: PlacementPolicy,
-        input_config: TenantConfig,
+        tenant: TenantFilter,
+        input_placement_policy: Option<PlacementPolicy>,
+        input_config: Option<TenantConfig>,
         input_generation: Option<Generation>,
+        input_scheduling_policy: Option<ShardSchedulingPolicy>,
     ) -> DatabaseResult<()> {
         use crate::schema::tenant_shards::dsl::*;
         self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
-            let query = diesel::update(tenant_shards)
-                .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
-                .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
-                .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32));
-
-            if let Some(input_generation) = input_generation {
-                // Update includes generation column
-                query
-                    .set((
-                        generation.eq(Some(input_generation.into().unwrap() as i32)),
-                        placement_policy
-                            .eq(serde_json::to_string(&input_placement_policy).unwrap()),
-                        config.eq(serde_json::to_string(&input_config).unwrap()),
-                    ))
-                    .execute(conn)?;
-            } else {
-                // Update does not include generation column
-                query
-                    .set((
-                        placement_policy
-                            .eq(serde_json::to_string(&input_placement_policy).unwrap()),
-                        config.eq(serde_json::to_string(&input_config).unwrap()),
-                    ))
-                    .execute(conn)?;
-            }
-
-            Ok(())
-        })
-        .await?;
-
-        Ok(())
-    }
-
-    pub(crate) async fn update_tenant_config(
-        &self,
-        input_tenant_id: TenantId,
-        input_config: TenantConfig,
-    ) -> DatabaseResult<()> {
-        use crate::schema::tenant_shards::dsl::*;
-
-        self.with_measured_conn(DatabaseOperation::UpdateTenantConfig, move |conn| {
-            diesel::update(tenant_shards)
-                .filter(tenant_id.eq(input_tenant_id.to_string()))
-                .set((config.eq(serde_json::to_string(&input_config).unwrap()),))
-                .execute(conn)?;
+            let query = match tenant {
+                TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
+                    .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+                    .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+                    .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+                    .into_boxed(),
+                TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
+                    .filter(tenant_id.eq(input_tenant_id.to_string()))
+                    .into_boxed(),
+            };
+
+            #[derive(AsChangeset)]
+            #[diesel(table_name = crate::schema::tenant_shards)]
+            struct ShardUpdate {
+                generation: Option<i32>,
+                placement_policy: Option<String>,
+                config: Option<String>,
+                scheduling_policy: Option<String>,
+            }
+
+            let update = ShardUpdate {
+                generation: input_generation.map(|g| g.into().unwrap() as i32),
+                placement_policy: input_placement_policy
+                    .map(|p| serde_json::to_string(&p).unwrap()),
+                config: input_config.map(|c| serde_json::to_string(&c).unwrap()),
+                scheduling_policy: input_scheduling_policy
+                    .map(|p| serde_json::to_string(&p).unwrap()),
+            };
+
+            query.set(update).execute(conn)?;

             Ok(())
         })
@@ -728,6 +726,8 @@ pub(crate) struct TenantShardPersistence {
     pub(crate) splitting: SplitState,
     #[serde(default)]
     pub(crate) config: String,
+    #[serde(default)]
+    pub(crate) scheduling_policy: String,
 }

 impl TenantShardPersistence {

Storage controller Diesel schema (schema.rs)

@@ -22,6 +22,7 @@ diesel::table! {
         placement_policy -> Varchar,
         splitting -> Int2,
         config -> Text,
+        scheduling_policy -> Varchar,
     }
 }

Storage controller service (service.rs)

@@ -8,7 +8,9 @@ use std::{
 };

 use crate::{
-    id_lock_map::IdLockMap, persistence::AbortShardSplitStatus, reconciler::ReconcileError,
+    id_lock_map::IdLockMap,
+    persistence::{AbortShardSplitStatus, TenantFilter},
+    reconciler::ReconcileError,
 };
 use anyhow::Context;
 use control_plane::storage_controller::{
@@ -20,9 +22,10 @@ use hyper::StatusCode;
 use pageserver_api::{
     controller_api::{
         NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
-        TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
-        TenantDescribeResponseShard, TenantLocateResponse, TenantShardMigrateRequest,
-        TenantShardMigrateResponse, UtilizationScore,
+        ShardSchedulingPolicy, TenantCreateResponse, TenantCreateResponseShard,
+        TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
+        TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
+        UtilizationScore,
     },
     models::{SecondaryProgress, TenantConfigRequest},
 };
@@ -51,7 +54,6 @@ use utils::{
     generation::Generation,
     http::error::ApiError,
     id::{NodeId, TenantId, TimelineId},
-    seqwait::SeqWait,
     sync::gate::Gate,
 };
@@ -66,7 +68,6 @@ use crate::{
         IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
         ReconcilerWaiter, TenantState,
     },
-    Sequence,
 };

 // For operations that should be quick, like attaching a new tenant
@@ -957,30 +958,14 @@
         }

         for tsp in tenant_shard_persistence {
             let tenant_shard_id = tsp.get_tenant_shard_id()?;
-            let shard_identity = tsp.get_shard_identity()?;

             // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
             // it with what we can infer: the node for which a generation was most recently issued.
             let mut intent = IntentState::new();
             if let Some(generation_pageserver) = tsp.generation_pageserver {
                 intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
             }
-
-            let new_tenant = TenantState {
-                tenant_shard_id,
-                shard: shard_identity,
-                sequence: Sequence::initial(),
-                generation: tsp.generation.map(|g| Generation::new(g as u32)),
-                policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
-                intent,
-                observed: ObservedState::new(),
-                config: serde_json::from_str(&tsp.config).unwrap(),
-                reconciler: None,
-                splitting: tsp.splitting,
-                waiter: Arc::new(SeqWait::new(Sequence::initial())),
-                error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
-                last_error: Arc::default(),
-                pending_compute_notification: false,
-            };
+            let new_tenant = TenantState::from_persistent(tsp, intent)?;

             tenants.insert(tenant_shard_id, new_tenant);
         }
@@ -1104,6 +1089,8 @@
             placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
             config: serde_json::to_string(&TenantConfig::default()).unwrap(),
             splitting: SplitState::default(),
+            scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+                .unwrap(),
         };

         match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -1156,9 +1143,10 @@
             // when we reattaching a detached tenant.
             self.persistence
                 .update_tenant_shard(
-                    attach_req.tenant_shard_id,
-                    PlacementPolicy::Attached(0),
-                    conf,
+                    TenantFilter::Shard(attach_req.tenant_shard_id),
+                    Some(PlacementPolicy::Attached(0)),
+                    Some(conf),
+                    None,
                     None,
                 )
                 .await?;
@@ -1615,6 +1603,8 @@
                 placement_policy: serde_json::to_string(&placement_policy).unwrap(),
                 config: serde_json::to_string(&create_req.config).unwrap(),
                 splitting: SplitState::default(),
+                scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+                    .unwrap(),
             })
             .collect();
@@ -1907,10 +1897,11 @@
         {
             self.persistence
                 .update_tenant_shard(
-                    *tenant_shard_id,
-                    placement_policy.clone(),
-                    tenant_config.clone(),
+                    TenantFilter::Shard(*tenant_shard_id),
+                    Some(placement_policy.clone()),
+                    Some(tenant_config.clone()),
                     *generation,
+                    None,
                 )
                 .await?;
         }
@@ -1988,7 +1979,13 @@
         let config = req.config;

         self.persistence
-            .update_tenant_config(req.tenant_id, config.clone())
+            .update_tenant_shard(
+                TenantFilter::Tenant(req.tenant_id),
+                None,
+                Some(config.clone()),
+                None,
+                None,
+            )
             .await?;

         let waiters = {
@@ -2341,6 +2338,57 @@
         Ok(StatusCode::NOT_FOUND)
     }

+    /// Naming: this configures the storage controller's policies for a tenant, whereas [`Self::tenant_config_set`] is "set the TenantConfig"
+    /// for a tenant.  The TenantConfig is passed through to pageservers, whereas this function modifies
+    /// the tenant's policies (configuration) within the storage controller
+    pub(crate) async fn tenant_update_policy(
+        &self,
+        tenant_id: TenantId,
+        req: TenantPolicyRequest,
+    ) -> Result<(), ApiError> {
+        // We require an exclusive lock, because we are updating persistent and in-memory state
+        let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
+
+        let TenantPolicyRequest {
+            placement,
+            scheduling,
+        } = req;
+
+        self.persistence
+            .update_tenant_shard(
+                TenantFilter::Tenant(tenant_id),
+                placement.clone(),
+                None,
+                None,
+                scheduling,
+            )
+            .await?;
+
+        let mut locked = self.inner.write().unwrap();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
+
+        for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+            if let Some(placement) = &placement {
+                shard.policy = placement.clone();
+
+                tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+                    "Updated placement policy to {placement:?}");
+            }
+
+            if let Some(scheduling) = &scheduling {
+                shard.set_scheduling_policy(*scheduling);
+
+                tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+                    "Updated scheduling policy to {scheduling:?}");
+            }
+
+            // In case scheduling is being switched back on, try it now.
+            shard.schedule(scheduler).ok();
+
+            self.maybe_reconcile_shard(shard, nodes);
+        }
+
+        Ok(())
+    }
+
     pub(crate) async fn tenant_timeline_create(
         &self,
         tenant_id: TenantId,
@@ -3250,6 +3298,10 @@
                 placement_policy: serde_json::to_string(&policy).unwrap(),
                 config: serde_json::to_string(&config).unwrap(),
                 splitting: SplitState::Splitting,
+                // Scheduling policies do not carry through to children
+                scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+                    .unwrap(),
             });
         }
@@ -3970,6 +4022,28 @@
         reconciles_spawned
     }

+    /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
+    /// also wait for any generated Reconcilers to complete.  Calling this until it returns zero should
+    /// put the system into a quiescent state where future background reconciliations won't do anything.
+    pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
+        self.reconcile_all();
+
+        let waiters = {
+            let mut waiters = Vec::new();
+            let locked = self.inner.read().unwrap();
+            for (_tenant_shard_id, shard) in locked.tenants.iter() {
+                if let Some(waiter) = shard.get_waiter() {
+                    waiters.push(waiter);
+                }
+            }
+            waiters
+        };
+
+        let waiter_count = waiters.len();
+        self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
+
+        Ok(waiter_count)
+    }
+
     pub async fn shutdown(&self) {
         // Note that this already stops processing any results from reconciles: so
         // we do not expect that our [`TenantState`] objects will reach a neat

Storage controller tenant shard state (tenant_state.rs)

@@ -8,7 +8,7 @@ use crate::{
     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
     persistence::TenantShardPersistence,
 };
-use pageserver_api::controller_api::PlacementPolicy;
+use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
 use pageserver_api::{
     models::{LocationConfig, LocationConfigMode, TenantConfig},
     shard::{ShardIdentity, TenantShardId},
@@ -116,6 +116,10 @@ pub(crate) struct TenantState {
     /// sending it.  This is the mechanism by which compute notifications are included in the scope
     /// of state that we publish externally in an eventually consistent way.
     pub(crate) pending_compute_notification: bool,
+
+    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
+    // be set to a non-active state to avoid making changes while the issue is fixed.
+    scheduling_policy: ShardSchedulingPolicy,
 }

 #[derive(Default, Clone, Debug, Serialize)]
@@ -370,6 +374,7 @@ impl TenantState {
             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
             last_error: Arc::default(),
             pending_compute_notification: false,
+            scheduling_policy: ShardSchedulingPolicy::default(),
         }
     }
@@ -453,6 +458,16 @@
         // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
         // change their attach location.

+        match self.scheduling_policy {
+            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
+            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
+                // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
+                tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
+                    "Scheduling is disabled by policy {:?}", self.scheduling_policy);
+                return Ok(());
+            }
+        }
+
         // Build the set of pageservers already in use by this tenant, to avoid scheduling
         // more work on the same pageservers we're already using.
         let mut modified = false;
@@ -668,6 +683,19 @@
             }
         }

+        // Pre-checks done: finally check whether we may actually do the work
+        match self.scheduling_policy {
+            ShardSchedulingPolicy::Active
+            | ShardSchedulingPolicy::Essential
+            | ShardSchedulingPolicy::Pause => {}
+            ShardSchedulingPolicy::Stop => {
+                // We only reach this point if there is work to do and we're going to skip
+                // doing it: warn it obvious why this tenant isn't doing what it ought to.
+                tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
+                return None;
+            }
+        }
+
         // Build list of nodes from which the reconciler should detach
         let mut detach = Vec::new();
         for node_id in self.observed.locations.keys() {
@@ -804,6 +832,22 @@
         })
     }

+    /// Get a waiter for any reconciliation in flight, but do not start reconciliation
+    /// if it is not already running
+    pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
+        if self.reconciler.is_some() {
+            Some(ReconcilerWaiter {
+                tenant_shard_id: self.tenant_shard_id,
+                seq_wait: self.waiter.clone(),
+                error_seq_wait: self.error_waiter.clone(),
+                error: self.last_error.clone(),
+                seq: self.sequence,
+            })
+        } else {
+            None
+        }
+    }
+
     /// Called when a ReconcileResult has been emitted and the service is updating
     /// our state: if the result is from a sequence >= my ReconcileHandle, then drop
     /// the handle to indicate there is no longer a reconciliation in progress.
@@ -829,6 +873,36 @@
         debug_assert!(!self.intent.all_pageservers().contains(&node_id));
     }

+    pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
+        self.scheduling_policy = p;
+    }
+
+    pub(crate) fn from_persistent(
+        tsp: TenantShardPersistence,
+        intent: IntentState,
+    ) -> anyhow::Result<Self> {
+        let tenant_shard_id = tsp.get_tenant_shard_id()?;
+        let shard_identity = tsp.get_shard_identity()?;
+
+        Ok(Self {
+            tenant_shard_id,
+            shard: shard_identity,
+            sequence: Sequence::initial(),
+            generation: tsp.generation.map(|g| Generation::new(g as u32)),
+            policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
+            intent,
+            observed: ObservedState::new(),
+            config: serde_json::from_str(&tsp.config).unwrap(),
+            reconciler: None,
+            splitting: tsp.splitting,
+            waiter: Arc::new(SeqWait::new(Sequence::initial())),
+            error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
+            last_error: Arc::default(),
+            pending_compute_notification: false,
+            scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
+        })
+    }
+
     pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
         TenantShardPersistence {
             tenant_id: self.tenant_shard_id.tenant_id.to_string(),
@@ -840,6 +914,7 @@ impl TenantState {
             placement_policy: serde_json::to_string(&self.policy).unwrap(),
             config: serde_json::to_string(&self.config).unwrap(),
             splitting: SplitState::default(),
+            scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
         }
     }
 }
@@ -980,4 +1055,25 @@ pub(crate) mod tests {
         tenant_state.intent.clear(&mut scheduler);
         Ok(())
     }
+
+    #[test]
+    fn scheduling_mode() -> anyhow::Result<()> {
+        let nodes = make_test_nodes(3);
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
+
+        // In pause mode, schedule() shouldn't do anything
+        tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
+        assert!(tenant_state.schedule(&mut scheduler).is_ok());
+        assert!(tenant_state.intent.all_pageservers().is_empty());
+
+        // In active mode, schedule() works
+        tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
+        assert!(tenant_state.schedule(&mut scheduler).is_ok());
+        assert!(!tenant_state.intent.all_pageservers().is_empty());
+
+        tenant_state.intent.clear(&mut scheduler);
+        Ok(())
+    }
 }

Pageserver controller API types (controller_api.rs)

@@ -42,6 +42,12 @@ pub struct NodeConfigureRequest {
     pub scheduling: Option<NodeSchedulingPolicy>,
 }

+#[derive(Serialize, Deserialize)]
+pub struct TenantPolicyRequest {
+    pub placement: Option<PlacementPolicy>,
+    pub scheduling: Option<ShardSchedulingPolicy>,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 pub struct TenantLocateResponseShard {
     pub shard_id: TenantShardId,
@@ -170,6 +176,32 @@ impl FromStr for NodeAvailability {
     }
 }

+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
+pub enum ShardSchedulingPolicy {
+    // Normal mode: the tenant's scheduled locations may be updated at will, including
+    // for non-essential optimization.
+    Active,
+
+    // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
+    // For example, this still permits a node's attachment location to change to a secondary in
+    // response to a node failure, or to assign a new secondary if a node was removed.
+    Essential,
+
+    // No scheduling: leave the shard running wherever it currently is.  Even if the shard is
+    // unavailable, it will not be rescheduled to another node.
+    Pause,
+
+    // No reconciling: we will make no location_conf API calls to pageservers at all.  If the
+    // shard is unavailable, it stays that way.  If a node fails, this shard doesn't get failed over.
+    Stop,
+}
+
+impl Default for ShardSchedulingPolicy {
+    fn default() -> Self {
+        Self::Active
+    }
+}
+
 #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
 pub enum NodeSchedulingPolicy {
     Active,
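
For reference, these policy types cross the HTTP API as plain JSON: the scheduling modes are unit variants and serialize to bare strings, while `PlacementPolicy::Attached(n)` serializes to an object. The forms below are the ones that appear in this diff and its tests; other `PlacementPolicy` variants are not listed here:

```python
# Example bodies for PUT /control/v1/tenant/:tenant_id/policy; each field of
# TenantPolicyRequest is optional and may be sent on its own.
pause_scheduling = {"scheduling": "Pause"}      # also "Active", "Essential", "Stop"
attach = {"placement": {"Attached": 0}}         # as used in the regression test below
detach = {"placement": "Detached"}
```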

Python test fixtures (neon_fixtures.py)

@@ -2116,6 +2116,7 @@ class NeonStorageController(MetricsGetter):
         shard_count: Optional[int] = None,
         shard_stripe_size: Optional[int] = None,
         tenant_config: Optional[Dict[Any, Any]] = None,
+        placement_policy: Optional[str] = None,
     ):
         """
         Use this rather than pageserver_api() when you need to include shard parameters
@@ -2135,6 +2136,8 @@
             for k, v in tenant_config.items():
                 body[k] = v

+        body["placement_policy"] = placement_policy
+
         response = self.request(
             "POST",
             f"{self.env.storage_controller_api}/v1/tenant",
@@ -2193,6 +2196,34 @@
         log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
         assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id

+    def tenant_policy_update(self, tenant_id: TenantId, body: dict[str, Any]):
+        log.info(f"tenant_policy_update({tenant_id}, {body})")
+        self.request(
+            "PUT",
+            f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy",
+            json=body,
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
+    def reconcile_all(self):
+        r = self.request(
+            "POST",
+            f"{self.env.storage_controller_api}/debug/v1/reconcile_all",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        r.raise_for_status()
+        n = r.json()
+        log.info(f"reconcile_all waited for {n} shards")
+        return n
+
+    def reconcile_until_idle(self, timeout_secs=30):
+        start_at = time.time()
+        n = 1
+        while n > 0:
+            n = self.reconcile_all()
+            if time.time() - start_at > timeout_secs:
+                raise RuntimeError("Timeout in reconcile_until_idle")
+
     def consistency_check(self):
         """
         Throw an exception if the service finds any inconsistencies in its state
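
A sketch of how these helpers might compose in a test or debugging session (hypothetical usage, assuming an existing `env` and `tenant_id` from the fixtures above):

```python
# Emergency stop: disable scheduling and reconciliation for one tenant's shards.
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
# Background reconcile passes now skip this tenant (see the "Skipping reconcile
# for policy" warning in tenant_state.rs above).

# Once the underlying issue is resolved, re-enable the tenant and drain the
# backlog of reconciles until the controller is quiescent again.
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Active"})
env.storage_controller.reconcile_until_idle(timeout_secs=60)
```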

Storage controller regression tests

@@ -1015,3 +1015,98 @@ def test_sharding_service_re_attach(neon_env_builder: NeonEnvBuilder):
         "storage_controller_reconcile_complete_total", filter={"status": "ok"}
     )
     assert reconciles_after_restart == reconciles_before_restart
+
+
+def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBuilder):
+    """
+    Check that emergency hooks for disabling rogue tenants' reconcilers work as expected.
+    """
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_id = TenantId.generate()
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            # We will intentionally cause reconcile errors
+            ".*Reconcile error.*",
+            # Message from using a scheduling policy
+            ".*Scheduling is disabled by policy.*",
+            ".*Skipping reconcile for policy.*",
+            # Message from a node being offline
+            ".*Call to node .* management API .* failed",
+        ]
+    )
+
+    # Stop pageserver so that reconcile cannot complete
+    env.pageserver.stop()
+
+    env.storage_controller.tenant_create(tenant_id, placement_policy="Detached")
+
+    # Try attaching it: we should see reconciles failing
+    env.storage_controller.tenant_policy_update(
+        tenant_id,
+        {
+            "placement": {"Attached": 0},
+        },
+    )
+
+    def reconcile_errors() -> int:
+        return int(
+            env.storage_controller.get_metric_value(
+                "storage_controller_reconcile_complete_total", filter={"status": "error"}
+            )
+            or 0
+        )
+
+    def reconcile_ok() -> int:
+        return int(
+            env.storage_controller.get_metric_value(
+                "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+            )
+            or 0
+        )
+
+    def assert_errors_gt(n) -> int:
+        e = reconcile_errors()
+        assert e > n
+        return e
+
+    errs = wait_until(10, 1, lambda: assert_errors_gt(0))
+
+    # Try reconciling again, it should fail again
+    with pytest.raises(StorageControllerApiException):
+        env.storage_controller.reconcile_all()
+    errs = wait_until(10, 1, lambda: assert_errors_gt(errs))
+
+    # Configure the tenant to disable reconciles
+    env.storage_controller.tenant_policy_update(
+        tenant_id,
+        {
+            "scheduling": "Stop",
+        },
+    )
+
+    # Try reconciling again, it should not cause an error (silently skip)
+    env.storage_controller.reconcile_all()
+    assert reconcile_errors() == errs
+
+    # Start the pageserver and re-enable reconciles
+    env.pageserver.start()
+    env.storage_controller.tenant_policy_update(
+        tenant_id,
+        {
+            "scheduling": "Active",
+        },
+    )
+
+    def assert_ok_gt(n) -> int:
+        o = reconcile_ok()
+        assert o > n
+        return o
+
+    # We should see a successful reconciliation
+    wait_until(10, 1, lambda: assert_ok_gt(0))
+
+    # And indeed the tenant should be attached
+    assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1