From 55d73f461ca028f88e20ffe12437d04b42ab6b72 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Sat, 3 Feb 2024 19:53:17 +0000
Subject: [PATCH] control_plane/attachment_service: better Scheduler

---
 .../attachment_service/src/reconciler.rs      |  28 ++-
 .../attachment_service/src/scheduler.rs       |  97 +++++++----
 .../attachment_service/src/service.rs         | 159 +++++++++++-------
 .../attachment_service/src/tenant_state.rs    | 144 +++++++++++-----
 4 files changed, 289 insertions(+), 139 deletions(-)

diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
index 776e1f9d1e..b93f82f0c0 100644
--- a/control_plane/attachment_service/src/reconciler.rs
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -26,7 +26,7 @@ pub(super) struct Reconciler {
     pub(super) tenant_shard_id: TenantShardId,
     pub(crate) shard: ShardIdentity,
     pub(crate) generation: Generation,
-    pub(crate) intent: IntentState,
+    pub(crate) intent: TargetState,
     pub(crate) config: TenantConfig,
     pub(crate) observed: ObservedState,
 
@@ -57,6 +57,32 @@ pub(super) struct Reconciler {
     pub(crate) persistence: Arc<Persistence>,
 }
 
+/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
+/// reference counting for Scheduler. The IntentState is what the scheduler works with,
+/// and the TargetState is just the instruction for a particular Reconciler run.
+#[derive(Debug)]
+pub(crate) struct TargetState {
+    pub(crate) attached: Option<NodeId>,
+    pub(crate) secondary: Vec<NodeId>,
+}
+
+impl TargetState {
+    pub(crate) fn from_intent(intent: &IntentState) -> Self {
+        Self {
+            attached: *intent.get_attached(),
+            secondary: intent.get_secondary().clone(),
+        }
+    }
+
+    fn all_pageservers(&self) -> Vec<NodeId> {
+        let mut result = self.secondary.clone();
+        if let Some(node_id) = &self.attached {
+            result.push(*node_id);
+        }
+        result
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum ReconcileError {
     #[error(transparent)]
diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs
index 1966a7ea2a..7d9a6951db 100644
--- a/control_plane/attachment_service/src/scheduler.rs
+++ b/control_plane/attachment_service/src/scheduler.rs
@@ -1,9 +1,7 @@
-use pageserver_api::shard::TenantShardId;
-use std::collections::{BTreeMap, HashMap};
+use crate::node::Node;
+use std::collections::HashMap;
 use utils::{http::error::ApiError, id::NodeId};
 
-use crate::{node::Node, tenant_state::TenantState};
-
 /// Scenarios in which we cannot find a suitable location for a tenant shard
 #[derive(thiserror::Error, Debug)]
 pub enum ScheduleError {
@@ -19,52 +17,88 @@ impl From<ScheduleError> for ApiError {
     }
 }
 
+struct SchedulerNode {
+    /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
+    shard_count: usize,
+
+    /// Whether this node is currently eligible to have new shards scheduled (this is derived
+    /// from a node's availability state and scheduling policy).
+    may_schedule: bool,
+}
+
 pub(crate) struct Scheduler {
-    tenant_counts: HashMap<NodeId, usize>,
+    nodes: HashMap<NodeId, SchedulerNode>,
 }
 
 impl Scheduler {
-    pub(crate) fn new(
-        tenants: &BTreeMap<TenantShardId, TenantState>,
-        nodes: &HashMap<NodeId, Node>,
-    ) -> Self {
-        let mut tenant_counts = HashMap::new();
-        for node_id in nodes.keys() {
-            tenant_counts.insert(*node_id, 0);
-        }
-
-        for tenant in tenants.values() {
-            if let Some(ps) = tenant.intent.attached {
-                let entry = tenant_counts.entry(ps).or_insert(0);
-                *entry += 1;
-            }
-        }
-
+    pub(crate) fn new(nodes: &HashMap<NodeId, Node>) -> Self {
+        let mut scheduler_nodes = HashMap::new();
         for (node_id, node) in nodes {
-            if !node.may_schedule() {
-                tenant_counts.remove(node_id);
-            }
+            scheduler_nodes.insert(
+                *node_id,
+                SchedulerNode {
+                    shard_count: 0,
+                    may_schedule: node.may_schedule(),
+                },
+            );
         }
 
-        Self { tenant_counts }
+        Self {
+            nodes: scheduler_nodes,
+        }
+    }
+
+    pub(crate) fn node_ref(&mut self, node_id: NodeId) {
+        let Some(node) = self.nodes.get_mut(&node_id) else {
+            debug_assert!(false);
+            tracing::error!("Scheduler missing node {node_id}");
+            return;
+        };
+
+        node.shard_count += 1;
+    }
+
+    pub(crate) fn node_deref(&mut self, node_id: NodeId) {
+        let Some(node) = self.nodes.get_mut(&node_id) else {
+            debug_assert!(false);
+            tracing::error!("Scheduler missing node {node_id}");
+            return;
+        };
+
+        node.shard_count -= 1;
+    }
+
+    pub(crate) fn node_upsert(&mut self, node_id: NodeId, may_schedule: bool) {
+        use std::collections::hash_map::Entry::*;
+        match self.nodes.entry(node_id) {
+            Occupied(mut entry) => {
+                entry.get_mut().may_schedule = may_schedule;
+            }
+            Vacant(entry) => {
+                entry.insert(SchedulerNode {
+                    shard_count: 0,
+                    may_schedule,
+                });
+            }
+        }
     }
 
     pub(crate) fn schedule_shard(
         &mut self,
         hard_exclude: &[NodeId],
     ) -> Result<NodeId, ScheduleError> {
-        if self.tenant_counts.is_empty() {
+        if self.nodes.is_empty() {
             return Err(ScheduleError::NoPageservers);
         }
 
        let mut tenant_counts: Vec<(NodeId, usize)> = self
-            .tenant_counts
+            .nodes
             .iter()
             .filter_map(|(k, v)| {
-                if hard_exclude.contains(k) {
+                if hard_exclude.contains(k) || !v.may_schedule {
                     None
                 } else {
-                    Some((*k, *v))
+                    Some((*k, v.shard_count))
                 }
             })
             .collect();
@@ -83,7 +117,10 @@ impl Scheduler {
 
         let node_id = tenant_counts.first().unwrap().0;
         tracing::info!("scheduler selected node {node_id}");
-        *self.tenant_counts.get_mut(&node_id).unwrap() += 1;
+
+        // Note that we do not update shard count here to reflect the scheduling: that
+        // is IntentState's job when the scheduled location is used.
+
         Ok(node_id)
     }
 }
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 6f0e3ebb74..213ce0b818 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -61,6 +61,8 @@ struct ServiceState {
 
     nodes: Arc<HashMap<NodeId, Node>>,
 
+    scheduler: Scheduler,
+
     compute_hook: Arc<ComputeHook>,
 
     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
@@ -72,14 +74,26 @@ impl ServiceState {
         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
         nodes: HashMap<NodeId, Node>,
         tenants: BTreeMap<TenantShardId, TenantState>,
+        scheduler: Scheduler,
     ) -> Self {
         Self {
             tenants,
             nodes: Arc::new(nodes),
+            scheduler,
             compute_hook: Arc::new(ComputeHook::new(config)),
             result_tx,
         }
     }
+
+    fn parts_mut(
+        &mut self,
+    ) -> (
+        &mut Arc<HashMap<NodeId, Node>>,
+        &mut BTreeMap<TenantShardId, TenantState>,
+        &mut Scheduler,
+    ) {
+        (&mut self.nodes, &mut self.tenants, &mut self.scheduler)
+    }
 }
 
 #[derive(Clone)]
@@ -180,8 +194,10 @@ impl Service {
         // Populate intent and observed states for all tenants, based on reported state on pageservers
         let shard_count = {
             let mut locked = self.inner.write().unwrap();
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
+
             for (tenant_shard_id, (node_id, observed_loc)) in observed {
-                let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
+                let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
                     cleanup.push((tenant_shard_id, node_id));
                     continue;
                 };
@@ -193,10 +209,9 @@ impl Service {
             }
 
             // Populate each tenant's intent state
-            let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
-            for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
+            for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
                 tenant_state.intent_from_observed();
-                if let Err(e) = tenant_state.schedule(&mut scheduler) {
+                if let Err(e) = tenant_state.schedule(scheduler) {
                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
                     // not enough pageservers are available. The tenant may well still be available
                     // to clients.
@@ -327,6 +342,8 @@ impl Service {
 
         let mut tenants = BTreeMap::new();
 
+        let mut scheduler = Scheduler::new(&nodes);
+
         for tsp in tenant_shard_persistence {
             let tenant_shard_id = TenantShardId {
                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
@@ -347,7 +364,10 @@ impl Service {
             // it with what we can infer: the node for which a generation was most recently issued.
             let mut intent = IntentState::new();
             if tsp.generation_pageserver != i64::MAX {
-                intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
+                intent.set_attached(
+                    &mut scheduler,
+                    Some(NodeId(tsp.generation_pageserver as u64)),
+                );
             }
 
             let new_tenant = TenantState {
@@ -377,6 +397,7 @@ impl Service {
                 result_tx,
                 nodes,
                 tenants,
+                scheduler,
             ))),
             config,
             persistence,
@@ -518,8 +539,9 @@ impl Service {
         };
 
         let mut locked = self.inner.write().unwrap();
-        let tenant_state = locked
-            .tenants
+        let (_nodes, tenants, scheduler) = locked.parts_mut();
+
+        let tenant_state = tenants
             .get_mut(&attach_req.tenant_shard_id)
             .expect("Checked for existence above");
 
@@ -539,7 +561,7 @@ impl Service {
                 generation = ?tenant_state.generation,
                 "issuing",
             );
-        } else if let Some(ps_id) = tenant_state.intent.attached {
+        } else if let Some(ps_id) = tenant_state.intent.get_attached() {
             tracing::info!(
                 tenant_id = %attach_req.tenant_shard_id,
                 %ps_id,
@@ -551,7 +573,9 @@ impl Service {
                 tenant_id = %attach_req.tenant_shard_id,
                 "no-op: tenant already has no pageserver");
         }
-        tenant_state.intent.attached = attach_req.node_id;
+        tenant_state
+            .intent
+            .set_attached(scheduler, attach_req.node_id);
 
         tracing::info!(
             "attach_hook: tenant {} set generation {:?}, pageserver {}",
@@ -576,7 +600,7 @@ impl Service {
         InspectResponse {
             attachment: tenant_state.and_then(|s| {
                 s.intent
-                    .attached
+                    .get_attached()
                     .map(|ps| (s.generation.into().unwrap(), ps))
             }),
         }
@@ -728,16 +752,15 @@ impl Service {
 
         let (waiters, response_shards) = {
             let mut locked = self.inner.write().unwrap();
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
 
             let mut response_shards = Vec::new();
 
-            let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
-
             for tenant_shard_id in create_ids {
                 tracing::info!("Creating shard {tenant_shard_id}...");
                 use std::collections::btree_map::Entry;
-                match locked.tenants.entry(tenant_shard_id) {
+                match tenants.entry(tenant_shard_id) {
                     Entry::Occupied(mut entry) => {
                         tracing::info!(
                             "Tenant shard {tenant_shard_id} already exists while creating"
                         );
 
                         // TODO: schedule() should take an anti-affinity expression that pushes
                        // attached and secondary locations (independently) away from those
                        // pageservers also holding a shard for this tenant.
-                        entry.get_mut().schedule(&mut scheduler).map_err(|e| {
+                        entry.get_mut().schedule(scheduler).map_err(|e| {
                             ApiError::Conflict(format!(
                                 "Failed to schedule shard {tenant_shard_id}: {e}"
                             ))
@@ -758,7 +781,7 @@ impl Service {
                             node_id: entry
                                 .get()
                                 .intent
-                                .attached
+                                .get_attached()
                                 .expect("We just set pageserver if it was None"),
                             generation: entry.get().generation.into().unwrap(),
                         });
@@ -780,7 +803,7 @@ impl Service {
                         }
 
                         state.config = create_req.config.clone();
-                        state.schedule(&mut scheduler).map_err(|e| {
+                        state.schedule(scheduler).map_err(|e| {
                             ApiError::Conflict(format!(
                                 "Failed to schedule shard {tenant_shard_id}: {e}"
                             ))
@@ -790,7 +813,7 @@ impl Service {
                             shard_id: tenant_shard_id,
                             node_id: state
                                 .intent
-                                .attached
+                                .get_attached()
                                 .expect("We just set pageserver if it was None"),
                             generation: state.generation.into().unwrap(),
                         });
@@ -866,16 +889,11 @@ impl Service {
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
-        let pageservers = locked.nodes.clone();
-
-        let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
         // Maybe we have existing shards
         let mut create = true;
 
-        for (shard_id, shard) in locked
-            .tenants
-            .range_mut(TenantShardId::tenant_range(tenant_id))
-        {
+        for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
             // Saw an existing shard: this is not a creation
             create = false;
@@ -899,7 +917,7 @@ impl Service {
                 | LocationConfigMode::AttachedSingle
                 | LocationConfigMode::AttachedStale => {
                     // TODO: persistence for changes in policy
-                    if pageservers.len() > 1 {
+                    if nodes.len() > 1 {
                         shard.policy = PlacementPolicy::Double(1)
                     } else {
                         // Convenience for dev/test: if we just have one pageserver, import
@@ -909,11 +927,11 @@ impl Service {
                     }
                 }
 
-                shard.schedule(&mut scheduler)?;
+                shard.schedule(scheduler)?;
 
                 let maybe_waiter = shard.maybe_reconcile(
                     result_tx.clone(),
-                    &pageservers,
+                    nodes,
                     &compute_hook,
                     &self.config,
                     &self.persistence,
@@ -922,10 +940,10 @@ impl Service {
                     waiters.push(waiter);
                 }
 
-                if let Some(node_id) = shard.intent.attached {
+                if let Some(node_id) = shard.intent.get_attached() {
                     result.shards.push(TenantShardLocation {
                         shard_id: *shard_id,
-                        node_id,
+                        node_id: *node_id,
                     })
                 }
             }
@@ -1002,7 +1020,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
             })?;
             let node = locked
@@ -1061,9 +1079,16 @@ impl Service {
         // Drop in-memory state
         {
             let mut locked = self.inner.write().unwrap();
-            locked
-                .tenants
-                .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
+            let (_nodes, tenants, scheduler) = locked.parts_mut();
+
+            // Dereference Scheduler from shards before dropping them
+            for (_tenant_shard_id, shard) in
+                tenants.range_mut(TenantShardId::tenant_range(tenant_id))
+            {
+                shard.intent.clear(scheduler);
+            }
+
+            tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
             tracing::info!(
                 "Deleted tenant {tenant_id}, now have {} tenants",
                 locked.tenants.len()
@@ -1097,7 +1122,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
             })?;
             let node = locked
@@ -1177,7 +1202,7 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard.intent.attached.ok_or_else(|| {
+            let node_id = shard.intent.get_attached().ok_or_else(|| {
                 ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
             })?;
             let node = locked
@@ -1249,13 +1274,13 @@ impl Service {
 
         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
         // point to somewhere we haven't attached yet.
-        let Some(node_id) = shard.intent.attached else {
+        let Some(node_id) = shard.intent.get_attached() else {
             return Err(ApiError::Conflict(
                 "Cannot call timeline API on non-attached tenant".to_string(),
             ));
         };
 
-        let Some(node) = locked.nodes.get(&node_id) else {
+        let Some(node) = locked.nodes.get(node_id) else {
             // This should never happen
             return Err(ApiError::InternalServerError(anyhow::anyhow!(
                 "Shard refers to nonexistent node"
@@ -1280,12 +1305,13 @@ impl Service {
         for (tenant_shard_id, shard) in
             locked.tenants.range(TenantShardId::tenant_range(tenant_id))
         {
-            let node_id = shard
-                .intent
-                .attached
-                .ok_or(ApiError::BadRequest(anyhow::anyhow!(
-                    "Cannot locate a tenant that is not attached"
-                )))?;
+            let node_id =
+                shard
+                    .intent
+                    .get_attached()
+                    .ok_or(ApiError::BadRequest(anyhow::anyhow!(
+                        "Cannot locate a tenant that is not attached"
+                    )))?;
 
             let node = pageservers
                 .get(&node_id)
@@ -1349,35 +1375,34 @@ impl Service {
     ) -> Result<TenantShardMigrateResponse, ApiError> {
         let waiter = {
             let mut locked = self.inner.write().unwrap();
-
             let result_tx = locked.result_tx.clone();
-            let pageservers = locked.nodes.clone();
             let compute_hook = locked.compute_hook.clone();
+            let (nodes, tenants, scheduler) = locked.parts_mut();
 
-            let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
+            let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant shard not found").into(),
                 ));
             };
 
-            if shard.intent.attached == Some(migrate_req.node_id) {
+            if shard.intent.get_attached() == &Some(migrate_req.node_id) {
                 // No-op case: we will still proceed to wait for reconciliation in case it is
                 // incomplete from an earlier update to the intent.
                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
             } else {
-                let old_attached = shard.intent.attached;
+                let old_attached = *shard.intent.get_attached();
 
                 match shard.policy {
                     PlacementPolicy::Single => {
-                        shard.intent.secondary.clear();
+                        shard.intent.clear_secondary(scheduler);
                     }
                     PlacementPolicy::Double(_n) => {
                         // If our new attached node was a secondary, it no longer should be.
-                        shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
+                        shard.intent.remove_secondary(scheduler, migrate_req.node_id);
 
                         // If we were already attached to something, demote that to a secondary
                         if let Some(old_attached) = old_attached {
-                            shard.intent.secondary.push(old_attached);
+                            shard.intent.push_secondary(scheduler, old_attached);
                         }
                     }
                     PlacementPolicy::Detached => {
                         return Err(ApiError::BadRequest(anyhow::anyhow!(
                         )))
                     }
                 }
-                shard.intent.attached = Some(migrate_req.node_id);
+                shard
+                    .intent
+                    .set_attached(scheduler, Some(migrate_req.node_id));
 
                 tracing::info!("Migrating: new intent {:?}", shard.intent);
                 shard.sequence = shard.sequence.next();
             }
 
             shard.maybe_reconcile(
                 result_tx,
-                &pageservers,
+                nodes,
                 &compute_hook,
                 &self.config,
                 &self.persistence,
@@ -1478,6 +1505,9 @@ impl Service {
 
         let mut locked = self.inner.write().unwrap();
         let mut new_nodes = (*locked.nodes).clone();
+        locked
+            .scheduler
+            .node_upsert(register_req.node_id, new_node.may_schedule());
         new_nodes.insert(register_req.node_id, new_node);
         locked.nodes = Arc::new(new_nodes);
 
@@ -1494,8 +1524,9 @@ impl Service {
         let mut locked = self.inner.write().unwrap();
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
-        let mut new_nodes = (*locked.nodes).clone();
+        let mut new_nodes = (**nodes).clone();
 
         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
             return Err(ApiError::NotFound(
@@ -1531,11 +1562,13 @@ impl Service {
             // to wake up and start working.
         }
 
+        // Update the scheduler, in case the eligibility of the node for new shards has changed
+        scheduler.node_upsert(node.id, node.may_schedule());
+
         let new_nodes = Arc::new(new_nodes);
 
-        let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
         if offline_transition {
-            for (tenant_shard_id, tenant_state) in &mut locked.tenants {
+            for (tenant_shard_id, tenant_state) in tenants {
                 if let Some(observed_loc) =
                     tenant_state.observed.locations.get_mut(&config_req.node_id)
                 {
@@ -1546,7 +1579,7 @@ impl Service {
                 if tenant_state.intent.notify_offline(config_req.node_id) {
                     tenant_state.sequence = tenant_state.sequence.next();
 
-                    match tenant_state.schedule(&mut scheduler) {
+                    match tenant_state.schedule(scheduler) {
                         Err(e) => {
                             // It is possible that some tenants will become unschedulable when too many pageservers
                            // go offline: in this case there isn't much we can do other than make the issue observable.
@@ -1605,18 +1638,14 @@ impl Service {
         let mut waiters = Vec::new();
 
         let result_tx = locked.result_tx.clone();
         let compute_hook = locked.compute_hook.clone();
-        let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
-        let pageservers = locked.nodes.clone();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
 
-        for (_tenant_shard_id, shard) in locked
-            .tenants
-            .range_mut(TenantShardId::tenant_range(tenant_id))
-        {
-            shard.schedule(&mut scheduler)?;
+        for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+            shard.schedule(scheduler)?;
             if let Some(waiter) = shard.maybe_reconcile(
                 result_tx.clone(),
-                &pageservers,
+                nodes,
                 &compute_hook,
                 &self.config,
                 &self.persistence,
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index a358e1ff7b..a5f7ca0e14 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -17,7 +17,9 @@ use crate::{
     compute_hook::ComputeHook,
     node::Node,
     persistence::Persistence,
-    reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
+    reconciler::{
+        attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
+    },
     scheduler::{ScheduleError, Scheduler},
     service, PlacementPolicy, Sequence,
 };
@@ -81,8 +83,97 @@ pub(crate) struct TenantState {
 
 #[derive(Default, Clone, Debug)]
 pub(crate) struct IntentState {
-    pub(crate) attached: Option<NodeId>,
-    pub(crate) secondary: Vec<NodeId>,
+    attached: Option<NodeId>,
+    secondary: Vec<NodeId>,
+}
+
+impl IntentState {
+    pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
+        if self.attached != new_attached {
+            if let Some(old_attached) = self.attached.take() {
+                scheduler.node_deref(old_attached);
+            }
+            if let Some(new_attached) = &new_attached {
+                scheduler.node_ref(*new_attached);
+            }
+            self.attached = new_attached;
+        }
+    }
+
+    pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
+        debug_assert!(!self.secondary.contains(&new_secondary));
+        scheduler.node_ref(new_secondary);
+        self.secondary.push(new_secondary);
+    }
+
+    /// It is legal to call this with a node that is not currently a secondary: that is a no-op
+    pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
+        let index = self.secondary.iter().position(|n| *n == node_id);
+        if let Some(index) = index {
+            scheduler.node_deref(node_id);
+            self.secondary.remove(index);
+        }
+    }
+
+    pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
+        for secondary in self.secondary.drain(..) {
+            scheduler.node_deref(secondary);
+        }
+    }
+
+    pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
+        if let Some(old_attached) = self.attached.take() {
+            scheduler.node_deref(old_attached);
+        }
+
+        self.clear_secondary(scheduler);
+    }
+
+    pub(crate) fn new() -> Self {
+        Self {
+            attached: None,
+            secondary: vec![],
+        }
+    }
+    pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
+        let mut result = Vec::new();
+        if let Some(p) = self.attached {
+            result.push(p)
+        }
+
+        result.extend(self.secondary.iter().copied());
+
+        result
+    }
+
+    pub(crate) fn get_attached(&self) -> &Option<NodeId> {
+        &self.attached
+    }
+
+    pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
+        &self.secondary
+    }
+
+    /// When a node goes offline, we update intents to avoid using it
+    /// as their attached pageserver.
+ /// + /// Returns true if a change was made + pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { + if self.attached == Some(node_id) { + self.attached = None; + self.secondary.push(node_id); + true + } else { + false + } + } +} + +impl Drop for IntentState { + fn drop(&mut self) { + // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler + debug_assert!(self.attached.is_none() && self.secondary.is_empty()); + } } #[derive(Default, Clone)] @@ -175,39 +266,6 @@ pub(crate) struct ReconcileResult { pub(crate) pending_compute_notification: bool, } -impl IntentState { - pub(crate) fn new() -> Self { - Self { - attached: None, - secondary: vec![], - } - } - pub(crate) fn all_pageservers(&self) -> Vec { - let mut result = Vec::new(); - if let Some(p) = self.attached { - result.push(p) - } - - result.extend(self.secondary.iter().copied()); - - result - } - - /// When a node goes offline, we update intents to avoid using it - /// as their attached pageserver. - /// - /// Returns true if a change was made - pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { - if self.attached == Some(node_id) { - self.attached = None; - self.secondary.push(node_id); - true - } else { - false - } - } -} - impl ObservedState { pub(crate) fn new() -> Self { Self { @@ -297,12 +355,12 @@ impl TenantState { // Should have exactly one attached, and zero secondaries if self.intent.attached.is_none() { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.attached = Some(node_id); + self.intent.set_attached(scheduler, Some(node_id)); used_pageservers.push(node_id); modified = true; } if !self.intent.secondary.is_empty() { - self.intent.secondary.clear(); + self.intent.clear_secondary(scheduler); modified = true; } } @@ -310,14 +368,14 @@ impl TenantState { // Should have exactly one attached, and N secondaries if self.intent.attached.is_none() { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.attached = Some(node_id); + self.intent.set_attached(scheduler, Some(node_id)); used_pageservers.push(node_id); modified = true; } while self.intent.secondary.len() < secondary_count { let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.secondary.push(node_id); + self.intent.push_secondary(scheduler, node_id); used_pageservers.push(node_id); modified = true; } @@ -325,12 +383,12 @@ impl TenantState { Detached => { // Should have no attached or secondary pageservers if self.intent.attached.is_some() { - self.intent.attached = None; + self.intent.set_attached(scheduler, None); modified = true; } if !self.intent.secondary.is_empty() { - self.intent.secondary.clear(); + self.intent.clear_secondary(scheduler); modified = true; } } @@ -455,7 +513,7 @@ impl TenantState { tenant_shard_id: self.tenant_shard_id, shard: self.shard, generation: self.generation, - intent: self.intent.clone(), + intent: TargetState::from_intent(&self.intent), config: self.config.clone(), observed: self.observed.clone(), pageservers: pageservers.clone(),