From 01643006c63cd32748497feb0476e5cbc9bb06b2 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 5 Dec 2024 17:32:50 +0000 Subject: [PATCH] optimisation change --- libs/pageserver_api/src/controller_api.rs | 10 + storage_controller/Cargo.toml | 3 + storage_controller/src/scheduler.rs | 117 ++++- storage_controller/src/service.rs | 16 +- storage_controller/src/tenant_shard.rs | 563 ++++++++++++++++------ 5 files changed, 548 insertions(+), 161 deletions(-) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 9a5ebc95bd..8429b8ebd8 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -325,6 +325,16 @@ pub enum PlacementPolicy { Detached, } +impl PlacementPolicy { + pub fn want_secondaries(&self) -> usize { + match self { + PlacementPolicy::Attached(secondary_count) => *secondary_count, + PlacementPolicy::Secondary => 1, + PlacementPolicy::Detached => 0, + } + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateResponse {} diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 2f5d266567..a01726b727 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -56,3 +56,6 @@ utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } workspace_hack = { version = "0.1", path = "../workspace_hack" } + +[dev-dependencies] +test-log = {version="0.2.16"} \ No newline at end of file diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 8a4538b84f..2f764b6e73 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -47,6 +47,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { preferred_az: &Option, context: &ScheduleContext, ) -> Option; + + /// Return a score that drops any components based on node utilization: this is useful + /// for finding scores for scheduling optimisation, when we want to avoid rescheduling + /// shards due to e.g. disk usage, to avoid flapping. + fn disregard_utilization(&self) -> Self; + fn is_overloaded(&self) -> bool; fn node_id(&self) -> NodeId; } @@ -156,6 +162,28 @@ pub(crate) struct NodeAttachmentSchedulingScore { node_id: NodeId, } +impl NodeAttachmentSchedulingScore { + /// For speculative scheduling: generate the score for this node as if one more tenant + /// shard was attached to it. + pub(crate) fn project_attachment(&self) -> Self { + Self { + total_attached_shard_count: self.total_attached_shard_count + 1, + ..*self + } + } + + /// The literal equality and comparison operators include the node ID to provide a deterministic + /// ordering. However, when doing scheduling optimisation we of course don't want to regard + /// a difference in node ID as significant, so that code uses this method to exclude that case. + pub(crate) fn different(&self, other: &Self) -> bool { + self.az_match != other.az_match + || self.affinity_score != other.affinity_score + || self.attached_shards_in_context != other.attached_shards_in_context + || self.utilization_score != other.utilization_score + || self.total_attached_shard_count != other.total_attached_shard_count + } +} + impl NodeSchedulingScore for NodeAttachmentSchedulingScore { fn generate( node_id: &NodeId, @@ -184,6 +212,18 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { }) } + /// For use in scheduling optimisation, where we only want to consider the aspects + /// of the score that can only be resolved by moving things (such as inter-shard affinity + /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which + /// can fluctuate for other reasons) + fn disregard_utilization(&self) -> Self { + Self { + utilization_score: 0, + total_attached_shard_count: 0, + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -215,6 +255,17 @@ pub(crate) struct NodeSecondarySchedulingScore { node_id: NodeId, } +impl NodeSecondarySchedulingScore { + /// The literal equality and comparison operators include the node ID to provide a deterministic + /// ordering. However, when doing scheduling optimisation we of course don't want to regard + /// a difference in node ID as significant, so that code uses this method to exclude that case. + pub(crate) fn different(&self, other: &Self) -> bool { + self.az_match != other.az_match + || self.affinity_score != other.affinity_score + || self.utilization_score != other.utilization_score + } +} + impl NodeSchedulingScore for NodeSecondarySchedulingScore { fn generate( node_id: &NodeId, @@ -242,6 +293,13 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { }) } + fn disregard_utilization(&self) -> Self { + Self { + utilization_score: 0, + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -293,6 +351,10 @@ impl AffinityScore { pub(crate) fn inc(&mut self) { self.0 += 1; } + + pub(crate) fn dec(&mut self) { + self.0 -= 1; + } } impl std::ops::Add for AffinityScore { @@ -353,15 +415,36 @@ impl ScheduleContext { *entry += 1; } - pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore { - self.nodes - .get(&node_id) - .copied() - .unwrap_or(AffinityScore::FREE) + /// Imagine we migrated our attached location to the given node. Return a new context that + /// reflects this. + pub(crate) fn project_detach(&self, source: NodeId) -> Self { + let mut new_context = self.clone(); + + if let Some(count) = new_context.attached_nodes.get_mut(&source) { + // It's unexpected that we get called in a context where the source of + // the migration is not already in the context. + debug_assert!(*count > 0); + + if *count > 0 { + *count -= 1; + } + } + + if let Some(score) = new_context.nodes.get_mut(&source) { + score.dec(); + } + + new_context } - pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize { - self.attached_nodes.get(&node_id).copied().unwrap_or(0) + pub(crate) fn project_secondary_detach(&self, source: NodeId) -> Self { + let mut new_context = self.clone(); + + if let Some(score) = new_context.nodes.get_mut(&source) { + score.dec(); + } + + new_context } #[cfg(test)] @@ -636,6 +719,22 @@ impl Scheduler { node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None }) } + /// Calculate a single node's score, used in optimizer logic to compare specific + /// nodes' scores. + pub(crate) fn compute_node_score( + &mut self, + node_id: NodeId, + preferred_az: &Option, + context: &ScheduleContext, + ) -> Option + where + Score: NodeSchedulingScore, + { + self.nodes + .get_mut(&node_id) + .and_then(|node| Score::generate(&node_id, node, preferred_az, context)) + } + /// Compute a schedulling score for each node that the scheduler knows of /// minus a set of hard excluded nodes. fn compute_node_scores( @@ -742,6 +841,10 @@ impl Scheduler { self.schedule_shard::(&[], &None, &ScheduleContext::default()) } + pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option { + self.nodes.get(node_id).map(|n| n.az.clone()) + } + /// Unit test access to internal state #[cfg(test)] pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize { diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 2cbd57e34e..242451e9dd 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -6113,7 +6113,13 @@ impl Service { let mut work = Vec::new(); let mut locked = self.inner.write().unwrap(); - let (nodes, tenants, scheduler) = locked.parts_mut(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); + + // We are going to plan a bunch of optimisations before applying any of them, so the + // utilisation stats on nodes will be effectively stale for the >1st optimisation we + // generate. To avoid this causing unstable migrations/flapping, it's important that the + // code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::disregard_utilization`] + // to ignore the utilisation component of the score. for (_tenant_id, schedule_context, shards) in TenantShardContextIterator::new(tenants, ScheduleMode::Speculative) @@ -6149,7 +6155,7 @@ impl Service { if let Some(optimization) = // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to // its primary location based on soft constraints, cut it over. - shard.optimize_attachment(nodes, &schedule_context) + shard.optimize_attachment(scheduler, &schedule_context) { work.push((shard.tenant_shard_id, optimization)); break; @@ -6208,8 +6214,10 @@ impl Service { } } } - ScheduleOptimizationAction::ReplaceSecondary(_) => { - // No extra checks needed to replace a secondary: this does not interrupt client access + ScheduleOptimizationAction::ReplaceSecondary(_) + | ScheduleOptimizationAction::CreateSecondary(_) + | ScheduleOptimizationAction::RemoveSecondary(_) => { + // No extra checks needed to manage secondaries: this does not interrupt client access validated_work.push((tenant_shard_id, optimization)) } }; diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 3462204eb1..3a7d0e7cdb 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -11,16 +11,14 @@ use crate::{ persistence::TenantShardPersistence, reconciler::{ReconcileUnits, ReconcilerConfig}, scheduler::{ - AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext, - SecondaryShardTag, + AttachedShardTag, NodeAttachmentSchedulingScore, NodeSchedulingScore, + NodeSecondarySchedulingScore, RefCountUpdate, ScheduleContext, SecondaryShardTag, }, service::ReconcileResultRequest, }; use futures::future::{self, Either}; use itertools::Itertools; -use pageserver_api::controller_api::{ - AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, -}; +use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy}; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -360,6 +358,10 @@ pub(crate) enum ScheduleOptimizationAction { ReplaceSecondary(ReplaceSecondary), // Migrate attachment to an existing secondary location MigrateAttachment(MigrateAttachment), + // Create a secondary location, with the intent of later migrating to it + CreateSecondary(NodeId), + // Remove a secondary location that we previously created to facilitate a migration + RemoveSecondary(NodeId), } #[derive(Eq, PartialEq, Debug, Clone)] @@ -723,90 +725,175 @@ impl TenantShard { Ok(()) } + fn find_better_location( + &self, + scheduler: &mut Scheduler, + schedule_context: &ScheduleContext, + ) -> Option { + // TODO: fast path: if the attached node is already in the preferred AZ, _and_ has no + // other shards from the same tenant on it, then skip doing any scheduling calculations. + + let attached = (*self.intent.get_attached())?; + let schedule_context = schedule_context.project_detach(attached); + + // Look for a lower-scoring location to attach to + let Ok(candidate_node) = scheduler.schedule_shard::( + &[], // Don't hard-exclude anything: we want to consider the possibility of migrating to somewhere we already have a secondary + &self.preferred_az_id, + &schedule_context, + ) else { + // A scheduling error means we have no possible candidate replacements + tracing::debug!("No candidate node found"); + return None; + }; + + if candidate_node == attached { + // We're already at the best possible location, so don't migrate + tracing::debug!("Candidate node {candidate_node} is already attached"); + return None; + } + + // Check if our candidate node is in the preferred AZ: it might not be, if the scheduler + // is trying its best to handle an overloaded AZ. + if self.preferred_az_id.is_some() + && scheduler.get_node_az(&candidate_node) != self.preferred_az_id + { + tracing::debug!( + "Candidate node {candidate_node} is not in preferred AZ {:?}", + self.preferred_az_id + ); + return None; + } + + // Consider whether the candidate is any better than our current location once the + // context is updated to reflect the migration. + let Some(candidate_score) = scheduler.compute_node_score::( + candidate_node, + &self.preferred_az_id, + &schedule_context, + ) else { + // The candidate node is unavailable for scheduling or otherwise couldn't get a score + // This is unexpected, because schedule() yielded this node + debug_assert!(false); + return None; + }; + + match scheduler.compute_node_score::( + attached, + &self.preferred_az_id, + &schedule_context, + ) { + Some(current_score) => { + // Ignore utilization components when comparing scores: we don't want to migrate + // because of transient load variations, it risks making the system thrash, and + // migrating for utilization requires a separate high level view of the system to + // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily + // moving things around in the order that we hit this function. + let candidate_score = candidate_score.disregard_utilization(); + let candidate_score = candidate_score.project_attachment(); + + let current_score = current_score.disregard_utilization(); + + if candidate_score < current_score && current_score.different(&candidate_score) { + tracing::info!("Found a lower scoring location! {candidate_score:?} is better than {current_score:?}"); + } else { + // The candidate node is no better than our current location, so don't migrate + tracing::debug!("Candidate node {candidate_node} is no better than our current location {attached}"); + return None; + } + } + None => { + // The current node is unavailable for scheduling, so we can't make any sensible + // decisions about optimisation. This should be a transient state -- if the node + // is offline then it will get evacuated, if is blocked by a scheduling mode + // then we will respect that mode by doing nothing. + tracing::debug!("Current node {attached} is unavailable for scheduling"); + return None; + } + } + + Some(candidate_node) + } + /// Optimize attachments: if a shard has a secondary location that is preferable to /// its primary location based on soft constraints, switch that secondary location /// to be attached. #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn optimize_attachment( &self, - nodes: &HashMap, + scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { let attached = (*self.intent.get_attached())?; - if self.intent.secondary.is_empty() { - // We can only do useful work if we have both attached and secondary locations: this - // function doesn't schedule new locations, only swaps between attached and secondaries. - return None; - } - let current_affinity_score = schedule_context.get_node_affinity(attached); - let current_attachment_count = schedule_context.get_node_attachments(attached); + let replacement = self.find_better_location(scheduler, schedule_context); - // Generate score for each node, dropping any un-schedulable nodes. - let all_pageservers = self.intent.all_pageservers(); - let mut scores = all_pageservers - .iter() - .flat_map(|node_id| { - let node = nodes.get(node_id); - if node.is_none() { - None - } else if matches!( - node.unwrap().get_scheduling(), - NodeSchedulingPolicy::Filling - ) { - // If the node is currently filling, don't count it as a candidate to avoid, - // racing with the background fill. - None - } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) { - None - } else { - let affinity_score = schedule_context.get_node_affinity(*node_id); - let attachment_count = schedule_context.get_node_attachments(*node_id); - Some((*node_id, affinity_score, attachment_count)) - } - }) - .collect::>(); - - // Sort precedence: - // 1st - prefer nodes with the lowest total affinity score - // 2nd - prefer nodes with the lowest number of attachments in this context - // 3rd - if all else is equal, sort by node ID for determinism in tests. - scores.sort_by_key(|i| (i.1, i.2, i.0)); - - if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) = - scores.first() - { - if attached != *preferred_node { - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called (e.g. there's no point migrating from - // a location with score 1 to a score zero, because on next location the situation - // would be the same, but in reverse). - if current_affinity_score > *preferred_affinity_score + AffinityScore(1) - || current_attachment_count > *preferred_attachment_count + 1 - { + // We have found a candidate and confirmed that its score is preferable + // to our current location. See if we have a secondary location in the preferred location already: if not, + // then create one. + if let Some(replacement) = replacement { + if !self.intent.get_secondary().contains(&replacement) { + tracing::info!( + "Identified optimization({}): create secondary {replacement}", + self.tenant_shard_id + ); + Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::CreateSecondary(replacement), + }) + } else { + // We already have a secondary in the preferred location, let's try migrating to it. Our caller + // will check the warmth of the destination before deciding whether to really execute this. + tracing::info!( + "Identified optimization({}): migrate attachment {attached}->{replacement}", + self.tenant_shard_id + ); + Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: attached, + new_attached_node_id: replacement, + }), + }) + } + } else if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // We aren't in the process of migrating anywhere, and we're attached in our preferred AZ. If there are + // any other secondary locations in our preferred AZ, we presume they were created to facilitate a migration + // of the attached location, and remove them. + for secondary in self.intent.get_secondary() { + if scheduler.get_node_az(secondary) == self.preferred_az_id { tracing::info!( - "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})", - self.intent.get_secondary() + "Identified optimization({}): remove secondary {secondary}", + self.tenant_shard_id ); return Some(ScheduleOptimization { sequence: self.sequence, - action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: attached, - new_attached_node_id: *preferred_node, - }), + action: ScheduleOptimizationAction::RemoveSecondary(*secondary), }); } - } else { - tracing::debug!( - "Node {} is already preferred (score {:?})", - preferred_node, - preferred_affinity_score - ); } + + // Fall through: maybe we had excess secondaries in other AZs? Trim them in an arbitrary order + // (lowest Node ID first). + let mut secondary_node_ids = self.intent.get_secondary().clone(); + secondary_node_ids.sort(); + let victim = secondary_node_ids + .first() + .expect("Within block for > check on secondary count"); + Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(*victim), + }) + } else { + // We didn't find somewhere we'd rather be, and we don't have any excess secondaries + // to clean up: no action required. + None } - // Fall-through: we didn't find an optimization - None + // TODO: if we find that our current location is optimal, _and_ we also have a secondary + // in the preferred AZ, then clean up that secondary: it was only created to act as a + // migration destination. + // ...maybe do this in optimize_secondary()? } #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] @@ -815,18 +902,18 @@ impl TenantShard { scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { - if self.intent.secondary.is_empty() { - // We can only do useful work if we have both attached and secondary locations: this - // function doesn't schedule new locations, only swaps between attached and secondaries. + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // We have extra secondaries, perhaps to facilitate a migration of the attached location: + // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done, + // and we are called again, we will proceed. + tracing::debug!("Too many secondaries: skipping"); return None; } for secondary in self.intent.get_secondary() { - let Some(affinity_score) = schedule_context.nodes.get(secondary) else { - // We're already on a node unaffected any affinity constraints, - // so we won't change it. - continue; - }; + let schedule_context = schedule_context.project_secondary_detach(*secondary); + // TODO: fast path to avoid full scheduling calculation if we're in the right AZ and not + // sharing with any other shards in the same tenant // Let the scheduler suggest a node, where it would put us if we were scheduling afresh // This implicitly limits the choice to nodes that are available, and prefers nodes @@ -834,33 +921,63 @@ impl TenantShard { let Ok(candidate_node) = scheduler.schedule_shard::( &self.intent.all_pageservers(), &self.preferred_az_id, - schedule_context, + &schedule_context, ) else { // A scheduling error means we have no possible candidate replacements continue; }; - let candidate_affinity_score = schedule_context - .nodes - .get(&candidate_node) - .unwrap_or(&AffinityScore::FREE); + let Some(candidate_score) = scheduler + .compute_node_score::( + candidate_node, + &self.preferred_az_id, + &schedule_context, + ) + else { + // The candidate node is unavailable for scheduling or otherwise couldn't get a score + // This is unexpected, because schedule() yielded this node + debug_assert!(false); + continue; + }; - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called. - if *candidate_affinity_score + AffinityScore(1) < *affinity_score { - // If some other node is available and has a lower score than this node, then - // that other node is a good place to migrate to. - tracing::info!( - "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})", - self.intent.get_secondary() - ); - return Some(ScheduleOptimization { - sequence: self.sequence, - action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary { - old_node_id: *secondary, - new_node_id: candidate_node, - }), - }); + match scheduler.compute_node_score::( + *secondary, + &self.preferred_az_id, + &schedule_context, + ) { + Some(current_score) => { + // Disregard utilization: we don't want to thrash around based on disk utilization + let current_score = current_score.disregard_utilization(); + let candidate_score = candidate_score.disregard_utilization(); + + if candidate_score < current_score && current_score.different(&candidate_score) + { + tracing::info!( + "Identified optimization({}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ", + self.tenant_shard_id, + self.intent.get_secondary(), + candidate_score, + current_score + ); + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::ReplaceSecondary( + ReplaceSecondary { + old_node_id: *secondary, + new_node_id: candidate_node, + }, + ), + }); + } + } + None => { + // The current node is unavailable for scheduling, so we can't make any sensible + // decisions about optimisation. This should be a transient state -- if the node + // is offline then it will get evacuated, if is blocked by a scheduling mode + // then we will respect that mode by doing nothing. + tracing::debug!("Current node {secondary} is unavailable for scheduling"); + continue; + } } } @@ -899,6 +1016,12 @@ impl TenantShard { self.intent.remove_secondary(scheduler, old_node_id); self.intent.push_secondary(scheduler, new_node_id); } + ScheduleOptimizationAction::CreateSecondary(new_node_id) => { + self.intent.push_secondary(scheduler, new_node_id); + } + ScheduleOptimizationAction::RemoveSecondary(old_secondary) => { + self.intent.remove_secondary(scheduler, old_secondary); + } } true @@ -1732,7 +1855,8 @@ pub(crate) mod tests { } #[test] - fn optimize_attachment() -> anyhow::Result<()> { + /// Simple case: moving attachment to somewhere better where we already have a secondary + fn optimize_attachment_simple() -> anyhow::Result<()> { let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); @@ -1746,16 +1870,17 @@ pub(crate) mod tests { shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1))); shard_b.intent.push_secondary(&mut scheduler, NodeId(3)); - let mut schedule_context = ScheduleContext::default(); - schedule_context.avoid(&shard_a.intent.all_pageservers()); - schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); - schedule_context.avoid(&shard_b.intent.all_pageservers()); - schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); + schedule_context + } - let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context); - - // Either shard should recognize that it has the option to switch to a secondary location where there - // would be no other shards from the same tenant, and request to do so. + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context); assert_eq!( optimization_a, Some(ScheduleOptimization { @@ -1766,31 +1891,128 @@ pub(crate) mod tests { }) }) ); + shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap()); - // Note that these optimizing two shards in the same tenant with the same ScheduleContext is - // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility - // of [`Service::optimize_all`] to avoid trying - // to do optimizations for multiple shards in the same tenant at the same time. Generating - // both optimizations is just done for test purposes - let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context); + // // Either shard should recognize that it has the option to switch to a secondary location where there + // // would be no other shards from the same tenant, and request to do so. + // assert_eq!( + // optimization_a_prepare, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_migrate, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + // old_attached_node_id: NodeId(1), + // new_attached_node_id: NodeId(2) + // }) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_cleanup, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); + + shard_a.intent.clear(&mut scheduler); + shard_b.intent.clear(&mut scheduler); + + Ok(()) + } + + #[test] + /// Complicated case: moving attachment to somewhere better where we do not have a secondary + /// already, creating one as needed. + fn optimize_attachment_multistep() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 3, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Two shards of a tenant that wants to be in AZ A + let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + + // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(3)); + shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3))); + shard_b.intent.push_secondary(&mut scheduler, NodeId(2)); + + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); + schedule_context + } + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context); assert_eq!( - optimization_b, + optimization_a_prepare, Some(ScheduleOptimization { - sequence: shard_b.sequence, + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(1)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shard_a.sequence, action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: NodeId(1), - new_attached_node_id: NodeId(3) + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) }) }) ); + shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); - // Applying these optimizations should result in the end state proposed - shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap()); - assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2))); - assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]); - shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap()); - assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3))); - assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]); + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); shard_a.intent.clear(&mut scheduler); shard_b.intent.clear(&mut scheduler); @@ -1848,7 +2070,6 @@ pub(crate) mod tests { // called repeatedly in the background. // Returns the applied optimizations fn optimize_til_idle( - nodes: &HashMap, scheduler: &mut Scheduler, shards: &mut [TenantShard], ) -> Vec { @@ -1866,7 +2087,7 @@ pub(crate) mod tests { } for shard in shards.iter_mut() { - let optimization = shard.optimize_attachment(nodes, &schedule_context); + let optimization = shard.optimize_attachment(scheduler, &schedule_context); if let Some(optimization) = optimization { optimizations.push(optimization.clone()); shard.apply_optimization(scheduler, optimization); @@ -1895,18 +2116,40 @@ pub(crate) mod tests { optimizations } + use test_log::test; + /// Test the balancing behavior of shard scheduling: that it achieves a balance, and /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4, &[]); + let nodes = make_test_nodes( + 9, + &[ + // Initial 6 nodes + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + AvailabilityZone("az-c".to_string()), + // Three we will add later + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); - // Only show the scheduler a couple of nodes + // Only show the scheduler two nodes in each AZ to start with let mut scheduler = Scheduler::new([].iter()); - scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); + for i in 1..=6 { + scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap()); + } - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); + let mut shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(4), + Some(AvailabilityZone("az-a".to_string())), + ); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1914,30 +2157,50 @@ pub(crate) mod tests { .is_ok()); } - // We should see equal number of locations on the two nodes. - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4); + // Initial: attached locations land in the tenant's home AZ. + assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2); - - assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4); + assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - // Add another two nodes: we should see the shards spread out when their optimize - // methods are called - scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap()); - optimize_til_idle(&nodes, &mut scheduler, &mut shards); + // Initial: secondary locations in a remote AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); + // Add another three nodes: we should see the shards spread out when their optimize + // methods are called + scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap()); + optimize_til_idle(&mut scheduler, &mut shards); + + // We expect one attached location was moved to the new node in the tenant's home AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1); + // The original node has one less attached shard + assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1); + // One of the original nodes still has two attachments, since there are an odd number of nodes assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1); - - assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1); + // None of our secondaries moved, since we already had enough nodes for those to be + // scheduled perfectly + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); for shard in shards.iter_mut() { shard.intent.clear(&mut scheduler); @@ -1977,10 +2240,10 @@ pub(crate) mod tests { shard.schedule(&mut scheduler, context).unwrap(); } - let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a); + let applied_to_a = optimize_til_idle(&mut scheduler, &mut a); assert_eq!(applied_to_a, vec![]); - let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b); + let applied_to_b = optimize_til_idle(&mut scheduler, &mut b); assert_eq!(applied_to_b, vec![]); for shard in a.iter_mut().chain(b.iter_mut()) {