diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql
new file mode 100644
index 0000000000..33c06dc03d
--- /dev/null
+++ b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql
@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+
+ALTER TABLE tenant_shards drop scheduling_policy;
\ No newline at end of file
diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql
new file mode 100644
index 0000000000..aa00f0d2ca
--- /dev/null
+++ b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql
@@ -0,0 +1,2 @@
+
+ALTER TABLE tenant_shards add scheduling_policy VARCHAR NOT NULL DEFAULT '"Active"';
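
Aside (not part of the patch): the slightly odd-looking default `'"Active"'` is the serde_json encoding of `ShardSchedulingPolicy::Active`, since the column stores the policy as a JSON string. A minimal sketch of the round trip, assuming the `pageserver_api` and `serde_json` crates are available:

```rust
use pageserver_api::controller_api::ShardSchedulingPolicy;

fn main() {
    // A unit enum variant serializes to a JSON string, quotes included,
    // which is exactly the text used as the VARCHAR column default above.
    let stored = serde_json::to_string(&ShardSchedulingPolicy::Active).unwrap();
    assert_eq!(stored, "\"Active\"");

    // Loading reverses it; rows with an empty value (pre-migration test data)
    // instead fall back to ShardSchedulingPolicy::default() in persistence.rs below.
    let parsed: ShardSchedulingPolicy = serde_json::from_str(&stored).unwrap();
    assert_eq!(parsed, ShardSchedulingPolicy::Active);
}
```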
diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 036019cd38..1f3f78bffa 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -34,7 +34,8 @@ use utils::{
};
use pageserver_api::controller_api::{
- NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
+ NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
+ TenantShardMigrateRequest,
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -478,6 +479,22 @@ async fn handle_tenant_shard_migrate(
)
}
+async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
+ let state = get_state(&req);
+
+ json_response(
+ StatusCode::OK,
+ state
+ .service
+ .tenant_update_policy(tenant_id, update_req)
+ .await?,
+ )
+}
+
 async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
@@ -509,6 +526,14 @@ async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, state.service.consistency_check().await?)
}
+async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+
+ json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
+}
+
/// Status endpoint is just used for checking that our HTTP listener is up
 async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
@@ -726,6 +751,9 @@ pub fn make_router(
RequestName("debug_v1_consistency_check"),
)
})
+ .post("/debug/v1/reconcile_all", |r| {
+ request_span(r, handle_reconcile_all)
+ })
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
@@ -765,6 +793,13 @@ pub fn make_router(
RequestName("control_v1_tenant_describe"),
)
})
+ .put("/control/v1/tenant/:tenant_id/policy", |r| {
+ named_request_span(
+ r,
+ handle_tenant_update_policy,
+ RequestName("control_v1_tenant_policy"),
+ )
+ })
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index dafd52017b..d60392bdbc 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -9,6 +9,7 @@ use camino::Utf8PathBuf;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
+use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
@@ -107,6 +108,12 @@ pub(crate) enum AbortShardSplitStatus {
 pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
+/// Some methods can operate on either a whole tenant or a single shard
+pub(crate) enum TenantFilter {
+ Tenant(TenantId),
+ Shard(TenantShardId),
+}
+
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
@@ -140,7 +147,7 @@ impl Persistence {
/// Wraps `with_conn` in order to collect latency and error metrics
     async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let latency = &METRICS_REGISTRY
@@ -168,7 +175,7 @@ impl Persistence {
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
     async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
where
-        F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+        F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let mut conn = self.connection_pool.get()?;
@@ -275,6 +282,11 @@ impl Persistence {
// Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
shard.placement_policy = "{\"Attached\":0}".to_string();
}
+
+ if shard.scheduling_policy.is_empty() {
+ shard.scheduling_policy =
+ serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
+ }
}
         let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
@@ -465,59 +477,45 @@ impl Persistence {
/// that we only do the first time a tenant is set to an attached policy via /location_config.
pub(crate) async fn update_tenant_shard(
&self,
- tenant_shard_id: TenantShardId,
- input_placement_policy: PlacementPolicy,
- input_config: TenantConfig,
+ tenant: TenantFilter,
+        input_placement_policy: Option<PlacementPolicy>,
+        input_config: Option<TenantConfig>,
         input_generation: Option<Generation>,
+        input_scheduling_policy: Option<ShardSchedulingPolicy>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
- let query = diesel::update(tenant_shards)
- .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
- .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
- .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32));
+ let query = match tenant {
+ TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
+ .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+ .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+ .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+ .into_boxed(),
+ TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
+ .filter(tenant_id.eq(input_tenant_id.to_string()))
+ .into_boxed(),
+ };
- if let Some(input_generation) = input_generation {
- // Update includes generation column
- query
- .set((
- generation.eq(Some(input_generation.into().unwrap() as i32)),
- placement_policy
- .eq(serde_json::to_string(&input_placement_policy).unwrap()),
- config.eq(serde_json::to_string(&input_config).unwrap()),
- ))
- .execute(conn)?;
- } else {
- // Update does not include generation column
- query
- .set((
- placement_policy
- .eq(serde_json::to_string(&input_placement_policy).unwrap()),
- config.eq(serde_json::to_string(&input_config).unwrap()),
- ))
- .execute(conn)?;
+ #[derive(AsChangeset)]
+ #[diesel(table_name = crate::schema::tenant_shards)]
+ struct ShardUpdate {
+                generation: Option<i32>,
+                placement_policy: Option<String>,
+                config: Option<String>,
+                scheduling_policy: Option<String>,
}
- Ok(())
- })
- .await?;
+ let update = ShardUpdate {
+ generation: input_generation.map(|g| g.into().unwrap() as i32),
+ placement_policy: input_placement_policy
+ .map(|p| serde_json::to_string(&p).unwrap()),
+ config: input_config.map(|c| serde_json::to_string(&c).unwrap()),
+ scheduling_policy: input_scheduling_policy
+ .map(|p| serde_json::to_string(&p).unwrap()),
+ };
- Ok(())
- }
-
- pub(crate) async fn update_tenant_config(
- &self,
- input_tenant_id: TenantId,
- input_config: TenantConfig,
- ) -> DatabaseResult<()> {
- use crate::schema::tenant_shards::dsl::*;
-
- self.with_measured_conn(DatabaseOperation::UpdateTenantConfig, move |conn| {
- diesel::update(tenant_shards)
- .filter(tenant_id.eq(input_tenant_id.to_string()))
- .set((config.eq(serde_json::to_string(&input_config).unwrap()),))
- .execute(conn)?;
+ query.set(update).execute(conn)?;
Ok(())
})
@@ -728,6 +726,8 @@ pub(crate) struct TenantShardPersistence {
pub(crate) splitting: SplitState,
#[serde(default)]
pub(crate) config: String,
+ #[serde(default)]
+ pub(crate) scheduling_policy: String,
}
impl TenantShardPersistence {
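
Aside (not part of the patch): the `ShardUpdate` changeset above works because diesel's derived `AsChangeset` omits `Option` fields that are `None` from the generated `SET` clause rather than writing NULLs, which is what lets one struct replace both the generation-bearing shard update and the tenant-wide config-only update. A self-contained sketch of that behaviour, using a hypothetical table and struct and assuming diesel 2.x with the `postgres` feature:

```rust
use diesel::prelude::*;

// Hypothetical stand-in for crate::schema::tenant_shards, to keep the sketch self-contained.
diesel::table! {
    tenant_shards_example (tenant_id) {
        tenant_id -> Varchar,
        config -> Text,
        scheduling_policy -> Varchar,
    }
}

#[derive(AsChangeset)]
#[diesel(table_name = tenant_shards_example)]
struct ExampleUpdate {
    config: Option<String>,
    scheduling_policy: Option<String>,
}

fn main() {
    // Mirrors how tenant_config_set now calls update_tenant_shard with only
    // `input_config` set: scheduling_policy stays None, so the generated
    // UPDATE statement never mentions that column.
    let update = ExampleUpdate {
        config: Some("{}".to_string()),
        scheduling_policy: None,
    };
    let query = diesel::update(tenant_shards_example::table)
        .filter(tenant_shards_example::tenant_id.eq("t1"))
        .set(update);
    // Prints SQL that sets `config` but leaves `scheduling_policy` untouched.
    println!("{}", diesel::debug_query::<diesel::pg::Pg, _>(&query));
}
```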
diff --git a/control_plane/attachment_service/src/schema.rs b/control_plane/attachment_service/src/schema.rs
index 76e4e56a66..ff37d0fe77 100644
--- a/control_plane/attachment_service/src/schema.rs
+++ b/control_plane/attachment_service/src/schema.rs
@@ -22,6 +22,7 @@ diesel::table! {
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,
+ scheduling_policy -> Varchar,
}
}
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 925910253b..cceecebb7f 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -8,7 +8,9 @@ use std::{
};
use crate::{
- id_lock_map::IdLockMap, persistence::AbortShardSplitStatus, reconciler::ReconcileError,
+ id_lock_map::IdLockMap,
+ persistence::{AbortShardSplitStatus, TenantFilter},
+ reconciler::ReconcileError,
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -20,9 +22,10 @@ use hyper::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
- TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
- TenantDescribeResponseShard, TenantLocateResponse, TenantShardMigrateRequest,
- TenantShardMigrateResponse, UtilizationScore,
+ ShardSchedulingPolicy, TenantCreateResponse, TenantCreateResponseShard,
+ TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
+ TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
+ UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest},
};
@@ -51,7 +54,6 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
- seqwait::SeqWait,
sync::gate::Gate,
};
@@ -66,7 +68,6 @@ use crate::{
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
ReconcilerWaiter, TenantState,
},
- Sequence,
};
// For operations that should be quick, like attaching a new tenant
@@ -957,30 +958,14 @@ impl Service {
}
for tsp in tenant_shard_persistence {
let tenant_shard_id = tsp.get_tenant_shard_id()?;
- let shard_identity = tsp.get_shard_identity()?;
+
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if let Some(generation_pageserver) = tsp.generation_pageserver {
intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
}
-
- let new_tenant = TenantState {
- tenant_shard_id,
- shard: shard_identity,
- sequence: Sequence::initial(),
- generation: tsp.generation.map(|g| Generation::new(g as u32)),
- policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
- intent,
- observed: ObservedState::new(),
- config: serde_json::from_str(&tsp.config).unwrap(),
- reconciler: None,
- splitting: tsp.splitting,
- waiter: Arc::new(SeqWait::new(Sequence::initial())),
- error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
- last_error: Arc::default(),
- pending_compute_notification: false,
- };
+ let new_tenant = TenantState::from_persistent(tsp, intent)?;
tenants.insert(tenant_shard_id, new_tenant);
}
@@ -1104,6 +1089,8 @@ impl Service {
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
};
match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -1156,9 +1143,10 @@ impl Service {
         // when we are reattaching a detached tenant.
self.persistence
.update_tenant_shard(
- attach_req.tenant_shard_id,
- PlacementPolicy::Attached(0),
- conf,
+ TenantFilter::Shard(attach_req.tenant_shard_id),
+ Some(PlacementPolicy::Attached(0)),
+ Some(conf),
+ None,
None,
)
.await?;
@@ -1615,6 +1603,8 @@ impl Service {
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
config: serde_json::to_string(&create_req.config).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
})
.collect();
@@ -1907,10 +1897,11 @@ impl Service {
{
self.persistence
.update_tenant_shard(
- *tenant_shard_id,
- placement_policy.clone(),
- tenant_config.clone(),
+ TenantFilter::Shard(*tenant_shard_id),
+ Some(placement_policy.clone()),
+ Some(tenant_config.clone()),
*generation,
+ None,
)
.await?;
}
@@ -1988,7 +1979,13 @@ impl Service {
let config = req.config;
self.persistence
- .update_tenant_config(req.tenant_id, config.clone())
+ .update_tenant_shard(
+ TenantFilter::Tenant(req.tenant_id),
+ None,
+ Some(config.clone()),
+ None,
+ None,
+ )
.await?;
let waiters = {
@@ -2341,6 +2338,57 @@ impl Service {
Ok(StatusCode::NOT_FOUND)
}
+    /// Naming: this configures the storage controller's own policies for a tenant, whereas
+    /// [`Self::tenant_config_set`] sets the TenantConfig, which is passed through to pageservers.
+    /// This function only modifies the tenant's policies within the storage controller.
+ pub(crate) async fn tenant_update_policy(
+ &self,
+ tenant_id: TenantId,
+ req: TenantPolicyRequest,
+ ) -> Result<(), ApiError> {
+ // We require an exclusive lock, because we are updating persistent and in-memory state
+ let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
+
+ let TenantPolicyRequest {
+ placement,
+ scheduling,
+ } = req;
+
+ self.persistence
+ .update_tenant_shard(
+ TenantFilter::Tenant(tenant_id),
+ placement.clone(),
+ None,
+ None,
+ scheduling,
+ )
+ .await?;
+
+ let mut locked = self.inner.write().unwrap();
+ let (nodes, tenants, scheduler) = locked.parts_mut();
+ for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+ if let Some(placement) = &placement {
+ shard.policy = placement.clone();
+
+ tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+ "Updated placement policy to {placement:?}");
+ }
+
+ if let Some(scheduling) = &scheduling {
+ shard.set_scheduling_policy(*scheduling);
+
+ tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+ "Updated scheduling policy to {scheduling:?}");
+ }
+
+ // In case scheduling is being switched back on, try it now.
+ shard.schedule(scheduler).ok();
+ self.maybe_reconcile_shard(shard, nodes);
+ }
+
+ Ok(())
+ }
+
pub(crate) async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
@@ -3250,6 +3298,10 @@ impl Service {
placement_policy: serde_json::to_string(&policy).unwrap(),
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::Splitting,
+
+ // Scheduling policies do not carry through to children
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
});
}
@@ -3970,6 +4022,28 @@ impl Service {
reconciles_spawned
}
+ /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
+ /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
+ /// put the system into a quiescent state where future background reconciliations won't do anything.
+    pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
+ self.reconcile_all();
+
+ let waiters = {
+ let mut waiters = Vec::new();
+ let locked = self.inner.read().unwrap();
+ for (_tenant_shard_id, shard) in locked.tenants.iter() {
+ if let Some(waiter) = shard.get_waiter() {
+ waiters.push(waiter);
+ }
+ }
+ waiters
+ };
+
+ let waiter_count = waiters.len();
+ self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
+ Ok(waiter_count)
+ }
+
pub async fn shutdown(&self) {
// Note that this already stops processing any results from reconciles: so
// we do not expect that our [`TenantState`] objects will reach a neat
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 83c921dc58..3dc3483e09 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -8,7 +8,7 @@ use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
};
-use pageserver_api::controller_api::PlacementPolicy;
+use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -116,6 +116,10 @@ pub(crate) struct TenantState {
/// sending it. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
+
+ // Support/debug tool: if something is going wrong or flapping with scheduling, this may
+ // be set to a non-active state to avoid making changes while the issue is fixed.
+ scheduling_policy: ShardSchedulingPolicy,
}
#[derive(Default, Clone, Debug, Serialize)]
@@ -370,6 +374,7 @@ impl TenantState {
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
pending_compute_notification: false,
+ scheduling_policy: ShardSchedulingPolicy::default(),
}
}
@@ -453,6 +458,16 @@ impl TenantState {
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
// change their attach location.
+ match self.scheduling_policy {
+ ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
+ ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
+ // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
+ tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
+ "Scheduling is disabled by policy {:?}", self.scheduling_policy);
+ return Ok(());
+ }
+ }
+
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut modified = false;
@@ -668,6 +683,19 @@ impl TenantState {
}
}
+ // Pre-checks done: finally check whether we may actually do the work
+ match self.scheduling_policy {
+ ShardSchedulingPolicy::Active
+ | ShardSchedulingPolicy::Essential
+ | ShardSchedulingPolicy::Pause => {}
+ ShardSchedulingPolicy::Stop => {
+ // We only reach this point if there is work to do and we're going to skip
+                // We only reach this point if there is work to do and we're going to skip
+                // doing it: warn, to make it obvious why this tenant isn't doing what it ought to.
+ return None;
+ }
+ }
+
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
@@ -804,6 +832,22 @@ impl TenantState {
})
}
+ /// Get a waiter for any reconciliation in flight, but do not start reconciliation
+ /// if it is not already running
+    pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
+ if self.reconciler.is_some() {
+ Some(ReconcilerWaiter {
+ tenant_shard_id: self.tenant_shard_id,
+ seq_wait: self.waiter.clone(),
+ error_seq_wait: self.error_waiter.clone(),
+ error: self.last_error.clone(),
+ seq: self.sequence,
+ })
+ } else {
+ None
+ }
+ }
+
/// Called when a ReconcileResult has been emitted and the service is updating
/// our state: if the result is from a sequence >= my ReconcileHandle, then drop
/// the handle to indicate there is no longer a reconciliation in progress.
@@ -829,6 +873,36 @@ impl TenantState {
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
}
+ pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
+ self.scheduling_policy = p;
+ }
+
+ pub(crate) fn from_persistent(
+ tsp: TenantShardPersistence,
+ intent: IntentState,
+ ) -> anyhow::Result {
+ let tenant_shard_id = tsp.get_tenant_shard_id()?;
+ let shard_identity = tsp.get_shard_identity()?;
+
+ Ok(Self {
+ tenant_shard_id,
+ shard: shard_identity,
+ sequence: Sequence::initial(),
+ generation: tsp.generation.map(|g| Generation::new(g as u32)),
+ policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
+ intent,
+ observed: ObservedState::new(),
+ config: serde_json::from_str(&tsp.config).unwrap(),
+ reconciler: None,
+ splitting: tsp.splitting,
+ waiter: Arc::new(SeqWait::new(Sequence::initial())),
+ error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
+ last_error: Arc::default(),
+ pending_compute_notification: false,
+ scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
+ })
+ }
+
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
TenantShardPersistence {
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
@@ -840,6 +914,7 @@ impl TenantState {
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
}
}
}
@@ -980,4 +1055,25 @@ pub(crate) mod tests {
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
+
+ #[test]
+ fn scheduling_mode() -> anyhow::Result<()> {
+ let nodes = make_test_nodes(3);
+ let mut scheduler = Scheduler::new(nodes.values());
+
+ let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
+
+ // In pause mode, schedule() shouldn't do anything
+ tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
+ assert!(tenant_state.schedule(&mut scheduler).is_ok());
+ assert!(tenant_state.intent.all_pageservers().is_empty());
+
+ // In active mode, schedule() works
+ tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
+ assert!(tenant_state.schedule(&mut scheduler).is_ok());
+ assert!(!tenant_state.intent.all_pageservers().is_empty());
+
+ tenant_state.intent.clear(&mut scheduler);
+ Ok(())
+ }
}
diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index e33bd0f486..dcf9e38106 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -42,6 +42,12 @@ pub struct NodeConfigureRequest {
     pub scheduling: Option<NodeSchedulingPolicy>,
}
+#[derive(Serialize, Deserialize)]
+pub struct TenantPolicyRequest {
+    pub placement: Option<PlacementPolicy>,
+    pub scheduling: Option<ShardSchedulingPolicy>,
+}
+
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -170,6 +176,32 @@ impl FromStr for NodeAvailability {
}
}
+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
+pub enum ShardSchedulingPolicy {
+ // Normal mode: the tenant's scheduled locations may be updated at will, including
+ // for non-essential optimization.
+ Active,
+
+ // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
+ // For example, this still permits a node's attachment location to change to a secondary in
+ // response to a node failure, or to assign a new secondary if a node was removed.
+ Essential,
+
+ // No scheduling: leave the shard running wherever it currently is. Even if the shard is
+ // unavailable, it will not be rescheduled to another node.
+ Pause,
+
+ // No reconciling: we will make no location_conf API calls to pageservers at all. If the
+ // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
+ Stop,
+}
+
+impl Default for ShardSchedulingPolicy {
+ fn default() -> Self {
+ Self::Active
+ }
+}
+
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeSchedulingPolicy {
Active,
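
Aside (not part of the patch): the bodies accepted by `PUT /control/v1/tenant/:tenant_id/policy` are just the serde encoding of `TenantPolicyRequest`, and the regression test below sends exactly these shapes. A small sketch of the wire format:

```rust
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy, TenantPolicyRequest};

fn main() {
    // {"placement":{"Attached":0},"scheduling":null} -- attach with no secondaries,
    // leaving the scheduling policy unchanged (None fields are not applied).
    let attach = TenantPolicyRequest {
        placement: Some(PlacementPolicy::Attached(0)),
        scheduling: None,
    };
    println!("{}", serde_json::to_string(&attach).unwrap());

    // {"placement":null,"scheduling":"Stop"} -- emergency switch: stop reconciling
    // this tenant entirely without touching its placement.
    let stop = TenantPolicyRequest {
        placement: None,
        scheduling: Some(ShardSchedulingPolicy::Stop),
    };
    println!("{}", serde_json::to_string(&stop).unwrap());
}
```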
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 3d60f9bef5..d0519d3406 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2116,6 +2116,7 @@ class NeonStorageController(MetricsGetter):
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
tenant_config: Optional[Dict[Any, Any]] = None,
+ placement_policy: Optional[str] = None,
):
"""
Use this rather than pageserver_api() when you need to include shard parameters
@@ -2135,6 +2136,8 @@ class NeonStorageController(MetricsGetter):
for k, v in tenant_config.items():
body[k] = v
+ body["placement_policy"] = placement_policy
+
response = self.request(
"POST",
f"{self.env.storage_controller_api}/v1/tenant",
@@ -2193,6 +2196,34 @@ class NeonStorageController(MetricsGetter):
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id
+ def tenant_policy_update(self, tenant_id: TenantId, body: dict[str, Any]):
+ log.info(f"tenant_policy_update({tenant_id}, {body})")
+ self.request(
+ "PUT",
+ f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy",
+ json=body,
+ headers=self.headers(TokenScope.ADMIN),
+ )
+
+ def reconcile_all(self):
+ r = self.request(
+ "POST",
+ f"{self.env.storage_controller_api}/debug/v1/reconcile_all",
+ headers=self.headers(TokenScope.ADMIN),
+ )
+ r.raise_for_status()
+ n = r.json()
+ log.info(f"reconcile_all waited for {n} shards")
+ return n
+
+ def reconcile_until_idle(self, timeout_secs=30):
+ start_at = time.time()
+ n = 1
+ while n > 0:
+ n = self.reconcile_all()
+ if time.time() - start_at > timeout_secs:
+ raise RuntimeError("Timeout in reconcile_until_idle")
+
def consistency_check(self):
"""
Throw an exception if the service finds any inconsistencies in its state
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index fc6c137667..c33d2ca0da 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -1015,3 +1015,98 @@ def test_sharding_service_re_attach(neon_env_builder: NeonEnvBuilder):
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconciles_after_restart == reconciles_before_restart
+
+
+def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBuilder):
+ """
+ Check that emergency hooks for disabling rogue tenants' reconcilers work as expected.
+ """
+ env = neon_env_builder.init_configs()
+ env.start()
+
+ tenant_id = TenantId.generate()
+
+ env.storage_controller.allowed_errors.extend(
+ [
+ # We will intentionally cause reconcile errors
+ ".*Reconcile error.*",
+ # Message from using a scheduling policy
+ ".*Scheduling is disabled by policy.*",
+ ".*Skipping reconcile for policy.*",
+ # Message from a node being offline
+ ".*Call to node .* management API .* failed",
+ ]
+ )
+
+ # Stop pageserver so that reconcile cannot complete
+ env.pageserver.stop()
+
+ env.storage_controller.tenant_create(tenant_id, placement_policy="Detached")
+
+ # Try attaching it: we should see reconciles failing
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "placement": {"Attached": 0},
+ },
+ )
+
+ def reconcile_errors() -> int:
+ return int(
+ env.storage_controller.get_metric_value(
+ "storage_controller_reconcile_complete_total", filter={"status": "error"}
+ )
+ or 0
+ )
+
+ def reconcile_ok() -> int:
+ return int(
+ env.storage_controller.get_metric_value(
+ "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+ )
+ or 0
+ )
+
+ def assert_errors_gt(n) -> int:
+ e = reconcile_errors()
+ assert e > n
+ return e
+
+ errs = wait_until(10, 1, lambda: assert_errors_gt(0))
+
+ # Try reconciling again, it should fail again
+ with pytest.raises(StorageControllerApiException):
+ env.storage_controller.reconcile_all()
+ errs = wait_until(10, 1, lambda: assert_errors_gt(errs))
+
+ # Configure the tenant to disable reconciles
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "scheduling": "Stop",
+ },
+ )
+
+ # Try reconciling again, it should not cause an error (silently skip)
+ env.storage_controller.reconcile_all()
+ assert reconcile_errors() == errs
+
+ # Start the pageserver and re-enable reconciles
+ env.pageserver.start()
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "scheduling": "Active",
+ },
+ )
+
+ def assert_ok_gt(n) -> int:
+ o = reconcile_ok()
+ assert o > n
+ return o
+
+ # We should see a successful reconciliation
+ wait_until(10, 1, lambda: assert_ok_gt(0))
+
+ # And indeed the tenant should be attached
+ assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1