diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index a4fbd80dc3..e765dfc2ae 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -27,7 +27,7 @@ pub(super) struct Reconciler { pub(super) tenant_shard_id: TenantShardId, pub(crate) shard: ShardIdentity, pub(crate) generation: Generation, - pub(crate) intent: IntentState, + pub(crate) intent: TargetState, pub(crate) config: TenantConfig, pub(crate) observed: ObservedState, @@ -62,6 +62,32 @@ pub(super) struct Reconciler { pub(crate) persistence: Arc, } +/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any +/// reference counting for Scheduler. The IntentState is what the scheduler works with, +/// and the TargetState is just the instruction for a particular Reconciler run. +#[derive(Debug)] +pub(crate) struct TargetState { + pub(crate) attached: Option, + pub(crate) secondary: Vec, +} + +impl TargetState { + pub(crate) fn from_intent(intent: &IntentState) -> Self { + Self { + attached: *intent.get_attached(), + secondary: intent.get_secondary().clone(), + } + } + + fn all_pageservers(&self) -> Vec { + let mut result = self.secondary.clone(); + if let Some(node_id) = &self.attached { + result.push(*node_id); + } + result + } +} + #[derive(thiserror::Error, Debug)] pub(crate) enum ReconcileError { #[error(transparent)] diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs index 3b4c9e3464..7a99118312 100644 --- a/control_plane/attachment_service/src/scheduler.rs +++ b/control_plane/attachment_service/src/scheduler.rs @@ -1,9 +1,7 @@ -use pageserver_api::shard::TenantShardId; -use std::collections::{BTreeMap, HashMap}; +use crate::node::Node; +use std::collections::HashMap; use utils::{http::error::ApiError, id::NodeId}; -use crate::{node::Node, tenant_state::TenantState}; - /// Scenarios 
in which we cannot find a suitable location for a tenant shard #[derive(thiserror::Error, Debug)] pub enum ScheduleError { @@ -19,52 +17,95 @@ impl From for ApiError { } } +struct SchedulerNode { + /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`]. + shard_count: usize, + + /// Whether this node is currently elegible to have new shards scheduled (this is derived + /// from a node's availability state and scheduling policy). + may_schedule: bool, +} + pub(crate) struct Scheduler { - tenant_counts: HashMap, + nodes: HashMap, } impl Scheduler { - pub(crate) fn new( - tenants: &BTreeMap, - nodes: &HashMap, - ) -> Self { - let mut tenant_counts = HashMap::new(); - for node_id in nodes.keys() { - tenant_counts.insert(*node_id, 0); + pub(crate) fn new<'a>(nodes: impl Iterator) -> Self { + let mut scheduler_nodes = HashMap::new(); + for node in nodes { + scheduler_nodes.insert( + node.id, + SchedulerNode { + shard_count: 0, + may_schedule: node.may_schedule(), + }, + ); } - for tenant in tenants.values() { - if let Some(ps) = tenant.intent.attached { - let entry = tenant_counts.entry(ps).or_insert(0); - *entry += 1; + Self { + nodes: scheduler_nodes, + } + } + + /// Increment the reference count of a node. This reference count is used to guide scheduling + /// decisions, not for memory management: it represents one tenant shard whose IntentState targets + /// this node. + /// + /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into + /// [`Self::new`] or [`Self::node_upsert`]) + pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) { + let Some(node) = self.nodes.get_mut(&node_id) else { + tracing::error!("Scheduler missing node {node_id}"); + debug_assert!(false); + return; + }; + + node.shard_count += 1; + } + + /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`]. 
+ pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) { + let Some(node) = self.nodes.get_mut(&node_id) else { + debug_assert!(false); + tracing::error!("Scheduler missing node {node_id}"); + return; + }; + + node.shard_count -= 1; + } + + pub(crate) fn node_upsert(&mut self, node: &Node) { + use std::collections::hash_map::Entry::*; + match self.nodes.entry(node.id) { + Occupied(mut entry) => { + entry.get_mut().may_schedule = node.may_schedule(); + } + Vacant(entry) => { + entry.insert(SchedulerNode { + shard_count: 0, + may_schedule: node.may_schedule(), + }); } } - - for (node_id, node) in nodes { - if !node.may_schedule() { - tenant_counts.remove(node_id); - } - } - - Self { tenant_counts } } pub(crate) fn schedule_shard( &mut self, hard_exclude: &[NodeId], ) -> Result { - if self.tenant_counts.is_empty() { + if self.nodes.is_empty() { return Err(ScheduleError::NoPageservers); } let mut tenant_counts: Vec<(NodeId, usize)> = self - .tenant_counts + .nodes .iter() .filter_map(|(k, v)| { - if hard_exclude.contains(k) { + if hard_exclude.contains(k) || !v.may_schedule { None } else { - Some((*k, *v)) + Some((*k, v.shard_count)) } }) .collect(); @@ -73,7 +114,18 @@ impl Scheduler { tenant_counts.sort_by_key(|i| (i.1, i.0)); if tenant_counts.is_empty() { - // After applying constraints, no pageservers were left + // After applying constraints, no pageservers were left. We log some detail about + // the state of nodes to help understand why this happened. This is not logged as an error because + // it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard. 
+ tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:"); + for (node_id, node) in &self.nodes { + tracing::info!( + "Node {node_id}: may_schedule={} shards={}", + node.may_schedule, + node.shard_count + ); + } + return Err(ScheduleError::ImpossibleConstraint); } @@ -82,7 +134,88 @@ impl Scheduler { "scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})", tenant_counts.iter().map(|i| i.0 .0).collect::>() ); - *self.tenant_counts.get_mut(&node_id).unwrap() += 1; + + // Note that we do not update shard count here to reflect the scheduling: that + // is IntentState's job when the scheduled location is used. + Ok(node_id) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; + use utils::id::NodeId; + + use crate::{node::Node, tenant_state::IntentState}; + + #[test] + fn scheduler_basic() -> anyhow::Result<()> { + let mut nodes = HashMap::new(); + nodes.insert( + NodeId(1), + Node { + id: NodeId(1), + availability: NodeAvailability::Active, + scheduling: NodeSchedulingPolicy::Active, + listen_http_addr: String::new(), + listen_http_port: 0, + listen_pg_addr: String::new(), + listen_pg_port: 0, + }, + ); + + nodes.insert( + NodeId(2), + Node { + id: NodeId(2), + availability: NodeAvailability::Active, + scheduling: NodeSchedulingPolicy::Active, + listen_http_addr: String::new(), + listen_http_port: 0, + listen_pg_addr: String::new(), + listen_pg_port: 0, + }, + ); + + let mut scheduler = Scheduler::new(nodes.values()); + let mut t1_intent = IntentState::new(); + let mut t2_intent = IntentState::new(); + + let scheduled = scheduler.schedule_shard(&[])?; + t1_intent.set_attached(&mut scheduler, Some(scheduled)); + let scheduled = scheduler.schedule_shard(&[])?; + t2_intent.set_attached(&mut scheduler, Some(scheduled)); + + assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1); + 
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1); + + let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?; + t1_intent.push_secondary(&mut scheduler, scheduled); + + assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1); + assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2); + + t1_intent.clear(&mut scheduler); + assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0); + assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1); + + if cfg!(debug_assertions) { + // Dropping an IntentState without clearing it causes a panic in debug mode, + // because we have failed to properly update scheduler shard counts. + let result = std::panic::catch_unwind(move || { + drop(t2_intent); + }); + assert!(result.is_err()); + } else { + t2_intent.clear(&mut scheduler); + assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0); + assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0); + } + + Ok(()) + } +} diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 149cb7f2ba..097b4a1a47 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -69,6 +69,8 @@ struct ServiceState { nodes: Arc>, + scheduler: Scheduler, + compute_hook: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, @@ -80,14 +82,26 @@ impl ServiceState { result_tx: tokio::sync::mpsc::UnboundedSender, nodes: HashMap, tenants: BTreeMap, + scheduler: Scheduler, ) -> Self { Self { tenants, nodes: Arc::new(nodes), + scheduler, compute_hook: Arc::new(ComputeHook::new(config)), result_tx, } } + + fn parts_mut( + &mut self, + ) -> ( + &mut Arc>, + &mut BTreeMap, + &mut Scheduler, + ) { + (&mut self.nodes, &mut self.tenants, &mut self.scheduler) + } } #[derive(Clone)] @@ -234,19 +248,20 @@ impl Service { // Populate intent and observed states for all tenants, based on reported state on 
pageservers let (shard_count, nodes) = { let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); // Mark nodes online if they responded to us: nodes are offline by default after a restart. - let mut nodes = (*locked.nodes).clone(); - for (node_id, node) in nodes.iter_mut() { + let mut new_nodes = (**nodes).clone(); + for (node_id, node) in new_nodes.iter_mut() { if nodes_online.contains(node_id) { node.availability = NodeAvailability::Active; + scheduler.node_upsert(node); } } - locked.nodes = Arc::new(nodes); - let nodes = locked.nodes.clone(); + *nodes = Arc::new(new_nodes); for (tenant_shard_id, (node_id, observed_loc)) in observed { - let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else { + let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else { cleanup.push((tenant_shard_id, node_id)); continue; }; @@ -258,10 +273,9 @@ impl Service { } // Populate each tenant's intent state - let mut scheduler = Scheduler::new(&locked.tenants, &nodes); - for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() { + for (tenant_shard_id, tenant_state) in tenants.iter_mut() { tenant_state.intent_from_observed(); - if let Err(e) = tenant_state.schedule(&mut scheduler) { + if let Err(e) = tenant_state.schedule(scheduler) { // Non-fatal error: we are unable to properly schedule the tenant, perhaps because // not enough pageservers are available. The tenant may well still be available // to clients. 
@@ -276,7 +290,7 @@ impl Service { } } - (locked.tenants.len(), nodes) + (tenants.len(), nodes.clone()) }; // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that @@ -393,7 +407,56 @@ impl Service { } } - #[instrument(skip_all)] + /// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation + /// was successful, this will update the observed state of the tenant such that subsequent + /// calls to [`TenantState::maybe_reconcile`] will do nothing. + #[instrument(skip_all, fields( + tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(), + sequence=%result.sequence + ))] + fn process_result(&self, result: ReconcileResult) { + let mut locked = self.inner.write().unwrap(); + let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else { + // A reconciliation result might race with removing a tenant: drop results for + // tenants that aren't in our map. + return; + }; + + // Usually generation should only be updated via this path, so the max() isn't + // needed, but it is used to handle out-of-band updates via. e.g. test hook. + tenant.generation = std::cmp::max(tenant.generation, result.generation); + + // If the reconciler signals that it failed to notify compute, set this state on + // the shard so that a future [`TenantState::maybe_reconcile`] will try again. 
+ tenant.pending_compute_notification = result.pending_compute_notification; + + match result.result { + Ok(()) => { + for (node_id, loc) in &result.observed.locations { + if let Some(conf) = &loc.conf { + tracing::info!("Updating observed location {}: {:?}", node_id, conf); + } else { + tracing::info!("Setting observed location {} to None", node_id,) + } + } + tenant.observed = result.observed; + tenant.waiter.advance(result.sequence); + } + Err(e) => { + tracing::warn!("Reconcile error: {}", e); + + // Ordering: populate last_error before advancing error_seq, + // so that waiters will see the correct error after waiting. + *(tenant.last_error.lock().unwrap()) = format!("{e}"); + tenant.error_waiter.advance(result.sequence); + + for (node_id, o) in result.observed.locations { + tenant.observed.locations.insert(node_id, o); + } + } + } + } + async fn process_results( &self, mut result_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -412,55 +475,7 @@ impl Service { } }; - tracing::info!( - "Reconcile result for sequence {}, ok={}", - result.sequence, - result.result.is_ok() - ); - let mut locked = self.inner.write().unwrap(); - let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else { - // A reconciliation result might race with removing a tenant: drop results for - // tenants that aren't in our map. - continue; - }; - - // Usually generation should only be updated via this path, so the max() isn't - // needed, but it is used to handle out-of-band updates via. e.g. test hook. - tenant.generation = std::cmp::max(tenant.generation, result.generation); - - // If the reconciler signals that it failed to notify compute, set this state on - // the shard so that a future [`TenantState::maybe_reconcile`] will try again. 
- tenant.pending_compute_notification = result.pending_compute_notification; - - match result.result { - Ok(()) => { - for (node_id, loc) in &result.observed.locations { - if let Some(conf) = &loc.conf { - tracing::info!("Updating observed location {}: {:?}", node_id, conf); - } else { - tracing::info!("Setting observed location {} to None", node_id,) - } - } - tenant.observed = result.observed; - tenant.waiter.advance(result.sequence); - } - Err(e) => { - tracing::warn!( - "Reconcile error on tenant {}: {}", - tenant.tenant_shard_id, - e - ); - - // Ordering: populate last_error before advancing error_seq, - // so that waiters will see the correct error after waiting. - *(tenant.last_error.lock().unwrap()) = format!("{e}"); - tenant.error_waiter.advance(result.sequence); - - for (node_id, o) in result.observed.locations { - tenant.observed.locations.insert(node_id, o); - } - } - } + self.process_result(result); } } @@ -481,6 +496,32 @@ impl Service { let mut tenants = BTreeMap::new(); + let mut scheduler = Scheduler::new(nodes.values()); + + #[cfg(feature = "testing")] + { + // Hack: insert scheduler state for all nodes referenced by shards, as compatibility + // tests only store the shards, not the nodes. The nodes will be loaded shortly + // after when pageservers start up and register. 
+ let mut node_ids = HashSet::new(); + for tsp in &tenant_shard_persistence { + node_ids.insert(tsp.generation_pageserver); + } + for node_id in node_ids { + tracing::info!("Creating node {} in scheduler for tests", node_id); + let node = Node { + id: NodeId(node_id as u64), + availability: NodeAvailability::Active, + scheduling: NodeSchedulingPolicy::Active, + listen_http_addr: "".to_string(), + listen_http_port: 123, + listen_pg_addr: "".to_string(), + listen_pg_port: 123, + }; + + scheduler.node_upsert(&node); + } + } for tsp in tenant_shard_persistence { let tenant_shard_id = TenantShardId { tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?, @@ -501,7 +542,10 @@ impl Service { // it with what we can infer: the node for which a generation was most recently issued. let mut intent = IntentState::new(); if tsp.generation_pageserver != i64::MAX { - intent.attached = Some(NodeId(tsp.generation_pageserver as u64)) + intent.set_attached( + &mut scheduler, + Some(NodeId(tsp.generation_pageserver as u64)), + ); } let new_tenant = TenantState { @@ -532,6 +576,7 @@ impl Service { result_tx, nodes, tenants, + scheduler, ))), config, persistence, @@ -636,8 +681,9 @@ impl Service { }; let mut locked = self.inner.write().unwrap(); - let tenant_state = locked - .tenants + let (_nodes, tenants, scheduler) = locked.parts_mut(); + + let tenant_state = tenants .get_mut(&attach_req.tenant_shard_id) .expect("Checked for existence above"); @@ -657,7 +703,7 @@ impl Service { generation = ?tenant_state.generation, "issuing", ); - } else if let Some(ps_id) = tenant_state.intent.attached { + } else if let Some(ps_id) = tenant_state.intent.get_attached() { tracing::info!( tenant_id = %attach_req.tenant_shard_id, %ps_id, @@ -669,7 +715,9 @@ impl Service { tenant_id = %attach_req.tenant_shard_id, "no-op: tenant already has no pageserver"); } - tenant_state.intent.attached = attach_req.node_id; + tenant_state + .intent + .set_attached(scheduler, attach_req.node_id); tracing::info!( 
"attach_hook: tenant {} set generation {:?}, pageserver {}", @@ -716,7 +764,7 @@ impl Service { InspectResponse { attachment: tenant_state.and_then(|s| { s.intent - .attached + .get_attached() .map(|ps| (s.generation.into().unwrap(), ps)) }), } @@ -862,16 +910,15 @@ impl Service { let (waiters, response_shards) = { let mut locked = self.inner.write().unwrap(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); let mut response_shards = Vec::new(); - let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes); - for tenant_shard_id in create_ids { tracing::info!("Creating shard {tenant_shard_id}..."); use std::collections::btree_map::Entry; - match locked.tenants.entry(tenant_shard_id) { + match tenants.entry(tenant_shard_id) { Entry::Occupied(mut entry) => { tracing::info!( "Tenant shard {tenant_shard_id} already exists while creating" @@ -881,7 +928,7 @@ impl Service { // attached and secondary locations (independently) away frorm those // pageservers also holding a shard for this tenant. 
- entry.get_mut().schedule(&mut scheduler).map_err(|e| { + entry.get_mut().schedule(scheduler).map_err(|e| { ApiError::Conflict(format!( "Failed to schedule shard {tenant_shard_id}: {e}" )) @@ -892,7 +939,7 @@ impl Service { node_id: entry .get() .intent - .attached + .get_attached() .expect("We just set pageserver if it was None"), generation: entry.get().generation.into().unwrap(), }); @@ -914,7 +961,7 @@ impl Service { } state.config = create_req.config.clone(); - state.schedule(&mut scheduler).map_err(|e| { + state.schedule(scheduler).map_err(|e| { ApiError::Conflict(format!( "Failed to schedule shard {tenant_shard_id}: {e}" )) @@ -924,7 +971,7 @@ impl Service { shard_id: tenant_shard_id, node_id: state .intent - .attached + .get_attached() .expect("We just set pageserver if it was None"), generation: state.generation.into().unwrap(), }); @@ -1002,16 +1049,11 @@ impl Service { let mut locked = self.inner.write().unwrap(); let result_tx = locked.result_tx.clone(); let compute_hook = locked.compute_hook.clone(); - let pageservers = locked.nodes.clone(); - - let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes); + let (nodes, tenants, scheduler) = locked.parts_mut(); // Maybe we have existing shards let mut create = true; - for (shard_id, shard) in locked - .tenants - .range_mut(TenantShardId::tenant_range(tenant_id)) - { + for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) { // Saw an existing shard: this is not a creation create = false; @@ -1035,7 +1077,7 @@ impl Service { | LocationConfigMode::AttachedSingle | LocationConfigMode::AttachedStale => { // TODO: persistence for changes in policy - if pageservers.len() > 1 { + if nodes.len() > 1 { shard.policy = PlacementPolicy::Double(1) } else { // Convenience for dev/test: if we just have one pageserver, import @@ -1045,11 +1087,11 @@ impl Service { } } - shard.schedule(&mut scheduler)?; + shard.schedule(scheduler)?; let maybe_waiter = shard.maybe_reconcile( 
result_tx.clone(), - &pageservers, + nodes, &compute_hook, &self.config, &self.persistence, @@ -1060,10 +1102,10 @@ impl Service { waiters.push(waiter); } - if let Some(node_id) = shard.intent.attached { + if let Some(node_id) = shard.intent.get_attached() { result.shards.push(TenantShardLocation { shard_id: *shard_id, - node_id, + node_id: *node_id, }) } } @@ -1154,7 +1196,7 @@ impl Service { for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { - let node_id = shard.intent.attached.ok_or_else(|| { + let node_id = shard.intent.get_attached().ok_or_else(|| { ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) })?; let node = locked @@ -1211,9 +1253,16 @@ impl Service { // Drop in-memory state { let mut locked = self.inner.write().unwrap(); - locked - .tenants - .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id); + let (_nodes, tenants, scheduler) = locked.parts_mut(); + + // Dereference Scheduler from shards before dropping them + for (_tenant_shard_id, shard) in + tenants.range_mut(TenantShardId::tenant_range(tenant_id)) + { + shard.intent.clear(scheduler); + } + + tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id); tracing::info!( "Deleted tenant {tenant_id}, now have {} tenants", locked.tenants.len() @@ -1248,7 +1297,7 @@ impl Service { for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { - let node_id = shard.intent.attached.ok_or_else(|| { + let node_id = shard.intent.get_attached().ok_or_else(|| { ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) })?; let node = locked @@ -1329,7 +1378,7 @@ impl Service { for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { - let node_id = shard.intent.attached.ok_or_else(|| { + let node_id = shard.intent.get_attached().ok_or_else(|| { ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) })?; let 
node = locked @@ -1401,13 +1450,13 @@ impl Service { // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might // point to somewhere we haven't attached yet. - let Some(node_id) = shard.intent.attached else { + let Some(node_id) = shard.intent.get_attached() else { return Err(ApiError::Conflict( "Cannot call timeline API on non-attached tenant".to_string(), )); }; - let Some(node) = locked.nodes.get(&node_id) else { + let Some(node) = locked.nodes.get(node_id) else { // This should never happen return Err(ApiError::InternalServerError(anyhow::anyhow!( "Shard refers to nonexistent node" @@ -1432,12 +1481,13 @@ impl Service { for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { - let node_id = shard - .intent - .attached - .ok_or(ApiError::BadRequest(anyhow::anyhow!( - "Cannot locate a tenant that is not attached" - )))?; + let node_id = + shard + .intent + .get_attached() + .ok_or(ApiError::BadRequest(anyhow::anyhow!( + "Cannot locate a tenant that is not attached" + )))?; let node = pageservers .get(&node_id) @@ -1510,106 +1560,104 @@ impl Service { } // Validate input, and calculate which shards we will create - let (old_shard_count, targets, compute_hook) = { - let locked = self.inner.read().unwrap(); - - let pageservers = locked.nodes.clone(); - - let mut targets = Vec::new(); - - // In case this is a retry, count how many already-split shards we found - let mut children_found = Vec::new(); - let mut old_shard_count = None; - - for (tenant_shard_id, shard) in - locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + let (old_shard_count, targets, compute_hook) = { - match shard.shard.count.count().cmp(&split_req.new_shard_count) { - Ordering::Equal => { - // Already split this - children_found.push(*tenant_shard_id); - continue; - } - Ordering::Greater => { - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Requested count {} but already have shards at count {}", - 
split_req.new_shard_count, - shard.shard.count.count() - ))); - } - Ordering::Less => { - // Fall through: this shard has lower count than requested, - // is a candidate for splitting. - } - } + let locked = self.inner.read().unwrap(); - match old_shard_count { - None => old_shard_count = Some(shard.shard.count), - Some(old_shard_count) => { - if old_shard_count != shard.shard.count { - // We may hit this case if a caller asked for two splits to - // different sizes, before the first one is complete. - // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture - // of shard_count=1 and shard_count=2 shards in the map. - return Err(ApiError::Conflict( - "Cannot split, currently mid-split".to_string(), - )); + let pageservers = locked.nodes.clone(); + + let mut targets = Vec::new(); + + // In case this is a retry, count how many already-split shards we found + let mut children_found = Vec::new(); + let mut old_shard_count = None; + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + match shard.shard.count.count().cmp(&split_req.new_shard_count) { + Ordering::Equal => { + // Already split this + children_found.push(*tenant_shard_id); + continue; + } + Ordering::Greater => { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Requested count {} but already have shards at count {}", + split_req.new_shard_count, + shard.shard.count.count() + ))); + } + Ordering::Less => { + // Fall through: this shard has lower count than requested, + // is a candidate for splitting. 
} } - } - if policy.is_none() { - policy = Some(shard.policy.clone()); - } - if shard_ident.is_none() { - shard_ident = Some(shard.shard); - } - if tenant_shard_id.shard_count.count() == split_req.new_shard_count { - tracing::info!( - "Tenant shard {} already has shard count {}", - tenant_shard_id, - split_req.new_shard_count - ); - continue; - } + match old_shard_count { + None => old_shard_count = Some(shard.shard.count), + Some(old_shard_count) => { + if old_shard_count != shard.shard.count { + // We may hit this case if a caller asked for two splits to + // different sizes, before the first one is complete. + // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture + // of shard_count=1 and shard_count=2 shards in the map. + return Err(ApiError::Conflict( + "Cannot split, currently mid-split".to_string(), + )); + } + } + } + if policy.is_none() { + policy = Some(shard.policy.clone()); + } + if shard_ident.is_none() { + shard_ident = Some(shard.shard); + } - let node_id = - shard - .intent - .attached - .ok_or(ApiError::BadRequest(anyhow::anyhow!( - "Cannot split a tenant that is not attached" - )))?; + if tenant_shard_id.shard_count.count() == split_req.new_shard_count { + tracing::info!( + "Tenant shard {} already has shard count {}", + tenant_shard_id, + split_req.new_shard_count + ); + continue; + } - let node = pageservers - .get(&node_id) - .expect("Pageservers may not be deleted while referenced"); + let node_id = shard.intent.get_attached().ok_or(ApiError::BadRequest( + anyhow::anyhow!("Cannot split a tenant that is not attached"), + ))?; - // TODO: if any reconciliation is currently in progress for this shard, wait for it. 
+ let node = pageservers + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); - targets.push(SplitTarget { - parent_id: *tenant_shard_id, - node: node.clone(), - child_ids: tenant_shard_id.split(ShardCount::new(split_req.new_shard_count)), - }); - } + // TODO: if any reconciliation is currently in progress for this shard, wait for it. - if targets.is_empty() { - if children_found.len() == split_req.new_shard_count as usize { - return Ok(TenantShardSplitResponse { - new_shards: children_found, + targets.push(SplitTarget { + parent_id: *tenant_shard_id, + node: node.clone(), + child_ids: tenant_shard_id + .split(ShardCount::new(split_req.new_shard_count)), }); - } else { - // No shards found to split, and no existing children found: the - // tenant doesn't exist at all. - return Err(ApiError::NotFound( - anyhow::anyhow!("Tenant {} not found", tenant_id).into(), - )); } - } - (old_shard_count, targets, locked.compute_hook.clone()) - }; + if targets.is_empty() { + if children_found.len() == split_req.new_shard_count as usize { + return Ok(TenantShardSplitResponse { + new_shards: children_found, + }); + } else { + // No shards found to split, and no existing children found: the + // tenant doesn't exist at all. 
+ return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {} not found", tenant_id).into(), + )); + } + } + + (old_shard_count, targets, locked.compute_hook.clone()) + }; // unwrap safety: we would have returned above if we didn't find at least one shard to split let old_shard_count = old_shard_count.unwrap(); @@ -1751,6 +1799,7 @@ impl Service { let mut child_locations = Vec::new(); { let mut locked = self.inner.write().unwrap(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); for target in targets { let SplitTarget { parent_id, @@ -1758,19 +1807,14 @@ impl Service { child_ids, } = target; let (pageserver, generation, config) = { - let old_state = locked - .tenants + let mut old_state = tenants .remove(&parent_id) .expect("It was present, we just split it"); - ( - old_state.intent.attached.unwrap(), - old_state.generation, - old_state.config.clone(), - ) + let old_attached = old_state.intent.get_attached().unwrap(); + old_state.intent.clear(scheduler); + (old_attached, old_state.generation, old_state.config.clone()) }; - locked.tenants.remove(&parent_id); - for child in child_ids { let mut child_shard = shard_ident; child_shard.number = child.shard_number; @@ -1785,7 +1829,7 @@ impl Service { ); let mut child_state = TenantState::new(child, child_shard, policy.clone()); - child_state.intent = IntentState::single(Some(pageserver)); + child_state.intent = IntentState::single(scheduler, Some(pageserver)); child_state.observed = ObservedState { locations: child_observed, }; @@ -1798,7 +1842,7 @@ impl Service { child_locations.push((child, pageserver)); - locked.tenants.insert(child, child_state); + tenants.insert(child, child_state); response.new_shards.push(child); } } @@ -1834,35 +1878,34 @@ impl Service { ) -> Result { let waiter = { let mut locked = self.inner.write().unwrap(); - let result_tx = locked.result_tx.clone(); - let pageservers = locked.nodes.clone(); let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = 
locked.parts_mut(); - let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else { + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant shard not found").into(), )); }; - if shard.intent.attached == Some(migrate_req.node_id) { + if shard.intent.get_attached() == &Some(migrate_req.node_id) { // No-op case: we will still proceed to wait for reconciliation in case it is // incomplete from an earlier update to the intent. tracing::info!("Migrating: intent is unchanged {:?}", shard.intent); } else { - let old_attached = shard.intent.attached; + let old_attached = *shard.intent.get_attached(); match shard.policy { PlacementPolicy::Single => { - shard.intent.secondary.clear(); + shard.intent.clear_secondary(scheduler); } PlacementPolicy::Double(_n) => { // If our new attached node was a secondary, it no longer should be. - shard.intent.secondary.retain(|s| s != &migrate_req.node_id); + shard.intent.remove_secondary(scheduler, migrate_req.node_id); // If we were already attached to something, demote that to a secondary if let Some(old_attached) = old_attached { - shard.intent.secondary.push(old_attached); + shard.intent.push_secondary(scheduler, old_attached); } } PlacementPolicy::Detached => { @@ -1871,7 +1914,9 @@ impl Service { ))) } } - shard.intent.attached = Some(migrate_req.node_id); + shard + .intent + .set_attached(scheduler, Some(migrate_req.node_id)); tracing::info!("Migrating: new intent {:?}", shard.intent); shard.sequence = shard.sequence.next(); @@ -1879,7 +1924,7 @@ impl Service { shard.maybe_reconcile( result_tx, - &pageservers, + nodes, &compute_hook, &self.config, &self.persistence, @@ -1903,13 +1948,16 @@ impl Service { self.persistence.delete_tenant(tenant_id).await?; let mut locked = self.inner.write().unwrap(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); let mut shards = Vec::new(); - for (tenant_shard_id, _) in 
locked.tenants.range(TenantShardId::tenant_range(tenant_id)) { + for (tenant_shard_id, _) in tenants.range(TenantShardId::tenant_range(tenant_id)) { shards.push(*tenant_shard_id); } - for shard in shards { - locked.tenants.remove(&shard); + for shard_id in shards { + if let Some(mut shard) = tenants.remove(&shard_id) { + shard.intent.clear(scheduler); + } } Ok(()) @@ -2004,6 +2052,7 @@ impl Service { let mut locked = self.inner.write().unwrap(); let mut new_nodes = (*locked.nodes).clone(); + locked.scheduler.node_upsert(&new_node); new_nodes.insert(register_req.node_id, new_node); locked.nodes = Arc::new(new_nodes); @@ -2020,8 +2069,9 @@ impl Service { let mut locked = self.inner.write().unwrap(); let result_tx = locked.result_tx.clone(); let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); - let mut new_nodes = (*locked.nodes).clone(); + let mut new_nodes = (**nodes).clone(); let Some(node) = new_nodes.get_mut(&config_req.node_id) else { return Err(ApiError::NotFound( @@ -2057,11 +2107,14 @@ impl Service { // to wake up and start working. 
} + // Update the scheduler, in case the eligibility of the node for new shards has changed + scheduler.node_upsert(node); + let new_nodes = Arc::new(new_nodes); - let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes); if offline_transition { - for (tenant_shard_id, tenant_state) in &mut locked.tenants { + let mut tenants_affected: usize = 0; + for (tenant_shard_id, tenant_state) in tenants { if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&config_req.node_id) { @@ -2072,7 +2125,7 @@ impl Service { if tenant_state.intent.notify_offline(config_req.node_id) { tenant_state.sequence = tenant_state.sequence.next(); - match tenant_state.schedule(&mut scheduler) { + match tenant_state.schedule(scheduler) { Err(e) => { // It is possible that some tenants will become unschedulable when too many pageservers // go offline: in this case there isn't much we can do other than make the issue observable. @@ -2080,19 +2133,29 @@ impl Service { tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id); } Ok(()) => { - tenant_state.maybe_reconcile( - result_tx.clone(), - &new_nodes, - &compute_hook, - &self.config, - &self.persistence, - &self.gate, - &self.cancel, - ); + if tenant_state + .maybe_reconcile( + result_tx.clone(), + &new_nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ) + .is_some() + { + tenants_affected += 1; + }; } } } } + tracing::info!( + "Launched {} reconciler tasks for tenants affected by node {} going offline", + tenants_affected, + config_req.node_id + ) } if active_transition { @@ -2135,18 +2198,14 @@ impl Service { let mut waiters = Vec::new(); let result_tx = locked.result_tx.clone(); let compute_hook = locked.compute_hook.clone(); - let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes); - let pageservers = locked.nodes.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); - for (_tenant_shard_id, shard)
in locked - .tenants - .range_mut(TenantShardId::tenant_range(tenant_id)) - { - shard.schedule(&mut scheduler)?; + for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) { + shard.schedule(scheduler)?; if let Some(waiter) = shard.maybe_reconcile( result_tx.clone(), - &pageservers, + nodes, &compute_hook, &self.config, &self.persistence, diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index dd753ece3d..1a68864091 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -19,7 +19,9 @@ use crate::{ compute_hook::ComputeHook, node::Node, persistence::{split_state::SplitState, Persistence}, - reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler}, + reconciler::{ + attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState, + }, scheduler::{ScheduleError, Scheduler}, service, PlacementPolicy, Sequence, }; @@ -88,8 +90,107 @@ pub(crate) struct TenantState { #[derive(Default, Clone, Debug)] pub(crate) struct IntentState { - pub(crate) attached: Option, - pub(crate) secondary: Vec, + attached: Option, + secondary: Vec, +} + +impl IntentState { + pub(crate) fn new() -> Self { + Self { + attached: None, + secondary: vec![], + } + } + pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option) -> Self { + if let Some(node_id) = node_id { + scheduler.node_inc_ref(node_id); + } + Self { + attached: node_id, + secondary: vec![], + } + } + + pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option) { + if self.attached != new_attached { + if let Some(old_attached) = self.attached.take() { + scheduler.node_dec_ref(old_attached); + } + if let Some(new_attached) = &new_attached { + scheduler.node_inc_ref(*new_attached); + } + self.attached = new_attached; + } + } + + pub(crate) fn push_secondary(&mut self, 
scheduler: &mut Scheduler, new_secondary: NodeId) { + debug_assert!(!self.secondary.contains(&new_secondary)); + scheduler.node_inc_ref(new_secondary); + self.secondary.push(new_secondary); + } + + /// It is legal to call this with a node that is not currently a secondary: that is a no-op + pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) { + let index = self.secondary.iter().position(|n| *n == node_id); + if let Some(index) = index { + scheduler.node_dec_ref(node_id); + self.secondary.remove(index); + } + } + + pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) { + for secondary in self.secondary.drain(..) { + scheduler.node_dec_ref(secondary); + } + } + + pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) { + if let Some(old_attached) = self.attached.take() { + scheduler.node_dec_ref(old_attached); + } + + self.clear_secondary(scheduler); + } + + pub(crate) fn all_pageservers(&self) -> Vec { + let mut result = Vec::new(); + if let Some(p) = self.attached { + result.push(p) + } + + result.extend(self.secondary.iter().copied()); + + result + } + + pub(crate) fn get_attached(&self) -> &Option { + &self.attached + } + + pub(crate) fn get_secondary(&self) -> &Vec { + &self.secondary + } + + /// When a node goes offline, we update intents to avoid using it + /// as their attached pageserver. 
+ /// + /// Returns true if a change was made + pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { + if self.attached == Some(node_id) { + self.attached = None; + self.secondary.push(node_id); + true + } else { + false + } + } +} + +impl Drop for IntentState { + fn drop(&mut self) { + // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler + debug_assert!(self.attached.is_none() && self.secondary.is_empty()); + } } #[derive(Default, Clone)] @@ -182,46 +283,6 @@ pub(crate) struct ReconcileResult { pub(crate) pending_compute_notification: bool, } -impl IntentState { - pub(crate) fn new() -> Self { - Self { - attached: None, - secondary: vec![], - } - } - pub(crate) fn all_pageservers(&self) -> Vec { - let mut result = Vec::new(); - if let Some(p) = self.attached { - result.push(p) - } - - result.extend(self.secondary.iter().copied()); - - result - } - - pub(crate) fn single(node_id: Option) -> Self { - Self { - attached: node_id, - secondary: vec![], - } - } - - /// When a node goes offline, we update intents to avoid using it - /// as their attached pageserver. 
- /// - /// Returns true if a change was made - pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { - if self.attached == Some(node_id) { - self.attached = None; - self.secondary.push(node_id); - true - } else { - false - } - } -} - impl ObservedState { pub(crate) fn new() -> Self { Self { @@ -315,12 +376,12 @@ impl TenantState { // Should have exactly one attached, and zero secondaries if self.intent.attached.is_none() { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.attached = Some(node_id); + self.intent.set_attached(scheduler, Some(node_id)); used_pageservers.push(node_id); modified = true; } if !self.intent.secondary.is_empty() { - self.intent.secondary.clear(); + self.intent.clear_secondary(scheduler); modified = true; } } @@ -328,14 +389,14 @@ impl TenantState { // Should have exactly one attached, and N secondaries if self.intent.attached.is_none() { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.attached = Some(node_id); + self.intent.set_attached(scheduler, Some(node_id)); used_pageservers.push(node_id); modified = true; } while self.intent.secondary.len() < secondary_count { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.secondary.push(node_id); + self.intent.push_secondary(scheduler, node_id); used_pageservers.push(node_id); modified = true; } @@ -343,12 +404,12 @@ impl TenantState { Detached => { // Should have no attached or secondary pageservers if self.intent.attached.is_some() { - self.intent.attached = None; + self.intent.set_attached(scheduler, None); modified = true; } if !self.intent.secondary.is_empty() { - self.intent.secondary.clear(); + self.intent.clear_secondary(scheduler); modified = true; } } @@ -490,7 +551,7 @@ impl TenantState { tenant_shard_id: self.tenant_shard_id, shard: self.shard, generation: self.generation, - intent: self.intent.clone(), + intent: TargetState::from_intent(&self.intent), config: self.config.clone(), observed: 
self.observed.clone(), pageservers: pageservers.clone(),