From fd1368d31e2d159a518ac78dd6d5146a45a6de72 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 13 Jan 2025 19:33:00 +0000
Subject: [PATCH] storcon: rework scheduler optimisation, prioritize AZ (#9916)

## Problem

We want to do a more robust job of scheduling tenants into their home AZ:
https://github.com/neondatabase/neon/issues/8264.

Closes: https://github.com/neondatabase/neon/issues/8969

## Summary of changes

### Scope

This PR combines prioritizing AZ with a larger rework of how we do
optimisation. The rationale is that bumping AZ up the ranking of Score
attributes is a tiny change on its own: the interesting part is lining up all
the optimisation logic to respect it properly, which means rewriting that
logic to use the same scores as the scheduler, rather than the fragile
hand-crafted logic that we had before. Separating these changes out is
possible, but would involve doing two rounds of test updates instead of one.

### Scheduling optimisation

`TenantShard`'s `optimize_attachment` and `optimize_secondary` methods now
both use the scheduler to pick a new "favourite" location. Then there is some
refined logic for whether and how to migrate to it:
- To decide whether a new location is sufficiently "better", we generate
  scores using projected ScheduleContexts that exclude the shard under
  consideration, so that we avoid migrating from a node with AffinityScore(2)
  to a node with AffinityScore(1), only to migrate back later.
- Score types get a `for_optimization` method so that when we compare scores,
  we only do an optimisation if the scores differ in their highest-ranking
  attributes, not just because one pageserver is lower in utilization.
  Eventually we _will_ want a mode that optimises on utilization, but doing
  it here would make scheduling logic unstable and harder to test, and to do
  it correctly one needs to know the size of the tenant being migrated.
- When we find a new attached location that we would like to move to, we
  create a new secondary location there, even if we already have one on some
  other node. This handles the case where we have a home AZ A and want to
  migrate the attachment between pageservers in that AZ while retaining a
  secondary location in some other AZ as well.
- A unit test is added for https://github.com/neondatabase/neon/issues/8969,
  which is implicitly fixed by reworking optimisation to use the same scores
  as the scheduler. The sketch below illustrates the score-comparison rule.
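
The comparison rule in the second bullet is easiest to see in isolation. The
following is a minimal, self-contained Rust sketch — not the PR's actual
types; `az_mismatch`, `affinity`, `utilization` and `attached_shards` are
simplified stand-ins for the fields of `NodeAttachmentSchedulingScore` — of
how `for_optimization` makes migration decisions depend only on the
highest-ranking attributes, so that two nodes differing only in utilization
compare as equal:

```rust
/// Sketch of a scheduling score. Derived `Ord` compares fields in declaration
/// order, so the structural attributes (AZ match, affinity) outrank the
/// utilization-derived ones. Lower is better.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct AttachmentScore {
    /// Highest-ranking attributes: AZ mismatch and inter-shard affinity.
    az_mismatch: bool,
    affinity: u32,
    /// Lower-ranking attributes that fluctuate with load.
    utilization: u64,
    attached_shards: usize,
}

impl AttachmentScore {
    /// Zero the utilization-derived components, so that two scores only
    /// differ if their structural (AZ/affinity) components differ.
    fn for_optimization(&self) -> Self {
        Self {
            utilization: 0,
            attached_shards: 0,
            ..*self
        }
    }
}

/// Migrate only if the candidate wins on the structural components.
fn should_migrate(current: AttachmentScore, candidate: AttachmentScore) -> bool {
    candidate.for_optimization() < current.for_optimization()
}

fn main() {
    let current = AttachmentScore {
        az_mismatch: false,
        affinity: 1,
        utilization: 80,
        attached_shards: 12,
    };

    // Same AZ match and affinity, merely less loaded: not worth a migration,
    // since utilization can change again and we would risk flapping.
    let less_loaded = AttachmentScore {
        utilization: 10,
        attached_shards: 3,
        ..current
    };
    assert!(!should_migrate(current, less_loaded));

    // Better affinity (fewer sibling shards of the same tenant): this can
    // only be fixed by moving, so it is worth migrating for.
    let better_affinity = AttachmentScore {
        affinity: 0,
        ..current
    };
    assert!(should_migrate(current, better_affinity));

    println!("ok");
}
```

In the real `scheduler.rs` the full scores (including utilization) still drive
initial placement; only the optimiser strips the utilization components when
comparing an existing location against a candidate.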
--- libs/pageserver_api/src/controller_api.rs | 10 + storage_controller/Cargo.toml | 2 +- storage_controller/src/drain_utils.rs | 2 +- storage_controller/src/reconciler.rs | 16 +- storage_controller/src/scheduler.rs | 599 ++++++++--- storage_controller/src/service.rs | 244 ++++- .../src/service/context_iterator.rs | 9 +- storage_controller/src/tenant_shard.rs | 978 +++++++++++++----- .../performance/test_sharding_autosplit.py | 21 +- .../test_storage_controller_scale.py | 115 +- test_runner/regress/test_sharding.py | 32 +- .../regress/test_storage_controller.py | 40 +- 12 files changed, 1598 insertions(+), 470 deletions(-) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index ba0eb0e4ae..f3aefc6df9 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -367,6 +367,16 @@ pub enum PlacementPolicy { Detached, } +impl PlacementPolicy { + pub fn want_secondaries(&self) -> usize { + match self { + PlacementPolicy::Attached(secondary_count) => *secondary_count, + PlacementPolicy::Secondary => 1, + PlacementPolicy::Detached => 0, + } + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateResponse {} diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 5f3319512d..caaa22d0a5 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -55,4 +55,4 @@ r2d2 = { version = "0.8.10" } utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } -workspace_hack = { version = "0.1", path = "../workspace_hack" } +workspace_hack = { version = "0.1", path = "../workspace_hack" } \ No newline at end of file diff --git a/storage_controller/src/drain_utils.rs b/storage_controller/src/drain_utils.rs index 47f4276ff2..8b7be88078 100644 --- a/storage_controller/src/drain_utils.rs +++ b/storage_controller/src/drain_utils.rs @@ -112,7 +112,7 @@ impl TenantShardDrain { } } - match scheduler.node_preferred(tenant_shard.intent.get_secondary()) { + match tenant_shard.preferred_secondary(scheduler) { Some(node) => Some(node), None => { tracing::warn!( diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 6f584e7267..adced3b77d 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -826,7 +826,21 @@ impl Reconciler { if self.cancel.is_cancelled() { return Err(ReconcileError::Cancel); } - self.location_config(&node, conf, None, false).await?; + // We only try to configure secondary locations if the node is available. This does + // not stop us succeeding with the reconcile, because our core goal is to make the + // shard _available_ (the attached location), and configuring secondary locations + // can be done lazily when the node becomes available (via background reconciliation). + if node.is_available() { + self.location_config(&node, conf, None, false).await?; + } else { + // If the node is unavailable, we skip and consider the reconciliation successful: this + // is a common case where a pageserver is marked unavailable: we demote a location on + // that unavailable pageserver to secondary. + tracing::info!("Skipping configuring secondary location {node}, it is unavailable"); + self.observed + .locations + .insert(node.get_id(), ObservedStateLocation { conf: None }); + } } // The condition below identifies a detach. 
We must have no attached intent and diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 51a4cf35be..04a594dcac 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -32,6 +32,9 @@ pub(crate) struct SchedulerNode { shard_count: usize, /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. attached_shard_count: usize, + /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node + /// is in their preferred AZ (i.e. this is their 'home' location) + home_shard_count: usize, /// Availability zone id in which the node resides az: AvailabilityZone, @@ -47,6 +50,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { preferred_az: &Option, context: &ScheduleContext, ) -> Option; + + /// Return a score that drops any components based on node utilization: this is useful + /// for finding scores for scheduling optimisation, when we want to avoid rescheduling + /// shards due to e.g. disk usage, to avoid flapping. + fn for_optimization(&self) -> Self; + fn is_overloaded(&self) -> bool; fn node_id(&self) -> NodeId; } @@ -136,17 +145,13 @@ impl PartialOrd for SecondaryAzMatch { /// Ordering is given by member declaration order (top to bottom). #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub(crate) struct NodeAttachmentSchedulingScore { - /// The number of shards belonging to the tenant currently being - /// scheduled that are attached to this node. - affinity_score: AffinityScore, /// Flag indicating whether this node matches the preferred AZ /// of the shard. For equal affinity scores, nodes in the matching AZ /// are considered first. az_match: AttachmentAzMatch, - /// Size of [`ScheduleContext::attached_nodes`] for the current node. - /// This normally tracks the number of attached shards belonging to the - /// tenant being scheduled that are already on this node. - attached_shards_in_context: usize, + /// The number of shards belonging to the tenant currently being + /// scheduled that are attached to this node. + affinity_score: AffinityScore, /// Utilisation score that combines shard count and disk utilisation utilization_score: u64, /// Total number of shards attached to this node. 
When nodes have identical utilisation, this @@ -177,13 +182,25 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { .copied() .unwrap_or(AffinityScore::FREE), az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), - attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0), utilization_score: utilization.cached_score(), total_attached_shard_count: node.attached_shard_count, node_id: *node_id, }) } + /// For use in scheduling optimisation, where we only want to consider the aspects + /// of the score that can only be resolved by moving things (such as inter-shard affinity + /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which + /// can fluctuate for other reasons) + fn for_optimization(&self) -> Self { + Self { + utilization_score: 0, + total_attached_shard_count: 0, + node_id: NodeId(0), + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -208,9 +225,9 @@ pub(crate) struct NodeSecondarySchedulingScore { affinity_score: AffinityScore, /// Utilisation score that combines shard count and disk utilisation utilization_score: u64, - /// Total number of shards attached to this node. When nodes have identical utilisation, this - /// acts as an anti-affinity between attached shards. - total_attached_shard_count: usize, + /// Anti-affinity with other non-home locations: this gives the behavior that secondaries + /// will spread out across the nodes in an AZ. + total_non_home_shard_count: usize, /// Convenience to make selection deterministic in tests and empty systems node_id: NodeId, } @@ -237,11 +254,20 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { .copied() .unwrap_or(AffinityScore::FREE), utilization_score: utilization.cached_score(), - total_attached_shard_count: node.attached_shard_count, + total_non_home_shard_count: (node.shard_count - node.home_shard_count), node_id: *node_id, }) } + fn for_optimization(&self) -> Self { + Self { + utilization_score: 0, + total_non_home_shard_count: 0, + node_id: NodeId(0), + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -293,6 +319,10 @@ impl AffinityScore { pub(crate) fn inc(&mut self) { self.0 += 1; } + + pub(crate) fn dec(&mut self) { + self.0 -= 1; + } } impl std::ops::Add for AffinityScore { @@ -324,9 +354,6 @@ pub(crate) struct ScheduleContext { /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`] pub(crate) nodes: HashMap, - /// Specifically how many _attached_ locations are on each node - pub(crate) attached_nodes: HashMap, - pub(crate) mode: ScheduleMode, } @@ -334,7 +361,6 @@ impl ScheduleContext { pub(crate) fn new(mode: ScheduleMode) -> Self { Self { nodes: HashMap::new(), - attached_nodes: HashMap::new(), mode, } } @@ -348,25 +374,31 @@ impl ScheduleContext { } } - pub(crate) fn push_attached(&mut self, node_id: NodeId) { - let entry = self.attached_nodes.entry(node_id).or_default(); - *entry += 1; - } - - pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore { - self.nodes - .get(&node_id) - .copied() - .unwrap_or(AffinityScore::FREE) - } - - pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize { - self.attached_nodes.get(&node_id).copied().unwrap_or(0) + /// Remove `shard`'s contributions to this context. This is useful when considering scheduling + /// this shard afresh, where we don't want it to e.g. 
experience anti-affinity to its current location. + pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self { + let mut new_context = self.clone(); + + if let Some(attached) = shard.intent.get_attached() { + if let Some(score) = new_context.nodes.get_mut(attached) { + score.dec(); + } + } + + for secondary in shard.intent.get_secondary() { + if let Some(score) = new_context.nodes.get_mut(secondary) { + score.dec(); + } + } + + new_context } + /// For test, track the sum of AffinityScore values, which is effectively how many + /// attached or secondary locations have been registered with this context. #[cfg(test)] - pub(crate) fn attach_count(&self) -> usize { - self.attached_nodes.values().sum() + pub(crate) fn location_count(&self) -> usize { + self.nodes.values().map(|i| i.0).sum() } } @@ -388,6 +420,7 @@ impl Scheduler { SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }, @@ -415,6 +448,7 @@ impl Scheduler { SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }, @@ -427,6 +461,9 @@ impl Scheduler { Some(node) => { node.shard_count += 1; node.attached_shard_count += 1; + if Some(&node.az) == shard.preferred_az() { + node.home_shard_count += 1; + } } None => anyhow::bail!( "Tenant {} references nonexistent node {}", @@ -438,7 +475,12 @@ impl Scheduler { for node_id in shard.intent.get_secondary() { match expect_nodes.get_mut(node_id) { - Some(node) => node.shard_count += 1, + Some(node) => { + node.shard_count += 1; + if Some(&node.az) == shard.preferred_az() { + node.home_shard_count += 1; + } + } None => anyhow::bail!( "Tenant {} references nonexistent node {}", shard.tenant_shard_id, @@ -482,13 +524,20 @@ impl Scheduler { /// /// It is an error to call this for a node that is not known to the scheduler (i.e. 
passed into /// [`Self::new`] or [`Self::node_upsert`]) - pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) { + pub(crate) fn update_node_ref_counts( + &mut self, + node_id: NodeId, + preferred_az: Option<&AvailabilityZone>, + update: RefCountUpdate, + ) { let Some(node) = self.nodes.get_mut(&node_id) else { debug_assert!(false); tracing::error!("Scheduler missing node {node_id}"); return; }; + let is_home_az = Some(&node.az) == preferred_az; + match update { RefCountUpdate::PromoteSecondary => { node.attached_shard_count += 1; @@ -496,19 +545,31 @@ impl Scheduler { RefCountUpdate::Attach => { node.shard_count += 1; node.attached_shard_count += 1; + if is_home_az { + node.home_shard_count += 1; + } } RefCountUpdate::Detach => { node.shard_count -= 1; node.attached_shard_count -= 1; + if is_home_az { + node.home_shard_count -= 1; + } } RefCountUpdate::DemoteAttached => { node.attached_shard_count -= 1; } RefCountUpdate::AddSecondary => { node.shard_count += 1; + if is_home_az { + node.home_shard_count += 1; + } } RefCountUpdate::RemoveSecondary => { node.shard_count -= 1; + if is_home_az { + node.home_shard_count -= 1; + } } } @@ -594,6 +655,7 @@ impl Scheduler { entry.insert(SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }); @@ -607,33 +669,20 @@ impl Scheduler { } } - /// Where we have several nodes to choose from, for example when picking a secondary location - /// to promote to an attached location, this method may be used to pick the best choice based - /// on the scheduler's knowledge of utilization and availability. - /// - /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the - /// caller can pick a node some other way. - pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option { - if nodes.is_empty() { - return None; - } - - // TODO: When the utilization score returned by the pageserver becomes meaningful, - // schedule based on that instead of the shard count. - let node = nodes - .iter() - .map(|node_id| { - let may_schedule = self - .nodes - .get(node_id) - .map(|n| !matches!(n.may_schedule, MaySchedule::No)) - .unwrap_or(false); - (*node_id, may_schedule) - }) - .max_by_key(|(_n, may_schedule)| *may_schedule); - - // If even the preferred node has may_schedule==false, return None - node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None }) + /// Calculate a single node's score, used in optimizer logic to compare specific + /// nodes' scores. + pub(crate) fn compute_node_score( + &mut self, + node_id: NodeId, + preferred_az: &Option, + context: &ScheduleContext, + ) -> Option + where + Score: NodeSchedulingScore, + { + self.nodes + .get_mut(&node_id) + .and_then(|node| Score::generate(&node_id, node, preferred_az, context)) } /// Compute a schedulling score for each node that the scheduler knows of @@ -727,7 +776,7 @@ impl Scheduler { tracing::info!( "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})", scores.iter().map(|i| i.node_id().0).collect::>() - ); + ); } // Note that we do not update shard count here to reflect the scheduling: that @@ -743,47 +792,74 @@ impl Scheduler { } /// For choosing which AZ to schedule a new shard into, use this. It will return the - /// AZ with the lowest median utilization. 
+ /// AZ with the the lowest number of shards currently scheduled in this AZ as their home + /// location. /// /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded /// node, because while tenants start out single sharded, when they grow and undergo - /// shard-split, they will occupy space on many nodes within an AZ. + /// shard-split, they will occupy space on many nodes within an AZ. It is important + /// that we pick the AZ in a way that balances this _future_ load. /// - /// We use median rather than total free space or mean utilization, because - /// we wish to avoid preferring AZs that have low-load nodes resulting from - /// recent replacements. - /// - /// The practical result is that we will pick an AZ based on its median node, and - /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ. + /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by + /// nodes' utilization scores. pub(crate) fn get_az_for_new_tenant(&self) -> Option { if self.nodes.is_empty() { return None; } - let mut scores_by_az = HashMap::new(); - for (node_id, node) in &self.nodes { - let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new); - let score = match &node.may_schedule { - MaySchedule::Yes(utilization) => utilization.score(), - MaySchedule::No => PageserverUtilization::full().score(), - }; - az_scores.push((node_id, node, score)); + #[derive(Default)] + struct AzScore { + home_shard_count: usize, + scheduleable: bool, } - // Sort by utilization. Also include the node ID to break ties. - for scores in scores_by_az.values_mut() { - scores.sort_by_key(|i| (i.2, i.0)); + let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new(); + for node in self.nodes.values() { + let az = azs.entry(&node.az).or_default(); + az.home_shard_count += node.home_shard_count; + az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_)); } - let mut median_by_az = scores_by_az + // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where + // all nodes are overloaded or otherwise unschedulable). + if azs.values().any(|i| i.scheduleable) { + azs.retain(|_, i| i.scheduleable); + } + + // Find the AZ with the lowest number of shards currently allocated + Some( + azs.into_iter() + .min_by_key(|i| (i.1.home_shard_count, i.0)) + .unwrap() + .0 + .clone(), + ) + } + + pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option { + self.nodes.get(node_id).map(|n| n.az.clone()) + } + + /// For use when choosing a preferred secondary location: filter out nodes that are not + /// available, and gather their AZs. + pub(crate) fn filter_usable_nodes( + &self, + nodes: &[NodeId], + ) -> Vec<(NodeId, Option)> { + nodes .iter() - .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2)) - .collect::>(); - // Sort by utilization. Also include the AZ to break ties. 
- median_by_az.sort_by_key(|i| (i.1, i.0)); - - // Return the AZ with the lowest median utilization - Some(median_by_az.first().unwrap().0.clone()) + .filter_map(|node_id| { + let node = self + .nodes + .get(node_id) + .expect("Referenced nodes always exist"); + if matches!(node.may_schedule, MaySchedule::Yes(_)) { + Some((*node_id, Some(node.az.clone()))) + } else { + None + } + }) + .collect() } /// Unit test access to internal state @@ -843,7 +919,14 @@ pub(crate) mod test_utils { #[cfg(test)] mod tests { - use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization}; + use pageserver_api::{ + controller_api::NodeAvailability, models::utilization::test_utilization, + shard::ShardIdentity, + }; + use utils::{ + id::TenantId, + shard::{ShardCount, ShardNumber, TenantShardId}, + }; use super::*; @@ -853,8 +936,8 @@ mod tests { let nodes = test_utils::make_test_nodes(2, &[]); let mut scheduler = Scheduler::new(nodes.values()); - let mut t1_intent = IntentState::new(); - let mut t2_intent = IntentState::new(); + let mut t1_intent = IntentState::new(None); + let mut t2_intent = IntentState::new(None); let context = ScheduleContext::default(); @@ -930,7 +1013,7 @@ mod tests { let scheduled = scheduler .schedule_shard::(&[], &None, context) .unwrap(); - let mut intent = IntentState::new(); + let mut intent = IntentState::new(None); intent.set_attached(scheduler, Some(scheduled)); scheduled_intents.push(intent); assert_eq!(scheduled, expect_node); @@ -1063,7 +1146,7 @@ mod tests { let scheduled = scheduler .schedule_shard::(&[], &preferred_az, context) .unwrap(); - let mut intent = IntentState::new(); + let mut intent = IntentState::new(preferred_az.clone()); intent.set_attached(scheduler, Some(scheduled)); scheduled_intents.push(intent); assert_eq!(scheduled, expect_node); @@ -1089,9 +1172,9 @@ mod tests { &mut context, ); - // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that. + // Node 1 and 3 (az-a) have same affinity score, so prefer the lowest node id. assert_scheduler_chooses::( - NodeId(2), + NodeId(1), Some(az_a_tag.clone()), &mut scheduled_intents, &mut scheduler, @@ -1107,26 +1190,6 @@ mod tests { &mut context, ); - // Avoid nodes in "az-b" for the secondary location. - // Nodes 1 and 3 are identically loaded, so prefer the lowest node id. - assert_scheduler_chooses::( - NodeId(1), - Some(az_b_tag.clone()), - &mut scheduled_intents, - &mut scheduler, - &mut context, - ); - - // Avoid nodes in "az-b" for the secondary location. - // Node 3 has lower affinity score than 1, so prefer that. - assert_scheduler_chooses::( - NodeId(3), - Some(az_b_tag.clone()), - &mut scheduled_intents, - &mut scheduler, - &mut context, - ); - for mut intent in scheduled_intents { intent.clear(&mut scheduler); } @@ -1150,34 +1213,292 @@ mod tests { let mut scheduler = Scheduler::new(nodes.values()); - /// Force the utilization of a node in Scheduler's state to a particular - /// number of bytes used. - fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) { - let mut node = Node::new( - node_id, - "".to_string(), - 0, - "".to_string(), - 0, - scheduler.nodes.get(&node_id).unwrap().az.clone(), - ); - node.set_availability(NodeAvailability::Active(test_utilization::simple( - shard_count, - 0, - ))); - scheduler.node_upsert(&node); + /// Force the `home_shard_count` of a node directly: this is the metric used + /// by the scheduler when picking AZs. 
+ fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) { + let node = scheduler.nodes.get_mut(&node_id).unwrap(); + node.home_shard_count = shard_count; } // Initial empty state. Scores are tied, scheduler prefers lower AZ ID. assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); - // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed - set_utilization(&mut scheduler, NodeId(1), 1000000); - assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); - - // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler - // should prefer the other AZ. - set_utilization(&mut scheduler, NodeId(2), 1000000); + // Home shard count is higher in AZ A, so AZ B will be preferred + set_shard_count(&mut scheduler, NodeId(1), 10); assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone())); + + // Total home shard count is higher in AZ B, so we revert to preferring AZ A + set_shard_count(&mut scheduler, NodeId(4), 6); + set_shard_count(&mut scheduler, NodeId(5), 6); + assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); + } + + /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes + #[test] + fn az_selection_many() { + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + let az_c_tag = AvailabilityZone("az-c".to_string()); + let nodes = test_utils::make_test_nodes( + 6, + &[ + az_a_tag.clone(), + az_b_tag.clone(), + az_c_tag.clone(), + az_a_tag.clone(), + az_b_tag.clone(), + az_c_tag.clone(), + ], + ); + + let mut scheduler = Scheduler::new(nodes.values()); + + // We should get 1/6th of these on each node, give or take a few... + let total_tenants = 300; + + // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot + // on one AZ before correcting itself. This is because we select the 'home' AZ based on + // an AZ-wide metric, but we select the location for secondaries on a purely node-based + // metric (while excluding the home AZ). 
+ let grace = 3; + + let mut scheduled_shards = Vec::new(); + for _i in 0..total_tenants { + let preferred_az = scheduler.get_az_for_new_tenant().unwrap(); + + let mut node_home_counts = scheduler + .nodes + .iter() + .map(|(node_id, node)| (node_id, node.home_shard_count)) + .collect::>(); + node_home_counts.sort_by_key(|i| i.0); + eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts); + + let tenant_shard_id = TenantShardId { + tenant_id: TenantId::generate(), + shard_number: ShardNumber(0), + shard_count: ShardCount(1), + }; + + let shard_identity = ShardIdentity::new( + tenant_shard_id.shard_number, + tenant_shard_id.shard_count, + pageserver_api::shard::ShardStripeSize(1), + ) + .unwrap(); + let mut shard = TenantShard::new( + tenant_shard_id, + shard_identity, + pageserver_api::controller_api::PlacementPolicy::Attached(1), + Some(preferred_az), + ); + + let mut context = ScheduleContext::default(); + shard.schedule(&mut scheduler, &mut context).unwrap(); + eprintln!("Scheduled shard at {:?}", shard.intent); + + scheduled_shards.push(shard); + } + + for (node_id, node) in &scheduler.nodes { + eprintln!( + "Node {}: {} {} {}", + node_id, node.shard_count, node.attached_shard_count, node.home_shard_count + ); + } + + for node in scheduler.nodes.values() { + assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace); + } + + for mut shard in scheduled_shards { + shard.intent.clear(&mut scheduler); + } + } + + #[test] + /// Make sure that when we have an odd number of nodes and an even number of shards, we still + /// get scheduling stability. + fn odd_nodes_stability() { + let az_a = AvailabilityZone("az-a".to_string()); + let az_b = AvailabilityZone("az-b".to_string()); + + let nodes = test_utils::make_test_nodes( + 10, + &[ + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Need to keep these alive because they contribute to shard counts via RAII + let mut scheduled_shards = Vec::new(); + + let mut context = ScheduleContext::default(); + + fn schedule_shard( + tenant_shard_id: TenantShardId, + expect_attached: NodeId, + expect_secondary: NodeId, + scheduled_shards: &mut Vec, + scheduler: &mut Scheduler, + preferred_az: Option, + context: &mut ScheduleContext, + ) { + let shard_identity = ShardIdentity::new( + tenant_shard_id.shard_number, + tenant_shard_id.shard_count, + pageserver_api::shard::ShardStripeSize(1), + ) + .unwrap(); + let mut shard = TenantShard::new( + tenant_shard_id, + shard_identity, + pageserver_api::controller_api::PlacementPolicy::Attached(1), + preferred_az, + ); + + shard.schedule(scheduler, context).unwrap(); + + assert_eq!(shard.intent.get_attached().unwrap(), expect_attached); + assert_eq!( + shard.intent.get_secondary().first().unwrap(), + &expect_secondary + ); + + scheduled_shards.push(shard); + } + + let tenant_id = TenantId::generate(); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(0), + shard_count: ShardCount(8), + }, + NodeId(1), + NodeId(6), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(1), + shard_count: ShardCount(8), + }, + NodeId(2), + NodeId(7), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + 
shard_number: ShardNumber(2), + shard_count: ShardCount(8), + }, + NodeId(3), + NodeId(8), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(3), + shard_count: ShardCount(8), + }, + NodeId(4), + NodeId(9), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(4), + shard_count: ShardCount(8), + }, + NodeId(5), + NodeId(10), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(5), + shard_count: ShardCount(8), + }, + NodeId(1), + NodeId(6), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(6), + shard_count: ShardCount(8), + }, + NodeId(2), + NodeId(7), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(7), + shard_count: ShardCount(8), + }, + NodeId(3), + NodeId(8), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + // Assert that the optimizer suggests nochanges, i.e. our initial scheduling was stable. + for shard in &scheduled_shards { + assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None); + } + + for mut shard in scheduled_shards { + shard.intent.clear(&mut scheduler); + } } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 8aa263f0c3..dadcc44cfb 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1404,7 +1404,11 @@ impl Service { // We will populate intent properly later in [`Self::startup_reconcile`], initially populate // it with what we can infer: the node for which a generation was most recently issued. - let mut intent = IntentState::new(); + let mut intent = IntentState::new( + tsp.preferred_az_id + .as_ref() + .map(|az| AvailabilityZone(az.clone())), + ); if let Some(generation_pageserver) = tsp.generation_pageserver.map(|n| NodeId(n as u64)) { if nodes.contains_key(&generation_pageserver) { @@ -2474,18 +2478,29 @@ impl Service { tenant_id: TenantId, _guard: &TracingExclusiveGuard, ) -> Result<(), ApiError> { - let present_in_memory = { + // Check if the tenant is present in memory, and select an AZ to use when loading + // if we will load it. + let load_in_az = { let locked = self.inner.read().unwrap(); - locked + let existing = locked .tenants .range(TenantShardId::tenant_range(tenant_id)) - .next() - .is_some() - }; + .next(); - if present_in_memory { - return Ok(()); - } + // If the tenant is not present in memory, we expect to load it from database, + // so let's figure out what AZ to load it into while we have self.inner locked. + if existing.is_none() { + locked + .scheduler + .get_az_for_new_tenant() + .ok_or(ApiError::BadRequest(anyhow::anyhow!( + "No AZ with nodes found to load tenant" + )))? + } else { + // We already have this tenant in memory + return Ok(()); + } + }; let tenant_shards = self.persistence.load_tenant(tenant_id).await?; if tenant_shards.is_empty() { @@ -2494,8 +2509,20 @@ impl Service { )); } - // TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running - // compute, so no benefit to making AZ sticky across detaches. 
+ // Update the persistent shards with the AZ that we are about to apply to in-memory state + self.persistence + .set_tenant_shard_preferred_azs( + tenant_shards + .iter() + .map(|t| { + ( + t.get_tenant_shard_id().expect("Corrupt shard in database"), + load_in_az.clone(), + ) + }) + .collect(), + ) + .await?; let mut locked = self.inner.write().unwrap(); tracing::info!( @@ -2505,7 +2532,7 @@ impl Service { ); locked.tenants.extend(tenant_shards.into_iter().map(|p| { - let intent = IntentState::new(); + let intent = IntentState::new(Some(load_in_az.clone())); let shard = TenantShard::from_persistent(p, intent).expect("Corrupt shard row in database"); @@ -4236,6 +4263,22 @@ impl Service { } tracing::info!("Restoring parent shard {tenant_shard_id}"); + + // Drop any intents that refer to unavailable nodes, to enable this abort to proceed even + // if the original attachment location is offline. + if let Some(node_id) = shard.intent.get_attached() { + if !nodes.get(node_id).unwrap().is_available() { + tracing::info!("Demoting attached intent for {tenant_shard_id} on unavailable node {node_id}"); + shard.intent.demote_attached(scheduler, *node_id); + } + } + for node_id in shard.intent.get_secondary().clone() { + if !nodes.get(&node_id).unwrap().is_available() { + tracing::info!("Dropping secondary intent for {tenant_shard_id} on unavailable node {node_id}"); + shard.intent.remove_secondary(scheduler, node_id); + } + } + shard.splitting = SplitState::Idle; if let Err(e) = shard.schedule(scheduler, &mut ScheduleContext::default()) { // If this shard can't be scheduled now (perhaps due to offline nodes or @@ -4389,15 +4432,13 @@ impl Service { let mut child_state = TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone()); - child_state.intent = IntentState::single(scheduler, Some(pageserver)); + child_state.intent = + IntentState::single(scheduler, Some(pageserver), preferred_az.clone()); child_state.observed = ObservedState { locations: child_observed, }; child_state.generation = Some(generation); child_state.config = config.clone(); - if let Some(preferred_az) = &preferred_az { - child_state.set_preferred_az(preferred_az.clone()); - } // The child's TenantShard::splitting is intentionally left at the default value of Idle, // as at this point in the split process we have succeeded and this part is infallible: @@ -5014,6 +5055,8 @@ impl Service { // If our new attached node was a secondary, it no longer should be. 
shard.intent.remove_secondary(scheduler, migrate_req.node_id); + shard.intent.set_attached(scheduler, Some(migrate_req.node_id)); + // If we were already attached to something, demote that to a secondary if let Some(old_attached) = old_attached { if n > 0 { @@ -5025,8 +5068,6 @@ impl Service { shard.intent.push_secondary(scheduler, old_attached); } } - - shard.intent.set_attached(scheduler, Some(migrate_req.node_id)); } PlacementPolicy::Secondary => { shard.intent.clear(scheduler); @@ -5712,7 +5753,7 @@ impl Service { register_req.listen_http_port, register_req.listen_pg_addr, register_req.listen_pg_port, - register_req.availability_zone_id, + register_req.availability_zone_id.clone(), ); // TODO: idempotency if the node already exists in the database @@ -5732,8 +5773,9 @@ impl Service { .set(locked.nodes.len() as i64); tracing::info!( - "Registered pageserver {}, now have {} pageservers", + "Registered pageserver {} ({}), now have {} pageservers", register_req.node_id, + register_req.availability_zone_id, locked.nodes.len() ); Ok(()) @@ -6467,6 +6509,7 @@ impl Service { // Shard was dropped between planning and execution; continue; }; + tracing::info!("Applying optimization: {optimization:?}"); if shard.apply_optimization(scheduler, optimization) { optimizations_applied += 1; if self.maybe_reconcile_shard(shard, nodes).is_some() { @@ -6497,7 +6540,13 @@ impl Service { let mut work = Vec::new(); let mut locked = self.inner.write().unwrap(); - let (nodes, tenants, scheduler) = locked.parts_mut(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); + + // We are going to plan a bunch of optimisations before applying any of them, so the + // utilisation stats on nodes will be effectively stale for the >1st optimisation we + // generate. To avoid this causing unstable migrations/flapping, it's important that the + // code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::disregard_utilization`] + // to ignore the utilisation component of the score. for (_tenant_id, schedule_context, shards) in TenantShardContextIterator::new(tenants, ScheduleMode::Speculative) @@ -6528,13 +6577,28 @@ impl Service { continue; } - // TODO: optimization calculations are relatively expensive: create some fast-path for - // the common idle case (avoiding the search on tenants that we have recently checked) + // Fast path: we may quickly identify shards that don't have any possible optimisations + if !shard.maybe_optimizable(scheduler, &schedule_context) { + if cfg!(feature = "testing") { + // Check that maybe_optimizable doesn't disagree with the actual optimization functions. + // Only do this in testing builds because it is not a correctness-critical check, so we shouldn't + // panic in prod if we hit this, or spend cycles on it in prod. + assert!(shard + .optimize_attachment(scheduler, &schedule_context) + .is_none()); + assert!(shard + .optimize_secondary(scheduler, &schedule_context) + .is_none()); + } + continue; + } + if let Some(optimization) = - // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to + // If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to // its primary location based on soft constraints, cut it over. 
- shard.optimize_attachment(nodes, &schedule_context) + shard.optimize_attachment(scheduler, &schedule_context) { + tracing::info!(tenant_shard_id=%shard.tenant_shard_id, "Identified optimization for attachment: {optimization:?}"); work.push((shard.tenant_shard_id, optimization)); break; } else if let Some(optimization) = @@ -6544,6 +6608,7 @@ impl Service { // in the same tenant with secondary locations on the node where they originally split. shard.optimize_secondary(scheduler, &schedule_context) { + tracing::info!(tenant_shard_id=%shard.tenant_shard_id, "Identified optimization for secondary: {optimization:?}"); work.push((shard.tenant_shard_id, optimization)); break; } @@ -6592,8 +6657,10 @@ impl Service { } } } - ScheduleOptimizationAction::ReplaceSecondary(_) => { - // No extra checks needed to replace a secondary: this does not interrupt client access + ScheduleOptimizationAction::ReplaceSecondary(_) + | ScheduleOptimizationAction::CreateSecondary(_) + | ScheduleOptimizationAction::RemoveSecondary(_) => { + // No extra checks needed to manage secondaries: this does not interrupt client access validated_work.push((tenant_shard_id, optimization)) } }; @@ -6665,26 +6732,35 @@ impl Service { /// we have this helper to move things along faster. #[cfg(feature = "testing")] async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) { - let (attached_node, secondary_node) = { + let (attached_node, secondaries) = { let locked = self.inner.read().unwrap(); let Some(shard) = locked.tenants.get(&tenant_shard_id) else { + tracing::warn!( + "Skipping kick of secondary download for {tenant_shard_id}: not found" + ); return; }; - let (Some(attached), Some(secondary)) = ( - shard.intent.get_attached(), - shard.intent.get_secondary().first(), - ) else { + + let Some(attached) = shard.intent.get_attached() else { + tracing::warn!( + "Skipping kick of secondary download for {tenant_shard_id}: no attached" + ); return; }; - ( - locked.nodes.get(attached).unwrap().clone(), - locked.nodes.get(secondary).unwrap().clone(), - ) + + let secondaries = shard + .intent + .get_secondary() + .iter() + .map(|n| locked.nodes.get(n).unwrap().clone()) + .collect::>(); + + (locked.nodes.get(attached).unwrap().clone(), secondaries) }; // Make remote API calls to upload + download heatmaps: we ignore errors because this is just // a 'kick' to let scheduling optimisation run more promptly. 
- attached_node + match attached_node .with_client_retries( |client| async move { client.tenant_heatmap_upload(tenant_shard_id).await }, &self.config.jwt_token, @@ -6693,22 +6769,57 @@ impl Service { SHORT_RECONCILE_TIMEOUT, &self.cancel, ) - .await; + .await + { + Some(Err(e)) => { + tracing::info!( + "Failed to upload heatmap from {attached_node} for {tenant_shard_id}: {e}" + ); + } + None => { + tracing::info!( + "Cancelled while uploading heatmap from {attached_node} for {tenant_shard_id}" + ); + } + Some(Ok(_)) => { + tracing::info!( + "Successfully uploaded heatmap from {attached_node} for {tenant_shard_id}" + ); + } + } - secondary_node - .with_client_retries( - |client| async move { - client - .tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1))) - .await - }, - &self.config.jwt_token, - 3, - 10, - SHORT_RECONCILE_TIMEOUT, - &self.cancel, - ) - .await; + for secondary_node in secondaries { + match secondary_node + .with_client_retries( + |client| async move { + client + .tenant_secondary_download( + tenant_shard_id, + Some(Duration::from_secs(1)), + ) + .await + }, + &self.config.jwt_token, + 3, + 10, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await + { + Some(Err(e)) => { + tracing::info!( + "Failed to download heatmap from {secondary_node} for {tenant_shard_id}: {e}" + ); + } + None => { + tracing::info!("Cancelled while downloading heatmap from {secondary_node} for {tenant_shard_id}"); + } + Some(Ok(progress)) => { + tracing::info!("Successfully downloaded heatmap from {secondary_node} for {tenant_shard_id}: {progress:?}"); + } + } + } } /// Look for shards which are oversized and in need of splitting @@ -7144,9 +7255,15 @@ impl Service { fn fill_node_plan(&self, node_id: NodeId) -> Vec { let mut locked = self.inner.write().unwrap(); let fill_requirement = locked.scheduler.compute_fill_requirement(node_id); + let (nodes, tenants, _scheduler) = locked.parts_mut(); - let mut tids_by_node = locked - .tenants + let node_az = nodes + .get(&node_id) + .expect("Node must exist") + .get_availability_zone_id() + .clone(); + + let mut tids_by_node = tenants .iter_mut() .filter_map(|(tid, tenant_shard)| { if !matches!( @@ -7159,6 +7276,25 @@ impl Service { return None; } + // AZ check: when filling nodes after a restart, our intent is to move _back_ the + // shards which belong on this node, not to promote shards whose scheduling preference + // would be on their currently attached node. So will avoid promoting shards whose + // home AZ doesn't match the AZ of the node we're filling. + match tenant_shard.preferred_az() { + None => { + // Shard doesn't have an AZ preference: it is elegible to be moved. + } + Some(az) if az == &node_az => { + // This shard's home AZ is equal to the node we're filling: it is + // elegible to be moved: fall through; + } + Some(_) => { + // This shard's home AZ is somewhere other than the node we're filling: + // do not include it in the fill plan. 
+ return None; + } + } + if tenant_shard.intent.get_secondary().contains(&node_id) { if let Some(primary) = tenant_shard.intent.get_attached() { return Some((*primary, *tid)); diff --git a/storage_controller/src/service/context_iterator.rs b/storage_controller/src/service/context_iterator.rs index d38010a27e..dd6913e988 100644 --- a/storage_controller/src/service/context_iterator.rs +++ b/storage_controller/src/service/context_iterator.rs @@ -43,9 +43,6 @@ impl<'a> Iterator for TenantShardContextIterator<'a> { // Accumulate the schedule context for all the shards in a tenant schedule_context.avoid(&shard.intent.all_pageservers()); - if let Some(attached) = shard.intent.get_attached() { - schedule_context.push_attached(*attached); - } tenant_shards.push(shard); if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 { @@ -115,7 +112,7 @@ mod tests { assert_eq!(tenant_id, t1_id); assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); assert_eq!(shards.len(), 1); - assert_eq!(context.attach_count(), 1); + assert_eq!(context.location_count(), 2); let (tenant_id, context, shards) = iter.next().unwrap(); assert_eq!(tenant_id, t2_id); @@ -124,13 +121,13 @@ mod tests { assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2)); assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3)); assert_eq!(shards.len(), 4); - assert_eq!(context.attach_count(), 4); + assert_eq!(context.location_count(), 8); let (tenant_id, context, shards) = iter.next().unwrap(); assert_eq!(tenant_id, t3_id); assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); assert_eq!(shards.len(), 1); - assert_eq!(context.attach_count(), 1); + assert_eq!(context.location_count(), 2); for shard in tenants.values_mut() { shard.intent.clear(&mut scheduler); diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index e0a71b5822..2ba2a57eba 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -11,16 +11,14 @@ use crate::{ persistence::TenantShardPersistence, reconciler::{ReconcileUnits, ReconcilerConfig}, scheduler::{ - AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext, - SecondaryShardTag, + AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore, + RefCountUpdate, ScheduleContext, SecondaryShardTag, ShardTag, }, service::ReconcileResultRequest, }; use futures::future::{self, Either}; use itertools::Itertools; -use pageserver_api::controller_api::{ - AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, -}; +use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy}; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -33,6 +31,7 @@ use utils::{ generation::Generation, id::NodeId, seqwait::{SeqWait, SeqWaitError}, + shard::ShardCount, sync::gate::GateGuard, }; @@ -147,45 +146,67 @@ pub(crate) struct TenantShard { // Support/debug tool: if something is going wrong or flapping with scheduling, this may // be set to a non-active state to avoid making changes while the issue is fixed. scheduling_policy: ShardSchedulingPolicy, +} + +#[derive(Clone, Debug, Serialize)] +pub(crate) struct IntentState { + attached: Option, + secondary: Vec, // We should attempt to schedule this shard in the provided AZ to // decrease chances of cross-AZ compute. 
preferred_az_id: Option, } -#[derive(Default, Clone, Debug, Serialize)] -pub(crate) struct IntentState { - attached: Option, - secondary: Vec, -} - impl IntentState { - pub(crate) fn new() -> Self { + pub(crate) fn new(preferred_az_id: Option) -> Self { Self { attached: None, secondary: vec![], + preferred_az_id, } } - pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option) -> Self { + pub(crate) fn single( + scheduler: &mut Scheduler, + node_id: Option, + preferred_az_id: Option, + ) -> Self { if let Some(node_id) = node_id { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach); + scheduler.update_node_ref_counts( + node_id, + preferred_az_id.as_ref(), + RefCountUpdate::Attach, + ); } Self { attached: node_id, secondary: vec![], + preferred_az_id, } } pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option) { if self.attached != new_attached { if let Some(old_attached) = self.attached.take() { - scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); + scheduler.update_node_ref_counts( + old_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Detach, + ); } if let Some(new_attached) = &new_attached { - scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach); + scheduler.update_node_ref_counts( + *new_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Attach, + ); } self.attached = new_attached; } + + if let Some(new_attached) = &new_attached { + assert!(!self.secondary.contains(new_attached)); + } } /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from @@ -204,15 +225,28 @@ impl IntentState { let demoted = self.attached; self.attached = Some(promote_secondary); - scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary); + scheduler.update_node_ref_counts( + promote_secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::PromoteSecondary, + ); if let Some(demoted) = demoted { - scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached); + scheduler.update_node_ref_counts( + demoted, + self.preferred_az_id.as_ref(), + RefCountUpdate::DemoteAttached, + ); } } pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) { - debug_assert!(!self.secondary.contains(&new_secondary)); - scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary); + assert!(!self.secondary.contains(&new_secondary)); + assert!(self.attached != Some(new_secondary)); + scheduler.update_node_ref_counts( + new_secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::AddSecondary, + ); self.secondary.push(new_secondary); } @@ -220,27 +254,43 @@ impl IntentState { pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) { let index = self.secondary.iter().position(|n| *n == node_id); if let Some(index) = index { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); self.secondary.remove(index); } } pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) { for secondary in self.secondary.drain(..) 
{ - scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); } } /// Remove the last secondary node from the list of secondaries pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) { if let Some(node_id) = self.secondary.pop() { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); } } pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) { if let Some(old_attached) = self.attached.take() { - scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); + scheduler.update_node_ref_counts( + old_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Detach, + ); } self.clear_secondary(scheduler); @@ -275,7 +325,11 @@ impl IntentState { if self.attached == Some(node_id) { self.attached = None; self.secondary.push(node_id); - scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::DemoteAttached, + ); true } else { false @@ -315,6 +369,7 @@ pub(crate) struct ObservedStateLocation { /// we know that we might have some state on this node. pub(crate) conf: Option, } + pub(crate) struct ReconcilerWaiter { // For observability purposes, remember the ID of the shard we're // waiting for. @@ -360,6 +415,10 @@ pub(crate) enum ScheduleOptimizationAction { ReplaceSecondary(ReplaceSecondary), // Migrate attachment to an existing secondary location MigrateAttachment(MigrateAttachment), + // Create a secondary location, with the intent of later migrating to it + CreateSecondary(NodeId), + // Remove a secondary location that we previously created to facilitate a migration + RemoveSecondary(NodeId), } #[derive(Eq, PartialEq, Debug, Clone)] @@ -486,7 +545,7 @@ impl TenantShard { Self { tenant_shard_id, policy, - intent: IntentState::default(), + intent: IntentState::new(preferred_az_id), generation: Some(Generation::new(0)), shard, observed: ObservedState::default(), @@ -500,7 +559,6 @@ impl TenantShard { last_error: Arc::default(), pending_compute_notification: false, scheduling_policy: ShardSchedulingPolicy::default(), - preferred_az_id, } } @@ -563,7 +621,7 @@ impl TenantShard { return Ok((false, node_id)); } - if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) { + if let Some(promote_secondary) = self.preferred_secondary(scheduler) { // Promote a secondary tracing::debug!("Promoted secondary {} to attached", promote_secondary); self.intent.promote_attached(scheduler, promote_secondary); @@ -572,7 +630,7 @@ impl TenantShard { // Pick a fresh node: either we had no secondaries or none were schedulable let node_id = scheduler.schedule_shard::( &self.intent.secondary, - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; tracing::debug!("Selected {} as attached", node_id); @@ -594,9 +652,6 @@ impl TenantShard { let r = self.do_schedule(scheduler, context); context.avoid(&self.intent.all_pageservers()); - if let Some(attached) = self.intent.get_attached() { - context.push_attached(*attached); - } r } @@ -631,24 +686,7 @@ impl TenantShard { use PlacementPolicy::*; match self.policy { Attached(secondary_count) => { - let retain_secondaries = if self.intent.attached.is_none() - && 
scheduler.node_preferred(&self.intent.secondary).is_some() - { - // If we have no attached, and one of the secondaries is elegible to be promoted, retain - // one more secondary than we usually would, as one of them will become attached futher down this function. - secondary_count + 1 - } else { - secondary_count - }; - - while self.intent.secondary.len() > retain_secondaries { - // We have no particular preference for one secondary location over another: just - // arbitrarily drop from the end - self.intent.pop_secondary(scheduler); - modified = true; - } - - // Should have exactly one attached, and N secondaries + // Should have exactly one attached, and at least N secondaries let (modified_attached, attached_node_id) = self.schedule_attached(scheduler, context)?; modified |= modified_attached; @@ -657,7 +695,7 @@ impl TenantShard { while self.intent.secondary.len() < secondary_count { let node_id = scheduler.schedule_shard::( &used_pageservers, - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; self.intent.push_secondary(scheduler, node_id); @@ -674,7 +712,7 @@ impl TenantShard { // Populate secondary by scheduling a fresh node let node_id = scheduler.schedule_shard::( &[], - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; self.intent.push_secondary(scheduler, node_id); @@ -718,7 +756,7 @@ impl TenantShard { ) -> Result<(), ScheduleError> { let promote_to = match promote_to { Some(node) => node, - None => match scheduler.node_preferred(self.intent.get_secondary()) { + None => match self.preferred_secondary(scheduler) { Some(node) => node, None => { return Err(ScheduleError::ImpossibleConstraint); @@ -745,90 +783,276 @@ impl TenantShard { Ok(()) } + /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion + fn is_better_location( + &self, + scheduler: &mut Scheduler, + schedule_context: &ScheduleContext, + current: NodeId, + candidate: NodeId, + ) -> Option { + let Some(candidate_score) = scheduler.compute_node_score::( + candidate, + &self.intent.preferred_az_id, + schedule_context, + ) else { + // The candidate node is unavailable for scheduling or otherwise couldn't get a score + return None; + }; + + match scheduler.compute_node_score::( + current, + &self.intent.preferred_az_id, + schedule_context, + ) { + Some(current_score) => { + // Ignore utilization components when comparing scores: we don't want to migrate + // because of transient load variations, it risks making the system thrash, and + // migrating for utilization requires a separate high level view of the system to + // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily + // moving things around in the order that we hit this function. + let candidate_score = candidate_score.for_optimization(); + let current_score = current_score.for_optimization(); + + if candidate_score < current_score { + tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"); + Some(true) + } else { + // The candidate node is no better than our current location, so don't migrate + tracing::debug!( + "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})", + ); + Some(false) + } + } + None => { + // The current node is unavailable for scheduling, so we can't make any sensible + // decisions about optimisation. 
This should be a transient state -- if the node + // is offline then it will get evacuated, if it is blocked by a scheduling mode + // then we will respect that mode by doing nothing. + tracing::debug!("Current node {current} is unavailable for scheduling"); + None + } + } + } + + fn find_better_location( + &self, + scheduler: &mut Scheduler, + schedule_context: &ScheduleContext, + current: NodeId, + hard_exclude: &[NodeId], + ) -> Option { + // Look for a lower-scoring location to attach to + let Ok(candidate_node) = scheduler.schedule_shard::( + hard_exclude, + &self.intent.preferred_az_id, + schedule_context, + ) else { + // A scheduling error means we have no possible candidate replacements + tracing::debug!("No candidate node found"); + return None; + }; + + if candidate_node == current { + // We're already at the best possible location, so don't migrate + tracing::debug!("Candidate node {candidate_node} is already in use"); + return None; + } + + self.is_better_location::(scheduler, schedule_context, current, candidate_node) + .and_then(|better| if better { Some(candidate_node) } else { None }) + } + + /// This function is an optimization, used to avoid doing large numbers of scheduling operations + /// when looking for optimizations. This function uses knowledge of how scores work to do some + /// fast checks for whether it may be possible to improve a score. + /// + /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we + /// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no + /// work. + pub(crate) fn maybe_optimizable( + &self, + scheduler: &mut Scheduler, + schedule_context: &ScheduleContext, + ) -> bool { + // Sharded tenant: check if any locations have a nonzero affinity score + if self.shard.count >= ShardCount(1) { + let schedule_context = schedule_context.project_detach(self); + for node in self.intent.all_pageservers() { + if let Some(af) = schedule_context.nodes.get(&node) { + if *af > AffinityScore(0) { + return true; + } + } + } + } + + // Attached tenant: check if the attachment is outside the preferred AZ + if let PlacementPolicy::Attached(_) = self.policy { + if let Some(attached) = self.intent.get_attached() { + if scheduler.get_node_az(attached) != self.intent.preferred_az_id { + return true; + } + } + } + + // Tenant with secondary locations: check if any are within the preferred AZ + for secondary in self.intent.get_secondary() { + if scheduler.get_node_az(secondary) == self.intent.preferred_az_id { + return true; + } + } + + // Does the tenant have excess secondaries? + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + return true; + } + + // Fall through: no optimizations possible + false + } + /// Optimize attachments: if a shard has a secondary location that is preferable to /// its primary location based on soft constraints, switch that secondary location /// to be attached. #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn optimize_attachment( &self, - nodes: &HashMap, + scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { let attached = (*self.intent.get_attached())?; - if self.intent.secondary.is_empty() { - // We can only do useful work if we have both attached and secondary locations: this - // function doesn't schedule new locations, only swaps between attached and secondaries.
- return None; - } - let current_affinity_score = schedule_context.get_node_affinity(attached); - let current_attachment_count = schedule_context.get_node_attachments(attached); + let schedule_context = schedule_context.project_detach(self); - // Generate score for each node, dropping any un-schedulable nodes. - let all_pageservers = self.intent.all_pageservers(); - let mut scores = all_pageservers - .iter() - .flat_map(|node_id| { - let node = nodes.get(node_id); - if node.is_none() { - None - } else if matches!( - node.unwrap().get_scheduling(), - NodeSchedulingPolicy::Filling - ) { - // If the node is currently filling, don't count it as a candidate to avoid, - // racing with the background fill. - None - } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) { - None - } else { - let affinity_score = schedule_context.get_node_affinity(*node_id); - let attachment_count = schedule_context.get_node_attachments(*node_id); - Some((*node_id, affinity_score, attachment_count)) - } - }) - .collect::>(); - - // Sort precedence: - // 1st - prefer nodes with the lowest total affinity score - // 2nd - prefer nodes with the lowest number of attachments in this context - // 3rd - if all else is equal, sort by node ID for determinism in tests. - scores.sort_by_key(|i| (i.1, i.2, i.0)); - - if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) = - scores.first() - { - if attached != *preferred_node { - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called (e.g. there's no point migrating from - // a location with score 1 to a score zero, because on next location the situation - // would be the same, but in reverse). - if current_affinity_score > *preferred_affinity_score + AffinityScore(1) - || current_attachment_count > *preferred_attachment_count + 1 - { - tracing::info!( - "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})", - self.intent.get_secondary() - ); - return Some(ScheduleOptimization { - sequence: self.sequence, - action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: attached, - new_attached_node_id: *preferred_node, - }), - }); - } - } else { - tracing::debug!( - "Node {} is already preferred (score {:?})", - preferred_node, - preferred_affinity_score - ); + // If we already have a secondary that is higher-scoring than our current location, + // then simply migrate to it. + for secondary in self.intent.get_secondary() { + if let Some(true) = self.is_better_location::( + scheduler, + &schedule_context, + attached, + *secondary, + ) { + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: attached, + new_attached_node_id: *secondary, + }), + }); } } - // Fall-through: we didn't find an optimization - None + // Given that none of our current secondaries is a better location than our current + // attached location (checked above), we may trim any secondaries that are not needed + // for the placement policy. + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // This code path cleans up extra secondaries after migrating, and/or + // trims extra secondaries after a PlacementPolicy::Attached(N) was + // modified to decrease N.
+ + let secondary_scores = self + .intent + .get_secondary() + .iter() + .map(|node_id| { + ( + *node_id, + scheduler.compute_node_score::( + *node_id, + &self.intent.preferred_az_id, + &schedule_context, + ), + ) + }) + .collect::>(); + + if secondary_scores.iter().any(|score| score.1.is_none()) { + // Don't have full list of scores, so can't make a good decision about which to drop unless + // there is an obvious one in the wrong AZ + for secondary in self.intent.get_secondary() { + if scheduler.get_node_az(secondary) == self.intent.preferred_az_id { + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(*secondary), + }); + } + } + + // Fall through: we didn't identify one to remove. This ought to be rare. + tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)", + self.intent.get_secondary() + ); + } else { + let victim = secondary_scores + .iter() + .max_by_key(|score| score.1.unwrap()) + .unwrap() + .0; + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(victim), + }); + } + } + + let replacement = self.find_better_location::( + scheduler, + &schedule_context, + attached, + &[], // Don't exclude secondaries: our preferred attachment location may be a secondary + ); + + // We have found a candidate and confirmed that its score is preferable + // to our current location. See if we have a secondary location in the preferred location already: if not, + // then create one. + if let Some(replacement) = replacement { + // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still + // not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing + // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred + // AZ: skip this optimization if it is not in our final, preferred AZ. + // + // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes + // there are too overloaded for the scheduler to suggest them, more should be provisioned eventually). + if self.intent.preferred_az_id.is_some() + && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id + { + tracing::debug!( + "Candidate node {replacement} is not in preferred AZ {:?}", + self.intent.preferred_az_id + ); + + // This should only happen if our current location is not in the preferred AZ, otherwise + // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because + // AZ is the highest priority part of NodeAttachmentSchedulingScore. + debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id); + + return None; + } + + if !self.intent.get_secondary().contains(&replacement) { + Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::CreateSecondary(replacement), + }) + } else { + // We already have a secondary in the preferred location, let's try migrating to it. Our caller + // will check the warmth of the destination before deciding whether to really execute this.
+ Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: attached, + new_attached_node_id: replacement, + }), + }) + } + } else { + // We didn't find somewhere we'd rather be, and we don't have any excess secondaries + // to clean up: no action required. + None + } } #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] @@ -837,50 +1061,40 @@ impl TenantShard { scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { - if self.intent.secondary.is_empty() { - // We can only do useful work if we have both attached and secondary locations: this - // function doesn't schedule new locations, only swaps between attached and secondaries. + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // We have extra secondaries, perhaps to facilitate a migration of the attached location: + // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done, + // and we are called again, we will proceed. + tracing::debug!("Too many secondaries: skipping"); return None; } + let schedule_context = schedule_context.project_detach(self); + for secondary in self.intent.get_secondary() { - let Some(affinity_score) = schedule_context.nodes.get(secondary) else { - // We're already on a node unaffected any affinity constraints, - // so we won't change it. - continue; + // Make sure we don't try to migrate a secondary to our attached location: this case happens + // easily in environments without multiple AZs. + let exclude = match self.intent.attached { + Some(attached) => vec![attached], + None => vec![], }; - // Let the scheduler suggest a node, where it would put us if we were scheduling afresh - // This implicitly limits the choice to nodes that are available, and prefers nodes - // with lower utilization. - let Ok(candidate_node) = scheduler.schedule_shard::( - &self.intent.all_pageservers(), - &self.preferred_az_id, - schedule_context, - ) else { - // A scheduling error means we have no possible candidate replacements - continue; - }; - - let candidate_affinity_score = schedule_context - .nodes - .get(&candidate_node) - .unwrap_or(&AffinityScore::FREE); - - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called. - if *candidate_affinity_score + AffinityScore(1) < *affinity_score { - // If some other node is available and has a lower score than this node, then - // that other node is a good place to migrate to. - tracing::info!( - "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})", - self.intent.get_secondary() - ); + let replacement = self.find_better_location::( + scheduler, + &schedule_context, + *secondary, + &exclude, + ); + assert!(replacement != Some(*secondary)); + if let Some(replacement) = replacement { + // We have found a candidate and confirmed that its score is preferable + // to this secondary's current location: replace the secondary with it.
return Some(ScheduleOptimization { sequence: self.sequence, action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary { old_node_id: *secondary, - new_node_id: candidate_node, + new_node_id: replacement, }), }); } @@ -921,11 +1135,54 @@ impl TenantShard { self.intent.remove_secondary(scheduler, old_node_id); self.intent.push_secondary(scheduler, new_node_id); } + ScheduleOptimizationAction::CreateSecondary(new_node_id) => { + self.intent.push_secondary(scheduler, new_node_id); + } + ScheduleOptimizationAction::RemoveSecondary(old_secondary) => { + self.intent.remove_secondary(scheduler, old_secondary); + } } true } + /// When a shard has several secondary locations, we need to pick one in situations where + /// we promote one of them to an attached location: + /// - When draining a node for restart + /// - When responding to a node failure + /// + /// In this context, 'preferred' does not mean the node with the best scheduling score: instead + /// we want to pick the node which is best for use _temporarily_ while the previous attached location + /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary + /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary + /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist + /// they're probably not warmed up yet). + /// + /// If the input is empty, or all the nodes are not eligible for scheduling, return None: the + /// caller needs to pick a node some other way. + pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option { + let candidates = scheduler.filter_usable_nodes(&self.intent.secondary); + + // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer + // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ), + // rather than a short-lived secondary location being used for optimization/migration (which would have + // been scheduled in our preferred AZ). + let mut candidates = candidates + .iter() + .map(|(node_id, node_az)| { + if node_az == &self.intent.preferred_az_id { + (1, *node_id) + } else { + (0, *node_id) + } + }) + .collect::>(); + + candidates.sort(); + + candidates.first().map(|i| i.1) + } + /// Query whether the tenant's observed state for attached node matches its intent state, and if so, /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
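To make the life cycle of the new actions easier to follow -- prepare with CreateSecondary, cut over with MigrateAttachment, then drop the excess location with RemoveSecondary -- here is a minimal, self-contained sketch of the state machine those actions imply. Everything in it (the NodeId/Action/ShardState types and the driver loop) is a simplified stand-in written for illustration only, not the storage controller's actual code; in particular, the real optimiser scores the excess secondaries and removes the worst one, whereas this sketch just picks one.

// Toy model of the optimisation action sequence: prepare, cut over, clean up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct NodeId(u64);

#[derive(Debug)]
enum Action {
    CreateSecondary(NodeId),
    MigrateAttachment { old: NodeId, new: NodeId },
    RemoveSecondary(NodeId),
}

struct ShardState {
    attached: NodeId,
    secondaries: Vec<NodeId>,
    want_secondaries: usize,
}

impl ShardState {
    // Decide the next action needed to land the attachment on `preferred`.
    fn next_action(&self, preferred: NodeId) -> Option<Action> {
        if self.attached != preferred {
            if self.secondaries.contains(&preferred) {
                // A secondary already exists at the target: cut the attachment over.
                Some(Action::MigrateAttachment { old: self.attached, new: preferred })
            } else {
                // No location at the target yet: create a secondary there first.
                Some(Action::CreateSecondary(preferred))
            }
        } else if self.secondaries.len() > self.want_secondaries {
            // Attached in the right place: drop an excess secondary (arbitrarily here;
            // the real code picks the lowest-scoring candidate).
            self.secondaries.first().copied().map(Action::RemoveSecondary)
        } else {
            None
        }
    }

    fn apply(&mut self, action: &Action) {
        match *action {
            Action::CreateSecondary(n) => self.secondaries.push(n),
            Action::MigrateAttachment { old, new } => {
                self.secondaries.retain(|s| *s != new);
                // The old attachment is demoted to a secondary rather than dropped outright.
                self.secondaries.push(old);
                self.attached = new;
            }
            Action::RemoveSecondary(n) => self.secondaries.retain(|s| *s != n),
        }
    }
}

fn main() {
    // Attached outside the home AZ (node 2), long-lived secondary on node 3,
    // and the scheduler's pick in the home AZ is node 1.
    let mut shard = ShardState {
        attached: NodeId(2),
        secondaries: vec![NodeId(3)],
        want_secondaries: 1,
    };
    while let Some(action) = shard.next_action(NodeId(1)) {
        println!("{action:?}");
        shard.apply(&action);
    }
    assert_eq!(shard.attached, NodeId(1));
    assert_eq!(shard.secondaries.len(), 1);
}

Run against this scenario, the sketch emits CreateSecondary, MigrateAttachment and RemoveSecondary in that order, which is the same sequence exercised by the optimize_attachment_multistep unit test below.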
@@ -1207,7 +1464,7 @@ impl TenantShard { detach, reconciler_config, config: self.config.clone(), - preferred_az: self.preferred_az_id.clone(), + preferred_az: self.intent.preferred_az_id.clone(), observed: self.observed.clone(), original_observed: self.observed.clone(), compute_hook: compute_hook.clone(), @@ -1428,7 +1685,6 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), - preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone), }) } @@ -1444,16 +1700,16 @@ impl TenantShard { config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(), - preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()), + preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()), } } pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> { - self.preferred_az_id.as_ref() + self.intent.preferred_az_id.as_ref() } pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) { - self.preferred_az_id = Some(preferred_az_id); + self.intent.preferred_az_id = Some(preferred_az_id); } /// Returns all the nodes to which this tenant shard is attached according to the @@ -1756,65 +2012,90 @@ pub(crate) mod tests { } #[test] - fn optimize_attachment() -> anyhow::Result<()> { - let nodes = make_test_nodes(3, &[]); + /// Simple case: moving attachment to somewhere better where we already have a secondary + fn optimize_attachment_simple() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 3, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); // Initially: both nodes attached on shard 1, and both have secondary locations // on different nodes. 
- shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1))); - shard_a.intent.push_secondary(&mut scheduler, NodeId(2)); + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(1)); shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1))); - shard_b.intent.push_secondary(&mut scheduler, NodeId(3)); + shard_b.intent.push_secondary(&mut scheduler, NodeId(2)); - let mut schedule_context = ScheduleContext::default(); - schedule_context.avoid(&shard_a.intent.all_pageservers()); - schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); - schedule_context.avoid(&shard_b.intent.all_pageservers()); - schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context + } - let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context); - - // Either shard should recognize that it has the option to switch to a secondary location where there - // would be no other shards from the same tenant, and request to do so. + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context); assert_eq!( optimization_a, Some(ScheduleOptimization { sequence: shard_a.sequence, action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: NodeId(1), - new_attached_node_id: NodeId(2) + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) }) }) ); - - // Note that these optimizing two shards in the same tenant with the same ScheduleContext is - // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility - // of [`Service::optimize_all`] to avoid trying - // to do optimizations for multiple shards in the same tenant at the same time. Generating - // both optimizations is just done for test purposes - let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context); - assert_eq!( - optimization_b, - Some(ScheduleOptimization { - sequence: shard_b.sequence, - action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: NodeId(1), - new_attached_node_id: NodeId(3) - }) - }) - ); - - // Applying these optimizations should result in the end state proposed shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap()); - assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2))); - assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]); - shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap()); - assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3))); - assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]); + + // // Either shard should recognize that it has the option to switch to a secondary location where there + // // would be no other shards from the same tenant, and request to do so. 
+ // assert_eq!( + // optimization_a_prepare, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_migrate, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + // old_attached_node_id: NodeId(1), + // new_attached_node_id: NodeId(2) + // }) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_cleanup, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); shard_a.intent.clear(&mut scheduler); shard_b.intent.clear(&mut scheduler); @@ -1822,6 +2103,190 @@ pub(crate) mod tests { Ok(()) } + #[test] + /// Complicated case: moving attachment to somewhere better where we do not have a secondary + /// already, creating one as needed. 
+ fn optimize_attachment_multistep() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 3, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Two shards of a tenant that wants to be in AZ A + let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + + // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(3)); + shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3))); + shard_b.intent.push_secondary(&mut scheduler, NodeId(2)); + + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context + } + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(1)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) + }) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); + + shard_a.intent.clear(&mut scheduler); + shard_b.intent.clear(&mut scheduler); + + Ok(()) + } + + #[test] + /// Check that multi-step migration works when moving to somewhere that is only better by + /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary + /// counting toward the affinity score such that it prevents the rest of the migration from happening. 
+ fn optimize_attachment_marginal() -> anyhow::Result<()> { + let nodes = make_test_nodes(2, &[]); + let mut scheduler = Scheduler::new(nodes.values()); + + // Multi-sharded tenant, we will craft a situation where affinity + // scores differ only slightly + let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None); + + // 1 attached on node 1 + shards[0] + .intent + .set_attached(&mut scheduler, Some(NodeId(1))); + // 3 attached on node 2 + shards[1] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + shards[2] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + shards[3] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + + // The scheduler should figure out that we need to: + // - Create a secondary for shard 3 on node 1 + // - Migrate shard 3 to node 1 + // - Remove shard 3's location on node 2 + + fn make_schedule_context(shards: &Vec) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + for shard in shards { + schedule_context.avoid(&shard.intent.all_pageservers()); + } + schedule_context + } + + let schedule_context = make_schedule_context(&shards); + let optimization_a_prepare = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(1)) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + let schedule_context = make_schedule_context(&shards); + let optimization_a_migrate = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) + }) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + let schedule_context = make_schedule_context(&shards); + let optimization_a_cleanup = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2)) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // Everything should be stable now + let schedule_context = make_schedule_context(&shards); + assert_eq!( + shards[0].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[1].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[2].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[3].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + + for mut shard in shards { + shard.intent.clear(&mut scheduler); + } + + Ok(()) + } + #[test] fn optimize_secondary() -> anyhow::Result<()> { let nodes = make_test_nodes(4, &[]); @@ -1839,9 +2304,7 @@ pub(crate) mod tests { let mut schedule_context = ScheduleContext::default(); schedule_context.avoid(&shard_a.intent.all_pageservers()); - schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); schedule_context.avoid(&shard_b.intent.all_pageservers()); - schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context); @@ -1872,7 
+2335,6 @@ pub(crate) mod tests { // called repeatedly in the background. // Returns the applied optimizations fn optimize_til_idle( - nodes: &HashMap, scheduler: &mut Scheduler, shards: &mut [TenantShard], ) -> Vec { @@ -1884,14 +2346,18 @@ pub(crate) mod tests { for shard in shards.iter() { schedule_context.avoid(&shard.intent.all_pageservers()); - if let Some(attached) = shard.intent.get_attached() { - schedule_context.push_attached(*attached); - } } for shard in shards.iter_mut() { - let optimization = shard.optimize_attachment(nodes, &schedule_context); + let optimization = shard.optimize_attachment(scheduler, &schedule_context); + tracing::info!( + "optimize_attachment({})={:?}", + shard.tenant_shard_id, + optimization + ); if let Some(optimization) = optimization { + // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist + assert!(shard.maybe_optimizable(scheduler, &schedule_context)); optimizations.push(optimization.clone()); shard.apply_optimization(scheduler, optimization); any_changed = true; @@ -1899,7 +2365,15 @@ pub(crate) mod tests { } let optimization = shard.optimize_secondary(scheduler, &schedule_context); + tracing::info!( + "optimize_secondary({})={:?}", + shard.tenant_shard_id, + optimization + ); if let Some(optimization) = optimization { + // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist + assert!(shard.maybe_optimizable(scheduler, &schedule_context)); + optimizations.push(optimization.clone()); shard.apply_optimization(scheduler, optimization); any_changed = true; @@ -1923,14 +2397,34 @@ pub(crate) mod tests { /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4, &[]); + let nodes = make_test_nodes( + 9, + &[ + // Initial 6 nodes + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + AvailabilityZone("az-c".to_string()), + // Three we will add later + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); - // Only show the scheduler a couple of nodes + // Only show the scheduler two nodes in each AZ to start with let mut scheduler = Scheduler::new([].iter()); - scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); + for i in 1..=6 { + scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap()); + } - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); + let mut shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(4), + Some(AvailabilityZone("az-a".to_string())), + ); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1938,30 +2432,50 @@ pub(crate) mod tests { .is_ok()); } - // We should see equal number of locations on the two nodes. - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4); + // Initial: attached locations land in the tenant's home AZ. 
+ assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2); - - assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4); + assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - // Add another two nodes: we should see the shards spread out when their optimize - // methods are called - scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap()); - optimize_til_idle(&nodes, &mut scheduler, &mut shards); + // Initial: secondary locations in a remote AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); + // Add another three nodes: we should see the shards spread out when their optimize + // methods are called + scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap()); + optimize_til_idle(&mut scheduler, &mut shards); + + // We expect one attached location was moved to the new node in the tenant's home AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1); + // The original node has one less attached shard + assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1); + // One of the original nodes still has two attachments, since there are an odd number of nodes assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1); - - assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1); + // None of our secondaries moved, since we already had enough nodes for those to be + // scheduled perfectly + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); for shard in shards.iter_mut() { shard.intent.clear(&mut scheduler); @@ -2001,10 +2515,10 @@ pub(crate) mod tests { shard.schedule(&mut scheduler, context).unwrap(); } - let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a); + let applied_to_a = optimize_til_idle(&mut scheduler, &mut a); assert_eq!(applied_to_a, vec![]); - let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b); + let 
applied_to_b = optimize_til_idle(&mut scheduler, &mut b); assert_eq!(applied_to_b, vec![]); for shard in a.iter_mut().chain(b.iter_mut()) { diff --git a/test_runner/performance/test_sharding_autosplit.py b/test_runner/performance/test_sharding_autosplit.py index caa89955e3..76c3ad01a4 100644 --- a/test_runner/performance/test_sharding_autosplit.py +++ b/test_runner/performance/test_sharding_autosplit.py @@ -2,6 +2,7 @@ from __future__ import annotations import concurrent.futures import re +import threading from pathlib import Path import pytest @@ -188,7 +189,20 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): check_pgbench_output(out_path) - with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads: + stop_pump = threading.Event() + + def pump_controller(): + # Run a background loop to force the storage controller to run its + # background work faster than it otherwise would: this helps + # us: + # A) to create a test that runs in a shorter time + # B) to create a test that is more intensive by doing the shard migrations + # after splits happen more rapidly. + while not stop_pump.is_set(): + env.storage_controller.reconcile_all() + stop_pump.wait(0.1) + + with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count + 1) as pgbench_threads: pgbench_futs = [] for tenant_state in tenants.values(): fut = pgbench_threads.submit(run_pgbench_init, tenant_state.endpoint) @@ -198,6 +212,8 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): for fut in pgbench_futs: fut.result() + pump_fut = pgbench_threads.submit(pump_controller) + pgbench_futs = [] for tenant_state in tenants.values(): fut = pgbench_threads.submit(run_pgbench_main, tenant_state.endpoint) @@ -207,6 +223,9 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): for fut in pgbench_futs: fut.result() + stop_pump.set() + pump_fut.result() + def assert_all_split(): for tenant_id in tenants.keys(): shards = tenant_get_shards(env, tenant_id) diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index 49f41483ec..d45db28c78 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -13,11 +13,13 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + NeonPageserver, PageserverAvailability, PageserverSchedulingPolicy, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pg_version import PgVersion +from fixtures.utils import wait_until def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]: @@ -85,8 +87,12 @@ def test_storage_controller_many_tenants( ) AZS = ["alpha", "bravo", "charlie"] + + def az_selector(node_id): + return f"az-{AZS[(node_id - 1) % len(AZS)]}" + neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update( - {"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"} + {"availability_zone": az_selector(ps_cfg["id"])} ) # A small sleep on each call into the notify hook, to simulate the latency of doing a database write @@ -168,6 +174,31 @@ def test_storage_controller_many_tenants( log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)") assert rss < expect_memory_per_shard * total_shards + def assert_all_tenants_scheduled_in_home_az(): + for tenant_id in tenant_ids: + desc = 
env.storage_controller.tenant_describe(tenant_id) + preferred_az = None + for shard in desc["shards"]: + # All shards in a tenant should have the same preferred AZ + if preferred_az is None: + preferred_az = shard["preferred_az_id"] + else: + assert preferred_az == shard["preferred_az_id"] + + # Attachment should be in the preferred AZ + assert shard["preferred_az_id"] == az_selector( + shard["node_attached"] + ), f"Shard {shard['tenant_shard_id']} not in {shard['preferred_az_id']}" + + # Secondary locations should not be in the preferred AZ + for node_secondary in shard["node_secondary"]: + assert ( + shard["preferred_az_id"] != az_selector(node_secondary) + ), f"Shard {shard['tenant_shard_id']} secondary should not be in {shard['preferred_az_id']}" + + # There should only be one secondary location (i.e. no migrations in flight) + assert len(shard["node_secondary"]) == 1 + # Issue more concurrent operations than the storage controller's reconciler concurrency semaphore # permits, to ensure that we are exercising stressing that. api_concurrency = 135 @@ -242,6 +273,22 @@ def test_storage_controller_many_tenants( f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s" ) + # Check initial scheduling + assert_all_tenants_scheduled_in_home_az() + az_attached_counts: defaultdict[str, int] = defaultdict(int) + az_secondary_counts: defaultdict[str, int] = defaultdict(int) + node_attached_counts: defaultdict[str, int] = defaultdict(int) + for tenant_id in tenants.keys(): + desc = env.storage_controller.tenant_describe(tenant_id) + for shard in desc["shards"]: + az_attached_counts[az_selector(shard["node_attached"])] += 1 + node_attached_counts[shard["node_attached"]] += 1 + for node_secondary in shard["node_secondary"]: + az_secondary_counts[az_selector(node_secondary)] += 1 + + log.info(f"Initial node attached counts: {node_attached_counts}") + log.info(f"Initial AZ shard counts: {az_attached_counts}, {az_secondary_counts}") + # Plan operations: ensure each tenant with a timeline gets at least # one of each operation type. Then add other tenants to make up the # numbers. @@ -450,11 +497,77 @@ def test_storage_controller_many_tenants( env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) env.storage_controller.consistency_check() + # Since we did `reconcile_until_idle` during the above loop, the system should be left in + # an optimally scheduled state. Validate that this includes all the tenants being scheduled + # in their home AZ. + assert_all_tenants_scheduled_in_home_az() + # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn, # as they were not offline long enough to trigger any scheduling changes.
env.storage_controller.consistency_check() check_memory() + # Simulate loss of an AZ + victim_az = "az-alpha" + killed_pageservers = [] + for ps in env.pageservers: + if az_selector(ps.id) == victim_az: + ps.stop(immediate=True) + killed_pageservers.append(ps) + log.info(f"Killed pageserver {ps.id}") + + assert killed_pageservers + + # Wait for the controller to notice the pageservers are dead + def assert_pageservers_availability( + pageservers: list[NeonPageserver], expected_availability: PageserverAvailability + ): + nodes = env.storage_controller.nodes() + checked_any = False + node_ids = [ps.id for ps in pageservers] + for node in nodes: + if node["id"] in node_ids: + checked_any = True + assert ( + node["availability"] == expected_availability + ), f"Node {node['id']} is not {expected_availability} yet: {node['availability']}" + + assert checked_any + + wait_until( + lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.OFFLINE), + timeout=60, + ) + + # Let the controller finish all its rescheduling + env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) + + # Check that all the tenants are rescheduled to the remaining pageservers + for tenant_id in tenant_ids: + desc = env.storage_controller.tenant_describe(tenant_id) + for shard in desc["shards"]: + # Attachment should be outside the AZ where we killed the pageservers + assert ( + az_selector(shard["node_attached"]) != victim_az + ), f"Shard {shard['tenant_shard_id']} still in {victim_az} (node {shard['node_attached']})" + + # Bring back the pageservers + for ps in killed_pageservers: + ps.start() + + wait_until( + lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.ACTIVE), + timeout=60, + ) + + # A very long timeout is required: we will be migrating all the tenants on all the pageservers + # in the region that we just restored. Assume it'll take up to twice as long as it took to fill + # a single node + env.storage_controller.reconcile_until_idle( + max_interval=0.1, timeout_secs=DRAIN_FILL_TIMEOUT * 4 + ) + assert_all_tenants_scheduled_in_home_az() + # Stop the storage controller before tearing down fixtures, because it otherwise might log # errors trying to call our `ComputeReconfigure`. env.storage_controller.stop() diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 673904a1cd..86a6b7428b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -520,14 +520,18 @@ def test_sharding_split_smoke( shard_count = 2 # Shard count we split into split_shard_count = 4 - # We will have 2 shards per pageserver once done (including secondaries) - neon_env_builder.num_pageservers = split_shard_count + # In preferred AZ & other AZ we will end up with one shard per pageserver + neon_env_builder.num_pageservers = split_shard_count * 2 # Two AZs def assign_az(ps_cfg): az = f"az-{(ps_cfg['id'] - 1) % 2}" ps_cfg["availability_zone"] = az + # We will run more pageservers than tests usually do, so give them tiny page caches + # in case we're on a test node under memory pressure. 
+ ps_cfg["page_cache_size"] = 128 + neon_env_builder.pageserver_config_override = assign_az # 1MiB stripes: enable getting some meaningful data distribution without @@ -679,8 +683,8 @@ def test_sharding_split_smoke( # - shard_count reconciles for the original setup of the tenant # - shard_count reconciles for detaching the original secondary locations during split # - split_shard_count reconciles during shard splitting, for setting up secondaries. - # - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move) - expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2 + # - split_shard_count/2 reconciles to migrate shards to their temporary secondaries + expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2) reconcile_ok = env.storage_controller.get_metric_value( "storage_controller_reconcile_complete_total", filter={"status": "ok"} @@ -745,10 +749,14 @@ def test_sharding_split_smoke( # dominated by shard count. log.info(f"total: {total}") assert total == { - 1: 2, - 2: 2, - 3: 2, - 4: 2, + 1: 1, + 2: 1, + 3: 1, + 4: 1, + 5: 1, + 6: 1, + 7: 1, + 8: 1, } # The controller is not required to lay out the attached locations in any particular way, but @@ -1387,13 +1395,7 @@ def test_sharding_split_failures( else: attached_count += 1 - if exclude_ps_id is not None: - # For a node failure case, we expect there to be a secondary location - # scheduled on the offline node, so expect one fewer secondary in total - assert secondary_count == initial_shard_count - 1 - else: - assert secondary_count == initial_shard_count - + assert secondary_count == initial_shard_count assert attached_count == initial_shard_count def assert_split_done(exclude_ps_id: int | None = None) -> None: diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 3a55e75589..8ffb6ba6b2 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3213,11 +3213,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool: @run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): def assign_az(ps_cfg): - az = f"az-{ps_cfg['id']}" + az = f"az-{ps_cfg['id'] % 2}" + log.info("Assigned AZ {az}") ps_cfg["availability_zone"] = az neon_env_builder.pageserver_config_override = assign_az - neon_env_builder.num_pageservers = 2 + neon_env_builder.num_pageservers = 4 env = neon_env_builder.init_configs() env.start() @@ -3232,8 +3233,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): assert shards[0]["preferred_az_id"] == expected_az + # When all other schedule scoring parameters are equal, tenants should round-robin on AZs + assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-0" + assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-1" + assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-0" + + # Try modifying preferred AZ updated = env.storage_controller.set_preferred_azs( - {TenantShardId(tid, 0, 0): "foo" for tid in tids} + {TenantShardId(tid, 0, 0): "az-0" for tid in tids} ) assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids]) @@ -3241,29 +3248,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): for tid in tids: 
shards = env.storage_controller.tenant_describe(tid)["shards"] assert len(shards) == 1 - assert shards[0]["preferred_az_id"] == "foo" + assert shards[0]["preferred_az_id"] == "az-0" - # Generate a layer to avoid shard split handling on ps from tripping - # up on debug assert. - timeline_id = TimelineId.generate() - env.create_timeline("bar", tids[0], timeline_id) - - workload = Workload(env, tids[0], timeline_id, branch_name="bar") - workload.init() - workload.write_rows(256) - workload.validate() + # Having modified preferred AZ, we should get moved there + env.storage_controller.reconcile_until_idle(max_interval=0.1) + for tid in tids: + shard = env.storage_controller.tenant_describe(tid)["shards"][0] + attached_to = shard["node_attached"] + attached_in_az = env.get_pageserver(attached_to).az_id + assert shard["preferred_az_id"] == attached_in_az == "az-0" env.storage_controller.tenant_shard_split(tids[0], shard_count=2) + env.storage_controller.reconcile_until_idle(max_interval=0.1) shards = env.storage_controller.tenant_describe(tids[0])["shards"] assert len(shards) == 2 for shard in shards: attached_to = shard["node_attached"] - expected_az = env.get_pageserver(attached_to).az_id - - # The scheduling optimization logic is not yet AZ-aware, so doesn't succeed - # in putting the tenant shards in the preferred AZ. - # To be fixed in https://github.com/neondatabase/neon/pull/9916 - # assert shard["preferred_az_id"] == expected_az + attached_in_az = env.get_pageserver(attached_to).az_id + assert shard["preferred_az_id"] == attached_in_az == "az-0" @run_only_on_default_postgres("Postgres version makes no difference here")