diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index ba0eb0e4ae..f3aefc6df9 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -367,6 +367,16 @@ pub enum PlacementPolicy { Detached, } +impl PlacementPolicy { + pub fn want_secondaries(&self) -> usize { + match self { + PlacementPolicy::Attached(secondary_count) => *secondary_count, + PlacementPolicy::Secondary => 1, + PlacementPolicy::Detached => 0, + } + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct TenantShardMigrateResponse {} diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 5f3319512d..caaa22d0a5 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -55,4 +55,4 @@ r2d2 = { version = "0.8.10" } utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } -workspace_hack = { version = "0.1", path = "../workspace_hack" } +workspace_hack = { version = "0.1", path = "../workspace_hack" } \ No newline at end of file diff --git a/storage_controller/src/drain_utils.rs b/storage_controller/src/drain_utils.rs index 47f4276ff2..8b7be88078 100644 --- a/storage_controller/src/drain_utils.rs +++ b/storage_controller/src/drain_utils.rs @@ -112,7 +112,7 @@ impl TenantShardDrain { } } - match scheduler.node_preferred(tenant_shard.intent.get_secondary()) { + match tenant_shard.preferred_secondary(scheduler) { Some(node) => Some(node), None => { tracing::warn!( diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index 6f584e7267..adced3b77d 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -826,7 +826,21 @@ impl Reconciler { if self.cancel.is_cancelled() { return Err(ReconcileError::Cancel); } - self.location_config(&node, conf, None, false).await?; + // We only try to configure secondary locations if the node is available. This does + // not stop us succeeding with the reconcile, because our core goal is to make the + // shard _available_ (the attached location), and configuring secondary locations + // can be done lazily when the node becomes available (via background reconciliation). + if node.is_available() { + self.location_config(&node, conf, None, false).await?; + } else { + // If the node is unavailable, we skip and consider the reconciliation successful: this + // is a common case where a pageserver is marked unavailable: we demote a location on + // that unavailable pageserver to secondary. + tracing::info!("Skipping configuring secondary location {node}, it is unavailable"); + self.observed + .locations + .insert(node.get_id(), ObservedStateLocation { conf: None }); + } } // The condition below identifies a detach. We must have no attached intent and diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 51a4cf35be..04a594dcac 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -32,6 +32,9 @@ pub(crate) struct SchedulerNode { shard_count: usize, /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. attached_shard_count: usize, + /// How many shards have a location on this node (via [`crate::tenant_shard::IntentState`]) _and_ this node + /// is in their preferred AZ (i.e. 
this is their 'home' location) + home_shard_count: usize, /// Availability zone id in which the node resides az: AvailabilityZone, @@ -47,6 +50,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { preferred_az: &Option, context: &ScheduleContext, ) -> Option; + + /// Return a score that drops any components based on node utilization: this is useful + /// for finding scores for scheduling optimisation, when we want to avoid rescheduling + /// shards due to e.g. disk usage, to avoid flapping. + fn for_optimization(&self) -> Self; + fn is_overloaded(&self) -> bool; fn node_id(&self) -> NodeId; } @@ -136,17 +145,13 @@ impl PartialOrd for SecondaryAzMatch { /// Ordering is given by member declaration order (top to bottom). #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub(crate) struct NodeAttachmentSchedulingScore { - /// The number of shards belonging to the tenant currently being - /// scheduled that are attached to this node. - affinity_score: AffinityScore, /// Flag indicating whether this node matches the preferred AZ /// of the shard. For equal affinity scores, nodes in the matching AZ /// are considered first. az_match: AttachmentAzMatch, - /// Size of [`ScheduleContext::attached_nodes`] for the current node. - /// This normally tracks the number of attached shards belonging to the - /// tenant being scheduled that are already on this node. - attached_shards_in_context: usize, + /// The number of shards belonging to the tenant currently being + /// scheduled that are attached to this node. + affinity_score: AffinityScore, /// Utilisation score that combines shard count and disk utilisation utilization_score: u64, /// Total number of shards attached to this node. When nodes have identical utilisation, this @@ -177,13 +182,25 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { .copied() .unwrap_or(AffinityScore::FREE), az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), - attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0), utilization_score: utilization.cached_score(), total_attached_shard_count: node.attached_shard_count, node_id: *node_id, }) } + /// For use in scheduling optimisation, where we only want to consider the aspects + /// of the score that can only be resolved by moving things (such as inter-shard affinity + /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which + /// can fluctuate for other reasons) + fn for_optimization(&self) -> Self { + Self { + utilization_score: 0, + total_attached_shard_count: 0, + node_id: NodeId(0), + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -208,9 +225,9 @@ pub(crate) struct NodeSecondarySchedulingScore { affinity_score: AffinityScore, /// Utilisation score that combines shard count and disk utilisation utilization_score: u64, - /// Total number of shards attached to this node. When nodes have identical utilisation, this - /// acts as an anti-affinity between attached shards. - total_attached_shard_count: usize, + /// Anti-affinity with other non-home locations: this gives the behavior that secondaries + /// will spread out across the nodes in an AZ. 
+ total_non_home_shard_count: usize, /// Convenience to make selection deterministic in tests and empty systems node_id: NodeId, } @@ -237,11 +254,20 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { .copied() .unwrap_or(AffinityScore::FREE), utilization_score: utilization.cached_score(), - total_attached_shard_count: node.attached_shard_count, + total_non_home_shard_count: (node.shard_count - node.home_shard_count), node_id: *node_id, }) } + fn for_optimization(&self) -> Self { + Self { + utilization_score: 0, + total_non_home_shard_count: 0, + node_id: NodeId(0), + ..*self + } + } + fn is_overloaded(&self) -> bool { PageserverUtilization::is_overloaded(self.utilization_score) } @@ -293,6 +319,10 @@ impl AffinityScore { pub(crate) fn inc(&mut self) { self.0 += 1; } + + pub(crate) fn dec(&mut self) { + self.0 -= 1; + } } impl std::ops::Add for AffinityScore { @@ -324,9 +354,6 @@ pub(crate) struct ScheduleContext { /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`] pub(crate) nodes: HashMap, - /// Specifically how many _attached_ locations are on each node - pub(crate) attached_nodes: HashMap, - pub(crate) mode: ScheduleMode, } @@ -334,7 +361,6 @@ impl ScheduleContext { pub(crate) fn new(mode: ScheduleMode) -> Self { Self { nodes: HashMap::new(), - attached_nodes: HashMap::new(), mode, } } @@ -348,25 +374,31 @@ impl ScheduleContext { } } - pub(crate) fn push_attached(&mut self, node_id: NodeId) { - let entry = self.attached_nodes.entry(node_id).or_default(); - *entry += 1; - } - - pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore { - self.nodes - .get(&node_id) - .copied() - .unwrap_or(AffinityScore::FREE) - } - - pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize { - self.attached_nodes.get(&node_id).copied().unwrap_or(0) + /// Remove `shard`'s contributions to this context. This is useful when considering scheduling + /// this shard afresh, where we don't want it to e.g. experience anti-affinity to its current location. + pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self { + let mut new_context = self.clone(); + + if let Some(attached) = shard.intent.get_attached() { + if let Some(score) = new_context.nodes.get_mut(attached) { + score.dec(); + } + } + + for secondary in shard.intent.get_secondary() { + if let Some(score) = new_context.nodes.get_mut(secondary) { + score.dec(); + } + } + + new_context } + /// For test, track the sum of AffinityScore values, which is effectively how many + /// attached or secondary locations have been registered with this context. 
#[cfg(test)] - pub(crate) fn attach_count(&self) -> usize { - self.attached_nodes.values().sum() + pub(crate) fn location_count(&self) -> usize { + self.nodes.values().map(|i| i.0).sum() } } @@ -388,6 +420,7 @@ impl Scheduler { SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }, @@ -415,6 +448,7 @@ impl Scheduler { SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }, @@ -427,6 +461,9 @@ impl Scheduler { Some(node) => { node.shard_count += 1; node.attached_shard_count += 1; + if Some(&node.az) == shard.preferred_az() { + node.home_shard_count += 1; + } } None => anyhow::bail!( "Tenant {} references nonexistent node {}", @@ -438,7 +475,12 @@ impl Scheduler { for node_id in shard.intent.get_secondary() { match expect_nodes.get_mut(node_id) { - Some(node) => node.shard_count += 1, + Some(node) => { + node.shard_count += 1; + if Some(&node.az) == shard.preferred_az() { + node.home_shard_count += 1; + } + } None => anyhow::bail!( "Tenant {} references nonexistent node {}", shard.tenant_shard_id, @@ -482,13 +524,20 @@ impl Scheduler { /// /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into /// [`Self::new`] or [`Self::node_upsert`]) - pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) { + pub(crate) fn update_node_ref_counts( + &mut self, + node_id: NodeId, + preferred_az: Option<&AvailabilityZone>, + update: RefCountUpdate, + ) { let Some(node) = self.nodes.get_mut(&node_id) else { debug_assert!(false); tracing::error!("Scheduler missing node {node_id}"); return; }; + let is_home_az = Some(&node.az) == preferred_az; + match update { RefCountUpdate::PromoteSecondary => { node.attached_shard_count += 1; @@ -496,19 +545,31 @@ impl Scheduler { RefCountUpdate::Attach => { node.shard_count += 1; node.attached_shard_count += 1; + if is_home_az { + node.home_shard_count += 1; + } } RefCountUpdate::Detach => { node.shard_count -= 1; node.attached_shard_count -= 1; + if is_home_az { + node.home_shard_count -= 1; + } } RefCountUpdate::DemoteAttached => { node.attached_shard_count -= 1; } RefCountUpdate::AddSecondary => { node.shard_count += 1; + if is_home_az { + node.home_shard_count += 1; + } } RefCountUpdate::RemoveSecondary => { node.shard_count -= 1; + if is_home_az { + node.home_shard_count -= 1; + } } } @@ -594,6 +655,7 @@ impl Scheduler { entry.insert(SchedulerNode { shard_count: 0, attached_shard_count: 0, + home_shard_count: 0, may_schedule: node.may_schedule(), az: node.get_availability_zone_id().clone(), }); @@ -607,33 +669,20 @@ impl Scheduler { } } - /// Where we have several nodes to choose from, for example when picking a secondary location - /// to promote to an attached location, this method may be used to pick the best choice based - /// on the scheduler's knowledge of utilization and availability. - /// - /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the - /// caller can pick a node some other way. - pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option { - if nodes.is_empty() { - return None; - } - - // TODO: When the utilization score returned by the pageserver becomes meaningful, - // schedule based on that instead of the shard count. 
- let node = nodes - .iter() - .map(|node_id| { - let may_schedule = self - .nodes - .get(node_id) - .map(|n| !matches!(n.may_schedule, MaySchedule::No)) - .unwrap_or(false); - (*node_id, may_schedule) - }) - .max_by_key(|(_n, may_schedule)| *may_schedule); - - // If even the preferred node has may_schedule==false, return None - node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None }) + /// Calculate a single node's score, used in optimizer logic to compare specific + /// nodes' scores. + pub(crate) fn compute_node_score( + &mut self, + node_id: NodeId, + preferred_az: &Option, + context: &ScheduleContext, + ) -> Option + where + Score: NodeSchedulingScore, + { + self.nodes + .get_mut(&node_id) + .and_then(|node| Score::generate(&node_id, node, preferred_az, context)) } /// Compute a schedulling score for each node that the scheduler knows of @@ -727,7 +776,7 @@ impl Scheduler { tracing::info!( "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})", scores.iter().map(|i| i.node_id().0).collect::>() - ); + ); } // Note that we do not update shard count here to reflect the scheduling: that @@ -743,47 +792,74 @@ impl Scheduler { } /// For choosing which AZ to schedule a new shard into, use this. It will return the - /// AZ with the lowest median utilization. + /// AZ with the the lowest number of shards currently scheduled in this AZ as their home + /// location. /// /// We use an AZ-wide measure rather than simply selecting the AZ of the least-loaded /// node, because while tenants start out single sharded, when they grow and undergo - /// shard-split, they will occupy space on many nodes within an AZ. + /// shard-split, they will occupy space on many nodes within an AZ. It is important + /// that we pick the AZ in a way that balances this _future_ load. /// - /// We use median rather than total free space or mean utilization, because - /// we wish to avoid preferring AZs that have low-load nodes resulting from - /// recent replacements. - /// - /// The practical result is that we will pick an AZ based on its median node, and - /// then actually _schedule_ the new shard onto the lowest-loaded node in that AZ. + /// Once we've picked an AZ, subsequent scheduling within that AZ will be driven by + /// nodes' utilization scores. pub(crate) fn get_az_for_new_tenant(&self) -> Option { if self.nodes.is_empty() { return None; } - let mut scores_by_az = HashMap::new(); - for (node_id, node) in &self.nodes { - let az_scores = scores_by_az.entry(&node.az).or_insert_with(Vec::new); - let score = match &node.may_schedule { - MaySchedule::Yes(utilization) => utilization.score(), - MaySchedule::No => PageserverUtilization::full().score(), - }; - az_scores.push((node_id, node, score)); + #[derive(Default)] + struct AzScore { + home_shard_count: usize, + scheduleable: bool, } - // Sort by utilization. Also include the node ID to break ties. - for scores in scores_by_az.values_mut() { - scores.sort_by_key(|i| (i.2, i.0)); + let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new(); + for node in self.nodes.values() { + let az = azs.entry(&node.az).or_default(); + az.home_shard_count += node.home_shard_count; + az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_)); } - let mut median_by_az = scores_by_az + // If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where + // all nodes are overloaded or otherwise unschedulable). 
+ if azs.values().any(|i| i.scheduleable) { + azs.retain(|_, i| i.scheduleable); + } + + // Find the AZ with the lowest number of shards currently allocated + Some( + azs.into_iter() + .min_by_key(|i| (i.1.home_shard_count, i.0)) + .unwrap() + .0 + .clone(), + ) + } + + pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option { + self.nodes.get(node_id).map(|n| n.az.clone()) + } + + /// For use when choosing a preferred secondary location: filter out nodes that are not + /// available, and gather their AZs. + pub(crate) fn filter_usable_nodes( + &self, + nodes: &[NodeId], + ) -> Vec<(NodeId, Option)> { + nodes .iter() - .map(|(az, nodes)| (*az, nodes.get(nodes.len() / 2).unwrap().2)) - .collect::>(); - // Sort by utilization. Also include the AZ to break ties. - median_by_az.sort_by_key(|i| (i.1, i.0)); - - // Return the AZ with the lowest median utilization - Some(median_by_az.first().unwrap().0.clone()) + .filter_map(|node_id| { + let node = self + .nodes + .get(node_id) + .expect("Referenced nodes always exist"); + if matches!(node.may_schedule, MaySchedule::Yes(_)) { + Some((*node_id, Some(node.az.clone()))) + } else { + None + } + }) + .collect() } /// Unit test access to internal state @@ -843,7 +919,14 @@ pub(crate) mod test_utils { #[cfg(test)] mod tests { - use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization}; + use pageserver_api::{ + controller_api::NodeAvailability, models::utilization::test_utilization, + shard::ShardIdentity, + }; + use utils::{ + id::TenantId, + shard::{ShardCount, ShardNumber, TenantShardId}, + }; use super::*; @@ -853,8 +936,8 @@ mod tests { let nodes = test_utils::make_test_nodes(2, &[]); let mut scheduler = Scheduler::new(nodes.values()); - let mut t1_intent = IntentState::new(); - let mut t2_intent = IntentState::new(); + let mut t1_intent = IntentState::new(None); + let mut t2_intent = IntentState::new(None); let context = ScheduleContext::default(); @@ -930,7 +1013,7 @@ mod tests { let scheduled = scheduler .schedule_shard::(&[], &None, context) .unwrap(); - let mut intent = IntentState::new(); + let mut intent = IntentState::new(None); intent.set_attached(scheduler, Some(scheduled)); scheduled_intents.push(intent); assert_eq!(scheduled, expect_node); @@ -1063,7 +1146,7 @@ mod tests { let scheduled = scheduler .schedule_shard::(&[], &preferred_az, context) .unwrap(); - let mut intent = IntentState::new(); + let mut intent = IntentState::new(preferred_az.clone()); intent.set_attached(scheduler, Some(scheduled)); scheduled_intents.push(intent); assert_eq!(scheduled, expect_node); @@ -1089,9 +1172,9 @@ mod tests { &mut context, ); - // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that. + // Node 1 and 3 (az-a) have same affinity score, so prefer the lowest node id. assert_scheduler_chooses::( - NodeId(2), + NodeId(1), Some(az_a_tag.clone()), &mut scheduled_intents, &mut scheduler, @@ -1107,26 +1190,6 @@ mod tests { &mut context, ); - // Avoid nodes in "az-b" for the secondary location. - // Nodes 1 and 3 are identically loaded, so prefer the lowest node id. - assert_scheduler_chooses::( - NodeId(1), - Some(az_b_tag.clone()), - &mut scheduled_intents, - &mut scheduler, - &mut context, - ); - - // Avoid nodes in "az-b" for the secondary location. - // Node 3 has lower affinity score than 1, so prefer that. 
- assert_scheduler_chooses::( - NodeId(3), - Some(az_b_tag.clone()), - &mut scheduled_intents, - &mut scheduler, - &mut context, - ); - for mut intent in scheduled_intents { intent.clear(&mut scheduler); } @@ -1150,34 +1213,292 @@ mod tests { let mut scheduler = Scheduler::new(nodes.values()); - /// Force the utilization of a node in Scheduler's state to a particular - /// number of bytes used. - fn set_utilization(scheduler: &mut Scheduler, node_id: NodeId, shard_count: u32) { - let mut node = Node::new( - node_id, - "".to_string(), - 0, - "".to_string(), - 0, - scheduler.nodes.get(&node_id).unwrap().az.clone(), - ); - node.set_availability(NodeAvailability::Active(test_utilization::simple( - shard_count, - 0, - ))); - scheduler.node_upsert(&node); + /// Force the `home_shard_count` of a node directly: this is the metric used + /// by the scheduler when picking AZs. + fn set_shard_count(scheduler: &mut Scheduler, node_id: NodeId, shard_count: usize) { + let node = scheduler.nodes.get_mut(&node_id).unwrap(); + node.home_shard_count = shard_count; } // Initial empty state. Scores are tied, scheduler prefers lower AZ ID. assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); - // Put some utilization on one node in AZ A: this should change nothing, as the median hasn't changed - set_utilization(&mut scheduler, NodeId(1), 1000000); - assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); - - // Put some utilization on a second node in AZ A: now the median has changed, so the scheduler - // should prefer the other AZ. - set_utilization(&mut scheduler, NodeId(2), 1000000); + // Home shard count is higher in AZ A, so AZ B will be preferred + set_shard_count(&mut scheduler, NodeId(1), 10); assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_b_tag.clone())); + + // Total home shard count is higher in AZ B, so we revert to preferring AZ A + set_shard_count(&mut scheduler, NodeId(4), 6); + set_shard_count(&mut scheduler, NodeId(5), 6); + assert_eq!(scheduler.get_az_for_new_tenant(), Some(az_a_tag.clone())); + } + + /// Test that when selecting AZs for many new tenants, we get the expected balance across nodes + #[test] + fn az_selection_many() { + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + let az_c_tag = AvailabilityZone("az-c".to_string()); + let nodes = test_utils::make_test_nodes( + 6, + &[ + az_a_tag.clone(), + az_b_tag.clone(), + az_c_tag.clone(), + az_a_tag.clone(), + az_b_tag.clone(), + az_c_tag.clone(), + ], + ); + + let mut scheduler = Scheduler::new(nodes.values()); + + // We should get 1/6th of these on each node, give or take a few... + let total_tenants = 300; + + // ...where the 'few' is the number of AZs, because the scheduling will sometimes overshoot + // on one AZ before correcting itself. This is because we select the 'home' AZ based on + // an AZ-wide metric, but we select the location for secondaries on a purely node-based + // metric (while excluding the home AZ). 
+ let grace = 3; + + let mut scheduled_shards = Vec::new(); + for _i in 0..total_tenants { + let preferred_az = scheduler.get_az_for_new_tenant().unwrap(); + + let mut node_home_counts = scheduler + .nodes + .iter() + .map(|(node_id, node)| (node_id, node.home_shard_count)) + .collect::>(); + node_home_counts.sort_by_key(|i| i.0); + eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts); + + let tenant_shard_id = TenantShardId { + tenant_id: TenantId::generate(), + shard_number: ShardNumber(0), + shard_count: ShardCount(1), + }; + + let shard_identity = ShardIdentity::new( + tenant_shard_id.shard_number, + tenant_shard_id.shard_count, + pageserver_api::shard::ShardStripeSize(1), + ) + .unwrap(); + let mut shard = TenantShard::new( + tenant_shard_id, + shard_identity, + pageserver_api::controller_api::PlacementPolicy::Attached(1), + Some(preferred_az), + ); + + let mut context = ScheduleContext::default(); + shard.schedule(&mut scheduler, &mut context).unwrap(); + eprintln!("Scheduled shard at {:?}", shard.intent); + + scheduled_shards.push(shard); + } + + for (node_id, node) in &scheduler.nodes { + eprintln!( + "Node {}: {} {} {}", + node_id, node.shard_count, node.attached_shard_count, node.home_shard_count + ); + } + + for node in scheduler.nodes.values() { + assert!((node.home_shard_count as i64 - total_tenants as i64 / 6).abs() < grace); + } + + for mut shard in scheduled_shards { + shard.intent.clear(&mut scheduler); + } + } + + #[test] + /// Make sure that when we have an odd number of nodes and an even number of shards, we still + /// get scheduling stability. + fn odd_nodes_stability() { + let az_a = AvailabilityZone("az-a".to_string()); + let az_b = AvailabilityZone("az-b".to_string()); + + let nodes = test_utils::make_test_nodes( + 10, + &[ + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_a.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + az_b.clone(), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Need to keep these alive because they contribute to shard counts via RAII + let mut scheduled_shards = Vec::new(); + + let mut context = ScheduleContext::default(); + + fn schedule_shard( + tenant_shard_id: TenantShardId, + expect_attached: NodeId, + expect_secondary: NodeId, + scheduled_shards: &mut Vec, + scheduler: &mut Scheduler, + preferred_az: Option, + context: &mut ScheduleContext, + ) { + let shard_identity = ShardIdentity::new( + tenant_shard_id.shard_number, + tenant_shard_id.shard_count, + pageserver_api::shard::ShardStripeSize(1), + ) + .unwrap(); + let mut shard = TenantShard::new( + tenant_shard_id, + shard_identity, + pageserver_api::controller_api::PlacementPolicy::Attached(1), + preferred_az, + ); + + shard.schedule(scheduler, context).unwrap(); + + assert_eq!(shard.intent.get_attached().unwrap(), expect_attached); + assert_eq!( + shard.intent.get_secondary().first().unwrap(), + &expect_secondary + ); + + scheduled_shards.push(shard); + } + + let tenant_id = TenantId::generate(); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(0), + shard_count: ShardCount(8), + }, + NodeId(1), + NodeId(6), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(1), + shard_count: ShardCount(8), + }, + NodeId(2), + NodeId(7), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + 
shard_number: ShardNumber(2), + shard_count: ShardCount(8), + }, + NodeId(3), + NodeId(8), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(3), + shard_count: ShardCount(8), + }, + NodeId(4), + NodeId(9), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(4), + shard_count: ShardCount(8), + }, + NodeId(5), + NodeId(10), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(5), + shard_count: ShardCount(8), + }, + NodeId(1), + NodeId(6), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(6), + shard_count: ShardCount(8), + }, + NodeId(2), + NodeId(7), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + schedule_shard( + TenantShardId { + tenant_id, + shard_number: ShardNumber(7), + shard_count: ShardCount(8), + }, + NodeId(3), + NodeId(8), + &mut scheduled_shards, + &mut scheduler, + Some(az_a.clone()), + &mut context, + ); + + // Assert that the optimizer suggests nochanges, i.e. our initial scheduling was stable. + for shard in &scheduled_shards { + assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None); + } + + for mut shard in scheduled_shards { + shard.intent.clear(&mut scheduler); + } } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 8aa263f0c3..dadcc44cfb 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1404,7 +1404,11 @@ impl Service { // We will populate intent properly later in [`Self::startup_reconcile`], initially populate // it with what we can infer: the node for which a generation was most recently issued. - let mut intent = IntentState::new(); + let mut intent = IntentState::new( + tsp.preferred_az_id + .as_ref() + .map(|az| AvailabilityZone(az.clone())), + ); if let Some(generation_pageserver) = tsp.generation_pageserver.map(|n| NodeId(n as u64)) { if nodes.contains_key(&generation_pageserver) { @@ -2474,18 +2478,29 @@ impl Service { tenant_id: TenantId, _guard: &TracingExclusiveGuard, ) -> Result<(), ApiError> { - let present_in_memory = { + // Check if the tenant is present in memory, and select an AZ to use when loading + // if we will load it. + let load_in_az = { let locked = self.inner.read().unwrap(); - locked + let existing = locked .tenants .range(TenantShardId::tenant_range(tenant_id)) - .next() - .is_some() - }; + .next(); - if present_in_memory { - return Ok(()); - } + // If the tenant is not present in memory, we expect to load it from database, + // so let's figure out what AZ to load it into while we have self.inner locked. + if existing.is_none() { + locked + .scheduler + .get_az_for_new_tenant() + .ok_or(ApiError::BadRequest(anyhow::anyhow!( + "No AZ with nodes found to load tenant" + )))? + } else { + // We already have this tenant in memory + return Ok(()); + } + }; let tenant_shards = self.persistence.load_tenant(tenant_id).await?; if tenant_shards.is_empty() { @@ -2494,8 +2509,20 @@ impl Service { )); } - // TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running - // compute, so no benefit to making AZ sticky across detaches. 
+ // Update the persistent shards with the AZ that we are about to apply to in-memory state + self.persistence + .set_tenant_shard_preferred_azs( + tenant_shards + .iter() + .map(|t| { + ( + t.get_tenant_shard_id().expect("Corrupt shard in database"), + load_in_az.clone(), + ) + }) + .collect(), + ) + .await?; let mut locked = self.inner.write().unwrap(); tracing::info!( @@ -2505,7 +2532,7 @@ impl Service { ); locked.tenants.extend(tenant_shards.into_iter().map(|p| { - let intent = IntentState::new(); + let intent = IntentState::new(Some(load_in_az.clone())); let shard = TenantShard::from_persistent(p, intent).expect("Corrupt shard row in database"); @@ -4236,6 +4263,22 @@ impl Service { } tracing::info!("Restoring parent shard {tenant_shard_id}"); + + // Drop any intents that refer to unavailable nodes, to enable this abort to proceed even + // if the original attachment location is offline. + if let Some(node_id) = shard.intent.get_attached() { + if !nodes.get(node_id).unwrap().is_available() { + tracing::info!("Demoting attached intent for {tenant_shard_id} on unavailable node {node_id}"); + shard.intent.demote_attached(scheduler, *node_id); + } + } + for node_id in shard.intent.get_secondary().clone() { + if !nodes.get(&node_id).unwrap().is_available() { + tracing::info!("Dropping secondary intent for {tenant_shard_id} on unavailable node {node_id}"); + shard.intent.remove_secondary(scheduler, node_id); + } + } + shard.splitting = SplitState::Idle; if let Err(e) = shard.schedule(scheduler, &mut ScheduleContext::default()) { // If this shard can't be scheduled now (perhaps due to offline nodes or @@ -4389,15 +4432,13 @@ impl Service { let mut child_state = TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone()); - child_state.intent = IntentState::single(scheduler, Some(pageserver)); + child_state.intent = + IntentState::single(scheduler, Some(pageserver), preferred_az.clone()); child_state.observed = ObservedState { locations: child_observed, }; child_state.generation = Some(generation); child_state.config = config.clone(); - if let Some(preferred_az) = &preferred_az { - child_state.set_preferred_az(preferred_az.clone()); - } // The child's TenantShard::splitting is intentionally left at the default value of Idle, // as at this point in the split process we have succeeded and this part is infallible: @@ -5014,6 +5055,8 @@ impl Service { // If our new attached node was a secondary, it no longer should be. 
                    shard.intent.remove_secondary(scheduler, migrate_req.node_id);
 
+                    shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
+
                    // If we were already attached to something, demote that to a secondary
                    if let Some(old_attached) = old_attached {
                        if n > 0 {
@@ -5025,8 +5068,6 @@ impl Service {
                                shard.intent.push_secondary(scheduler, old_attached);
                            }
                        }
-
-                        shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
                    }
                PlacementPolicy::Secondary => {
                    shard.intent.clear(scheduler);
@@ -5712,7 +5753,7 @@ impl Service {
            register_req.listen_http_port,
            register_req.listen_pg_addr,
            register_req.listen_pg_port,
-            register_req.availability_zone_id,
+            register_req.availability_zone_id.clone(),
        );
 
        // TODO: idempotency if the node already exists in the database
@@ -5732,8 +5773,9 @@ impl Service {
            .set(locked.nodes.len() as i64);
 
        tracing::info!(
-            "Registered pageserver {}, now have {} pageservers",
+            "Registered pageserver {} ({}), now have {} pageservers",
            register_req.node_id,
+            register_req.availability_zone_id,
            locked.nodes.len()
        );
        Ok(())
@@ -6467,6 +6509,7 @@ impl Service {
                // Shard was dropped between planning and execution;
                continue;
            };
+            tracing::info!("Applying optimization: {optimization:?}");
            if shard.apply_optimization(scheduler, optimization) {
                optimizations_applied += 1;
                if self.maybe_reconcile_shard(shard, nodes).is_some() {
@@ -6497,7 +6540,13 @@ impl Service {
        let mut work = Vec::new();
 
        let mut locked = self.inner.write().unwrap();
-        let (nodes, tenants, scheduler) = locked.parts_mut();
+        let (_nodes, tenants, scheduler) = locked.parts_mut();
+
+        // We are going to plan a bunch of optimisations before applying any of them, so the
+        // utilisation stats on nodes will be effectively stale for the >1st optimisation we
+        // generate. To avoid this causing unstable migrations/flapping, it's important that the
+        // code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::for_optimization`]
+        // to ignore the utilisation component of the score.
 
        for (_tenant_id, schedule_context, shards) in
            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
@@ -6528,13 +6577,28 @@ impl Service {
                continue;
            }
 
-            // TODO: optimization calculations are relatively expensive: create some fast-path for
-            // the common idle case (avoiding the search on tenants that we have recently checked)
+            // Fast path: we may quickly identify shards that don't have any possible optimisations
+            if !shard.maybe_optimizable(scheduler, &schedule_context) {
+                if cfg!(feature = "testing") {
+                    // Check that maybe_optimizable doesn't disagree with the actual optimization functions.
+                    // Only do this in testing builds because it is not a correctness-critical check, so we shouldn't
+                    // panic in prod if we hit this, or spend cycles on it in prod.
+                    assert!(shard
+                        .optimize_attachment(scheduler, &schedule_context)
+                        .is_none());
+                    assert!(shard
+                        .optimize_secondary(scheduler, &schedule_context)
+                        .is_none());
+                }
+                continue;
+            }
+
            if let Some(optimization) =
-                // If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to
+                // its primary location based on soft constraints, cut it over.
- shard.optimize_attachment(nodes, &schedule_context) + shard.optimize_attachment(scheduler, &schedule_context) { + tracing::info!(tenant_shard_id=%shard.tenant_shard_id, "Identified optimization for attachment: {optimization:?}"); work.push((shard.tenant_shard_id, optimization)); break; } else if let Some(optimization) = @@ -6544,6 +6608,7 @@ impl Service { // in the same tenant with secondary locations on the node where they originally split. shard.optimize_secondary(scheduler, &schedule_context) { + tracing::info!(tenant_shard_id=%shard.tenant_shard_id, "Identified optimization for secondary: {optimization:?}"); work.push((shard.tenant_shard_id, optimization)); break; } @@ -6592,8 +6657,10 @@ impl Service { } } } - ScheduleOptimizationAction::ReplaceSecondary(_) => { - // No extra checks needed to replace a secondary: this does not interrupt client access + ScheduleOptimizationAction::ReplaceSecondary(_) + | ScheduleOptimizationAction::CreateSecondary(_) + | ScheduleOptimizationAction::RemoveSecondary(_) => { + // No extra checks needed to manage secondaries: this does not interrupt client access validated_work.push((tenant_shard_id, optimization)) } }; @@ -6665,26 +6732,35 @@ impl Service { /// we have this helper to move things along faster. #[cfg(feature = "testing")] async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) { - let (attached_node, secondary_node) = { + let (attached_node, secondaries) = { let locked = self.inner.read().unwrap(); let Some(shard) = locked.tenants.get(&tenant_shard_id) else { + tracing::warn!( + "Skipping kick of secondary download for {tenant_shard_id}: not found" + ); return; }; - let (Some(attached), Some(secondary)) = ( - shard.intent.get_attached(), - shard.intent.get_secondary().first(), - ) else { + + let Some(attached) = shard.intent.get_attached() else { + tracing::warn!( + "Skipping kick of secondary download for {tenant_shard_id}: no attached" + ); return; }; - ( - locked.nodes.get(attached).unwrap().clone(), - locked.nodes.get(secondary).unwrap().clone(), - ) + + let secondaries = shard + .intent + .get_secondary() + .iter() + .map(|n| locked.nodes.get(n).unwrap().clone()) + .collect::>(); + + (locked.nodes.get(attached).unwrap().clone(), secondaries) }; // Make remote API calls to upload + download heatmaps: we ignore errors because this is just // a 'kick' to let scheduling optimisation run more promptly. 
- attached_node + match attached_node .with_client_retries( |client| async move { client.tenant_heatmap_upload(tenant_shard_id).await }, &self.config.jwt_token, @@ -6693,22 +6769,57 @@ impl Service { SHORT_RECONCILE_TIMEOUT, &self.cancel, ) - .await; + .await + { + Some(Err(e)) => { + tracing::info!( + "Failed to upload heatmap from {attached_node} for {tenant_shard_id}: {e}" + ); + } + None => { + tracing::info!( + "Cancelled while uploading heatmap from {attached_node} for {tenant_shard_id}" + ); + } + Some(Ok(_)) => { + tracing::info!( + "Successfully uploaded heatmap from {attached_node} for {tenant_shard_id}" + ); + } + } - secondary_node - .with_client_retries( - |client| async move { - client - .tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1))) - .await - }, - &self.config.jwt_token, - 3, - 10, - SHORT_RECONCILE_TIMEOUT, - &self.cancel, - ) - .await; + for secondary_node in secondaries { + match secondary_node + .with_client_retries( + |client| async move { + client + .tenant_secondary_download( + tenant_shard_id, + Some(Duration::from_secs(1)), + ) + .await + }, + &self.config.jwt_token, + 3, + 10, + SHORT_RECONCILE_TIMEOUT, + &self.cancel, + ) + .await + { + Some(Err(e)) => { + tracing::info!( + "Failed to download heatmap from {secondary_node} for {tenant_shard_id}: {e}" + ); + } + None => { + tracing::info!("Cancelled while downloading heatmap from {secondary_node} for {tenant_shard_id}"); + } + Some(Ok(progress)) => { + tracing::info!("Successfully downloaded heatmap from {secondary_node} for {tenant_shard_id}: {progress:?}"); + } + } + } } /// Look for shards which are oversized and in need of splitting @@ -7144,9 +7255,15 @@ impl Service { fn fill_node_plan(&self, node_id: NodeId) -> Vec { let mut locked = self.inner.write().unwrap(); let fill_requirement = locked.scheduler.compute_fill_requirement(node_id); + let (nodes, tenants, _scheduler) = locked.parts_mut(); - let mut tids_by_node = locked - .tenants + let node_az = nodes + .get(&node_id) + .expect("Node must exist") + .get_availability_zone_id() + .clone(); + + let mut tids_by_node = tenants .iter_mut() .filter_map(|(tid, tenant_shard)| { if !matches!( @@ -7159,6 +7276,25 @@ impl Service { return None; } + // AZ check: when filling nodes after a restart, our intent is to move _back_ the + // shards which belong on this node, not to promote shards whose scheduling preference + // would be on their currently attached node. So will avoid promoting shards whose + // home AZ doesn't match the AZ of the node we're filling. + match tenant_shard.preferred_az() { + None => { + // Shard doesn't have an AZ preference: it is elegible to be moved. + } + Some(az) if az == &node_az => { + // This shard's home AZ is equal to the node we're filling: it is + // elegible to be moved: fall through; + } + Some(_) => { + // This shard's home AZ is somewhere other than the node we're filling: + // do not include it in the fill plan. 
+ return None; + } + } + if tenant_shard.intent.get_secondary().contains(&node_id) { if let Some(primary) = tenant_shard.intent.get_attached() { return Some((*primary, *tid)); diff --git a/storage_controller/src/service/context_iterator.rs b/storage_controller/src/service/context_iterator.rs index d38010a27e..dd6913e988 100644 --- a/storage_controller/src/service/context_iterator.rs +++ b/storage_controller/src/service/context_iterator.rs @@ -43,9 +43,6 @@ impl<'a> Iterator for TenantShardContextIterator<'a> { // Accumulate the schedule context for all the shards in a tenant schedule_context.avoid(&shard.intent.all_pageservers()); - if let Some(attached) = shard.intent.get_attached() { - schedule_context.push_attached(*attached); - } tenant_shards.push(shard); if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 { @@ -115,7 +112,7 @@ mod tests { assert_eq!(tenant_id, t1_id); assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); assert_eq!(shards.len(), 1); - assert_eq!(context.attach_count(), 1); + assert_eq!(context.location_count(), 2); let (tenant_id, context, shards) = iter.next().unwrap(); assert_eq!(tenant_id, t2_id); @@ -124,13 +121,13 @@ mod tests { assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2)); assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3)); assert_eq!(shards.len(), 4); - assert_eq!(context.attach_count(), 4); + assert_eq!(context.location_count(), 8); let (tenant_id, context, shards) = iter.next().unwrap(); assert_eq!(tenant_id, t3_id); assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0)); assert_eq!(shards.len(), 1); - assert_eq!(context.attach_count(), 1); + assert_eq!(context.location_count(), 2); for shard in tenants.values_mut() { shard.intent.clear(&mut scheduler); diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index e0a71b5822..2ba2a57eba 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -11,16 +11,14 @@ use crate::{ persistence::TenantShardPersistence, reconciler::{ReconcileUnits, ReconcilerConfig}, scheduler::{ - AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext, - SecondaryShardTag, + AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore, + RefCountUpdate, ScheduleContext, SecondaryShardTag, ShardTag, }, service::ReconcileResultRequest, }; use futures::future::{self, Either}; use itertools::Itertools; -use pageserver_api::controller_api::{ - AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, -}; +use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy}; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -33,6 +31,7 @@ use utils::{ generation::Generation, id::NodeId, seqwait::{SeqWait, SeqWaitError}, + shard::ShardCount, sync::gate::GateGuard, }; @@ -147,45 +146,67 @@ pub(crate) struct TenantShard { // Support/debug tool: if something is going wrong or flapping with scheduling, this may // be set to a non-active state to avoid making changes while the issue is fixed. scheduling_policy: ShardSchedulingPolicy, +} + +#[derive(Clone, Debug, Serialize)] +pub(crate) struct IntentState { + attached: Option, + secondary: Vec, // We should attempt to schedule this shard in the provided AZ to // decrease chances of cross-AZ compute. 
preferred_az_id: Option, } -#[derive(Default, Clone, Debug, Serialize)] -pub(crate) struct IntentState { - attached: Option, - secondary: Vec, -} - impl IntentState { - pub(crate) fn new() -> Self { + pub(crate) fn new(preferred_az_id: Option) -> Self { Self { attached: None, secondary: vec![], + preferred_az_id, } } - pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option) -> Self { + pub(crate) fn single( + scheduler: &mut Scheduler, + node_id: Option, + preferred_az_id: Option, + ) -> Self { if let Some(node_id) = node_id { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach); + scheduler.update_node_ref_counts( + node_id, + preferred_az_id.as_ref(), + RefCountUpdate::Attach, + ); } Self { attached: node_id, secondary: vec![], + preferred_az_id, } } pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option) { if self.attached != new_attached { if let Some(old_attached) = self.attached.take() { - scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); + scheduler.update_node_ref_counts( + old_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Detach, + ); } if let Some(new_attached) = &new_attached { - scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach); + scheduler.update_node_ref_counts( + *new_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Attach, + ); } self.attached = new_attached; } + + if let Some(new_attached) = &new_attached { + assert!(!self.secondary.contains(new_attached)); + } } /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from @@ -204,15 +225,28 @@ impl IntentState { let demoted = self.attached; self.attached = Some(promote_secondary); - scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary); + scheduler.update_node_ref_counts( + promote_secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::PromoteSecondary, + ); if let Some(demoted) = demoted { - scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached); + scheduler.update_node_ref_counts( + demoted, + self.preferred_az_id.as_ref(), + RefCountUpdate::DemoteAttached, + ); } } pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) { - debug_assert!(!self.secondary.contains(&new_secondary)); - scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary); + assert!(!self.secondary.contains(&new_secondary)); + assert!(self.attached != Some(new_secondary)); + scheduler.update_node_ref_counts( + new_secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::AddSecondary, + ); self.secondary.push(new_secondary); } @@ -220,27 +254,43 @@ impl IntentState { pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) { let index = self.secondary.iter().position(|n| *n == node_id); if let Some(index) = index { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); self.secondary.remove(index); } } pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) { for secondary in self.secondary.drain(..) 
{ - scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + secondary, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); } } /// Remove the last secondary node from the list of secondaries pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) { if let Some(node_id) = self.secondary.pop() { - scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::RemoveSecondary, + ); } } pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) { if let Some(old_attached) = self.attached.take() { - scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); + scheduler.update_node_ref_counts( + old_attached, + self.preferred_az_id.as_ref(), + RefCountUpdate::Detach, + ); } self.clear_secondary(scheduler); @@ -275,7 +325,11 @@ impl IntentState { if self.attached == Some(node_id) { self.attached = None; self.secondary.push(node_id); - scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached); + scheduler.update_node_ref_counts( + node_id, + self.preferred_az_id.as_ref(), + RefCountUpdate::DemoteAttached, + ); true } else { false @@ -315,6 +369,7 @@ pub(crate) struct ObservedStateLocation { /// we know that we might have some state on this node. pub(crate) conf: Option, } + pub(crate) struct ReconcilerWaiter { // For observability purposes, remember the ID of the shard we're // waiting for. @@ -360,6 +415,10 @@ pub(crate) enum ScheduleOptimizationAction { ReplaceSecondary(ReplaceSecondary), // Migrate attachment to an existing secondary location MigrateAttachment(MigrateAttachment), + // Create a secondary location, with the intent of later migrating to it + CreateSecondary(NodeId), + // Remove a secondary location that we previously created to facilitate a migration + RemoveSecondary(NodeId), } #[derive(Eq, PartialEq, Debug, Clone)] @@ -486,7 +545,7 @@ impl TenantShard { Self { tenant_shard_id, policy, - intent: IntentState::default(), + intent: IntentState::new(preferred_az_id), generation: Some(Generation::new(0)), shard, observed: ObservedState::default(), @@ -500,7 +559,6 @@ impl TenantShard { last_error: Arc::default(), pending_compute_notification: false, scheduling_policy: ShardSchedulingPolicy::default(), - preferred_az_id, } } @@ -563,7 +621,7 @@ impl TenantShard { return Ok((false, node_id)); } - if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) { + if let Some(promote_secondary) = self.preferred_secondary(scheduler) { // Promote a secondary tracing::debug!("Promoted secondary {} to attached", promote_secondary); self.intent.promote_attached(scheduler, promote_secondary); @@ -572,7 +630,7 @@ impl TenantShard { // Pick a fresh node: either we had no secondaries or none were schedulable let node_id = scheduler.schedule_shard::( &self.intent.secondary, - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; tracing::debug!("Selected {} as attached", node_id); @@ -594,9 +652,6 @@ impl TenantShard { let r = self.do_schedule(scheduler, context); context.avoid(&self.intent.all_pageservers()); - if let Some(attached) = self.intent.get_attached() { - context.push_attached(*attached); - } r } @@ -631,24 +686,7 @@ impl TenantShard { use PlacementPolicy::*; match self.policy { Attached(secondary_count) => { - let retain_secondaries = if self.intent.attached.is_none() - && 
scheduler.node_preferred(&self.intent.secondary).is_some() - { - // If we have no attached, and one of the secondaries is elegible to be promoted, retain - // one more secondary than we usually would, as one of them will become attached futher down this function. - secondary_count + 1 - } else { - secondary_count - }; - - while self.intent.secondary.len() > retain_secondaries { - // We have no particular preference for one secondary location over another: just - // arbitrarily drop from the end - self.intent.pop_secondary(scheduler); - modified = true; - } - - // Should have exactly one attached, and N secondaries + // Should have exactly one attached, and at least N secondaries let (modified_attached, attached_node_id) = self.schedule_attached(scheduler, context)?; modified |= modified_attached; @@ -657,7 +695,7 @@ impl TenantShard { while self.intent.secondary.len() < secondary_count { let node_id = scheduler.schedule_shard::( &used_pageservers, - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; self.intent.push_secondary(scheduler, node_id); @@ -674,7 +712,7 @@ impl TenantShard { // Populate secondary by scheduling a fresh node let node_id = scheduler.schedule_shard::( &[], - &self.preferred_az_id, + &self.intent.preferred_az_id, context, )?; self.intent.push_secondary(scheduler, node_id); @@ -718,7 +756,7 @@ impl TenantShard { ) -> Result<(), ScheduleError> { let promote_to = match promote_to { Some(node) => node, - None => match scheduler.node_preferred(self.intent.get_secondary()) { + None => match self.preferred_secondary(scheduler) { Some(node) => node, None => { return Err(ScheduleError::ImpossibleConstraint); @@ -745,90 +783,276 @@ impl TenantShard { Ok(()) } + /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion + fn is_better_location( + &self, + scheduler: &mut Scheduler, + schedule_context: &ScheduleContext, + current: NodeId, + candidate: NodeId, + ) -> Option { + let Some(candidate_score) = scheduler.compute_node_score::( + candidate, + &self.intent.preferred_az_id, + schedule_context, + ) else { + // The candidate node is unavailable for scheduling or otherwise couldn't get a score + return None; + }; + + match scheduler.compute_node_score::( + current, + &self.intent.preferred_az_id, + schedule_context, + ) { + Some(current_score) => { + // Ignore utilization components when comparing scores: we don't want to migrate + // because of transient load variations, it risks making the system thrash, and + // migrating for utilization requires a separate high level view of the system to + // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily + // moving things around in the order that we hit this function. + let candidate_score = candidate_score.for_optimization(); + let current_score = current_score.for_optimization(); + + if candidate_score < current_score { + tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"); + Some(true) + } else { + // The candidate node is no better than our current location, so don't migrate + tracing::debug!( + "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})", + ); + Some(false) + } + } + None => { + // The current node is unavailable for scheduling, so we can't make any sensible + // decisions about optimisation. 
This should be a transient state -- if the node
+                // is offline then it will get evacuated, if it is blocked by a scheduling mode
+                // then we will respect that mode by doing nothing.
+                tracing::debug!("Current node {current} is unavailable for scheduling");
+                None
+            }
+        }
+    }
+
+    fn find_better_location(
+        &self,
+        scheduler: &mut Scheduler,
+        schedule_context: &ScheduleContext,
+        current: NodeId,
+        hard_exclude: &[NodeId],
+    ) -> Option {
+        // Look for a lower-scoring location to attach to
+        let Ok(candidate_node) = scheduler.schedule_shard::(
+            hard_exclude,
+            &self.intent.preferred_az_id,
+            schedule_context,
+        ) else {
+            // A scheduling error means we have no possible candidate replacements
+            tracing::debug!("No candidate node found");
+            return None;
+        };
+
+        if candidate_node == current {
+            // We're already at the best possible location, so don't migrate
+            tracing::debug!("Candidate node {candidate_node} is already in use");
+            return None;
+        }
+
+        self.is_better_location::(scheduler, schedule_context, current, candidate_node)
+            .and_then(|better| if better { Some(candidate_node) } else { None })
+    }
+
+    /// This function is an optimization, used to avoid doing large numbers of scheduling operations
+    /// when looking for optimizations. This function uses knowledge of how scores work to do some
+    /// fast checks for whether it may be possible to improve a score.
+    ///
+    /// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
+    /// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
+    /// work.
+    pub(crate) fn maybe_optimizable(
+        &self,
+        scheduler: &mut Scheduler,
+        schedule_context: &ScheduleContext,
+    ) -> bool {
+        // Sharded tenant: check if any locations have a nonzero affinity score
+        if self.shard.count >= ShardCount(1) {
+            let schedule_context = schedule_context.project_detach(self);
+            for node in self.intent.all_pageservers() {
+                if let Some(af) = schedule_context.nodes.get(&node) {
+                    if *af > AffinityScore(0) {
+                        return true;
+                    }
+                }
+            }
+        }
+
+        // Attached tenant: check if the attachment is outside the preferred AZ
+        if let PlacementPolicy::Attached(_) = self.policy {
+            if let Some(attached) = self.intent.get_attached() {
+                if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
+                    return true;
+                }
+            }
+        }
+
+        // Tenant with secondary locations: check if any are within the preferred AZ
+        for secondary in self.intent.get_secondary() {
+            if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
+                return true;
+            }
+        }
+
+        // Does the tenant have excess secondaries?
+        if self.intent.get_secondary().len() > self.policy.want_secondaries() {
+            return true;
+        }
+
+        // Fall through: no optimizations possible
+        false
+    }
+
     /// Optimize attachments: if a shard has a secondary location that is preferable to
     /// its primary location based on soft constraints, switch that secondary location
     /// to be attached.
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn optimize_attachment(
         &self,
-        nodes: &HashMap,
+        scheduler: &mut Scheduler,
         schedule_context: &ScheduleContext,
     ) -> Option {
         let attached = (*self.intent.get_attached())?;
-        if self.intent.secondary.is_empty() {
-            // We can only do useful work if we have both attached and secondary locations: this
-            // function doesn't schedule new locations, only swaps between attached and secondaries.
- return None; - } - let current_affinity_score = schedule_context.get_node_affinity(attached); - let current_attachment_count = schedule_context.get_node_attachments(attached); + let schedule_context = schedule_context.project_detach(self); - // Generate score for each node, dropping any un-schedulable nodes. - let all_pageservers = self.intent.all_pageservers(); - let mut scores = all_pageservers - .iter() - .flat_map(|node_id| { - let node = nodes.get(node_id); - if node.is_none() { - None - } else if matches!( - node.unwrap().get_scheduling(), - NodeSchedulingPolicy::Filling - ) { - // If the node is currently filling, don't count it as a candidate to avoid, - // racing with the background fill. - None - } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) { - None - } else { - let affinity_score = schedule_context.get_node_affinity(*node_id); - let attachment_count = schedule_context.get_node_attachments(*node_id); - Some((*node_id, affinity_score, attachment_count)) - } - }) - .collect::>(); - - // Sort precedence: - // 1st - prefer nodes with the lowest total affinity score - // 2nd - prefer nodes with the lowest number of attachments in this context - // 3rd - if all else is equal, sort by node ID for determinism in tests. - scores.sort_by_key(|i| (i.1, i.2, i.0)); - - if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) = - scores.first() - { - if attached != *preferred_node { - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called (e.g. there's no point migrating from - // a location with score 1 to a score zero, because on next location the situation - // would be the same, but in reverse). - if current_affinity_score > *preferred_affinity_score + AffinityScore(1) - || current_attachment_count > *preferred_attachment_count + 1 - { - tracing::info!( - "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})", - self.intent.get_secondary() - ); - return Some(ScheduleOptimization { - sequence: self.sequence, - action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: attached, - new_attached_node_id: *preferred_node, - }), - }); - } - } else { - tracing::debug!( - "Node {} is already preferred (score {:?})", - preferred_node, - preferred_affinity_score - ); + // If we already have a secondary that is higher-scoring than out current location, + // then simply migrate to it. + for secondary in self.intent.get_secondary() { + if let Some(true) = self.is_better_location::( + scheduler, + &schedule_context, + attached, + *secondary, + ) { + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: attached, + new_attached_node_id: *secondary, + }), + }); } } - // Fall-through: we didn't find an optimization - None + // Given that none of our current secondaries is a better location than our current + // attached location (checked above), we may trim any secondaries that are not needed + // for the placement policy. + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // This code path cleans up extra secondaries after migrating, and/or + // trims extra secondaries after a PlacementPolicy::Attached(N) was + // modified to decrease N. 
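A note for reviewers on the clean-up path above: it is the final step of a multi-step migration driven by repeated calls to optimize_attachment. A rough sketch of the full action sequence for a shard attached outside its preferred AZ with no secondary there yet — node ids are purely illustrative, and the optimize_attachment_multistep and optimize_attachment_marginal tests later in this diff walk through the same flow using the crate's real types:

// Illustrative sequence only; in the real flow each step is produced by a separate
// call to optimize_attachment() and applied via apply_optimization().
let expected = vec![
    // 1. Warm up a location in the preferred AZ.
    ScheduleOptimizationAction::CreateSecondary(NodeId(1)),
    // 2. Cut the attachment over once the caller deems that secondary warm enough.
    ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
        old_attached_node_id: NodeId(2),
        new_attached_node_id: NodeId(1),
    }),
    // 3. Drop the now-surplus secondary left behind in the old AZ (the trim path above).
    ScheduleOptimizationAction::RemoveSecondary(NodeId(2)),
];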
+ + let secondary_scores = self + .intent + .get_secondary() + .iter() + .map(|node_id| { + ( + *node_id, + scheduler.compute_node_score::( + *node_id, + &self.intent.preferred_az_id, + &schedule_context, + ), + ) + }) + .collect::>(); + + if secondary_scores.iter().any(|score| score.1.is_none()) { + // Don't have full list of scores, so can't make a good decision about which to drop unless + // there is an obvious one in the wrong AZ + for secondary in self.intent.get_secondary() { + if scheduler.get_node_az(secondary) == self.intent.preferred_az_id { + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(*secondary), + }); + } + } + + // Fall through: we didn't identify one to remove. This ought to be rare. + tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)", + self.intent.get_secondary() + ); + } else { + let victim = secondary_scores + .iter() + .max_by_key(|score| score.1.unwrap()) + .unwrap() + .0; + return Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(victim), + }); + } + } + + let replacement = self.find_better_location::( + scheduler, + &schedule_context, + attached, + &[], // Don't exclude secondaries: our preferred attachment location may be a secondary + ); + + // We have found a candidate and confirmed that its score is preferable + // to our current location. See if we have a secondary location in the preferred location already: if not, + // then create one. + if let Some(replacement) = replacement { + // If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still + // not in our preferred AZ. Migration has a cost in resources an impact to the workload, so we want to avoid doing + // multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred + // AZ: skip this optimization if it is not in our final, preferred AZ. + // + // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes + // there are too overloaded for scheduler to suggest them, more should be provisioned eventually). + if self.intent.preferred_az_id.is_some() + && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id + { + tracing::debug!( + "Candidate node {replacement} is not in preferred AZ {:?}", + self.intent.preferred_az_id + ); + + // This should only happen if our current location is not in the preferred AZ, otherwise + // [`Self::find_better_location`]` should have rejected any other location outside the preferred Az, because + // AZ is the highest priority part of NodeAttachmentSchedulingScore. + debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id); + + return None; + } + + if !self.intent.get_secondary().contains(&replacement) { + Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::CreateSecondary(replacement), + }) + } else { + // We already have a secondary in the preferred location, let's try migrating to it. Our caller + // will check the warmth of the destination before deciding whether to really execute this. 
+ Some(ScheduleOptimization { + sequence: self.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: attached, + new_attached_node_id: replacement, + }), + }) + } + } else { + // We didn't find somewhere we'd rather be, and we don't have any excess secondaries + // to clean up: no action required. + None + } } #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] @@ -837,50 +1061,40 @@ impl TenantShard { scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { - if self.intent.secondary.is_empty() { - // We can only do useful work if we have both attached and secondary locations: this - // function doesn't schedule new locations, only swaps between attached and secondaries. + if self.intent.get_secondary().len() > self.policy.want_secondaries() { + // We have extra secondaries, perhaps to facilitate a migration of the attached location: + // do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done, + // and we are called again, we will proceed. + tracing::debug!("Too many secondaries: skipping"); return None; } + let schedule_context = schedule_context.project_detach(self); + for secondary in self.intent.get_secondary() { - let Some(affinity_score) = schedule_context.nodes.get(secondary) else { - // We're already on a node unaffected any affinity constraints, - // so we won't change it. - continue; + // Make sure we don't try to migrate a secondary to our attached location: this case happens + // easily in environments without multiple AZs. + let exclude = match self.intent.attached { + Some(attached) => vec![attached], + None => vec![], }; - // Let the scheduler suggest a node, where it would put us if we were scheduling afresh - // This implicitly limits the choice to nodes that are available, and prefers nodes - // with lower utilization. - let Ok(candidate_node) = scheduler.schedule_shard::( - &self.intent.all_pageservers(), - &self.preferred_az_id, - schedule_context, - ) else { - // A scheduling error means we have no possible candidate replacements - continue; - }; - - let candidate_affinity_score = schedule_context - .nodes - .get(&candidate_node) - .unwrap_or(&AffinityScore::FREE); - - // The best alternative must be more than 1 better than us, otherwise we could end - // up flapping back next time we're called. - if *candidate_affinity_score + AffinityScore(1) < *affinity_score { - // If some other node is available and has a lower score than this node, then - // that other node is a good place to migrate to. - tracing::info!( - "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})", - self.intent.get_secondary() - ); + let replacement = self.find_better_location::( + scheduler, + &schedule_context, + *secondary, + &exclude, + ); + assert!(replacement != Some(*secondary)); + if let Some(replacement) = replacement { + // We have found a candidate and confirmed that its score is preferable + // to our current location. See if we have a secondary location in the preferred location already: if not, + // then create one. 
                return Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                        old_node_id: *secondary,
-                        new_node_id: candidate_node,
+                        new_node_id: replacement,
                    }),
                });
            }
@@ -921,11 +1135,54 @@ impl TenantShard {
                 self.intent.remove_secondary(scheduler, old_node_id);
                 self.intent.push_secondary(scheduler, new_node_id);
             }
+            ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
+                self.intent.push_secondary(scheduler, new_node_id);
+            }
+            ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
+                self.intent.remove_secondary(scheduler, old_secondary);
+            }
         }

         true
     }

+    /// When a shard has several secondary locations, we need to pick one in situations where
+    /// we promote one of them to an attached location:
+    /// - When draining a node for restart
+    /// - When responding to a node failure
+    ///
+    /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
+    /// we want to pick the node which is best for use _temporarily_ while the previous attached location
+    /// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
+    /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
+    /// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
+    /// they're probably not warmed up yet).
+    ///
+    /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
+    /// caller needs to pick a node some other way.
+    pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
+        let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
+
+        // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
+        // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
+        // rather than a short-lived secondary location being used for optimization/migration (which would have
+        // been scheduled in our preferred AZ).
+        let mut candidates = candidates
+            .iter()
+            .map(|(node_id, node_az)| {
+                if node_az == &self.intent.preferred_az_id {
+                    (1, *node_id)
+                } else {
+                    (0, *node_id)
+                }
+            })
+            .collect::<Vec<_>>();
+
+        candidates.sort();
+
+        candidates.first().map(|i| i.1)
+    }
+
     /// Query whether the tenant's observed state for attached node matches its intent state, and if so,
     /// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
     /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
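Why is_better_location compares scores via for_optimization can be seen with a small self-contained toy: with a derived Ord the comparison is lexicographic over field declaration order, so a transient utilization difference between otherwise-equivalent nodes would by itself suggest a migration, and later suggest moving back. ToyScore below is a simplified stand-in, not the real NodeAttachmentSchedulingScore:

// Simplified stand-in for a node scheduling score. With #[derive(Ord)], comparison
// is lexicographic over the fields in declaration order, so utilization only
// matters when the AZ and affinity components are equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ToyScore {
    az_mismatch: u8,  // 0 = node is in the shard's preferred AZ
    affinity: u32,    // shards of the same tenant already on this node
    utilization: u64, // fluctuates with load
}

impl ToyScore {
    // Analogue of for_optimization(): drop the components that can change
    // without anything being wrong with the current placement.
    fn for_optimization(self) -> Self {
        ToyScore { utilization: 0, ..self }
    }
}

fn main() {
    let current = ToyScore { az_mismatch: 0, affinity: 0, utilization: 80 };
    let candidate = ToyScore { az_mismatch: 0, affinity: 0, utilization: 60 };

    // On the raw score the candidate looks strictly better, purely because of load...
    assert!(candidate < current);

    // ...but for optimization purposes the two locations are equivalent, so no
    // migration is suggested and the placement does not flap.
    assert_eq!(candidate.for_optimization(), current.for_optimization());
}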
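The sort key used by preferred_secondary above can also be illustrated in isolation. pick_promotion_target is a hypothetical, simplified stand-in (u64 node ids, &str AZ names), not the controller's actual code: candidates inside the preferred AZ are penalised so that a long-lived secondary in another AZ wins, with node id as the deterministic tie-break.

// Hypothetical, simplified version of the preferred_secondary() sort key above:
// node ids are plain u64s and AZs are &str purely for the example.
fn pick_promotion_target(
    candidates: &[(u64, &str)], // (node_id, node_az)
    preferred_az: Option<&str>,
) -> Option<u64> {
    let mut keyed: Vec<(u8, u64)> = candidates
        .iter()
        .map(|(node_id, az)| {
            // Penalise candidates in the preferred AZ: they are usually short-lived
            // migration secondaries and therefore cold.
            let in_preferred = preferred_az.is_some_and(|p| p == *az);
            (u8::from(in_preferred), *node_id)
        })
        .collect();
    keyed.sort();
    keyed.first().map(|(_, node_id)| *node_id)
}

#[test]
fn prefers_long_lived_secondary_outside_home_az() {
    // Node 5 is a long-lived secondary in az-b; node 7 is a fresh migration
    // secondary in the preferred az-a. Node 5 is promoted.
    assert_eq!(
        pick_promotion_target(&[(5, "az-b"), (7, "az-a")], Some("az-a")),
        Some(5)
    );
}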
@@ -1207,7 +1464,7 @@ impl TenantShard { detach, reconciler_config, config: self.config.clone(), - preferred_az: self.preferred_az_id.clone(), + preferred_az: self.intent.preferred_az_id.clone(), observed: self.observed.clone(), original_observed: self.observed.clone(), compute_hook: compute_hook.clone(), @@ -1428,7 +1685,6 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), - preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone), }) } @@ -1444,16 +1700,16 @@ impl TenantShard { config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(), - preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()), + preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()), } } pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> { - self.preferred_az_id.as_ref() + self.intent.preferred_az_id.as_ref() } pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) { - self.preferred_az_id = Some(preferred_az_id); + self.intent.preferred_az_id = Some(preferred_az_id); } /// Returns all the nodes to which this tenant shard is attached according to the @@ -1756,65 +2012,90 @@ pub(crate) mod tests { } #[test] - fn optimize_attachment() -> anyhow::Result<()> { - let nodes = make_test_nodes(3, &[]); + /// Simple case: moving attachment to somewhere better where we already have a secondary + fn optimize_attachment_simple() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 3, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); // Initially: both nodes attached on shard 1, and both have secondary locations // on different nodes. 
- shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1))); - shard_a.intent.push_secondary(&mut scheduler, NodeId(2)); + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(1)); shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1))); - shard_b.intent.push_secondary(&mut scheduler, NodeId(3)); + shard_b.intent.push_secondary(&mut scheduler, NodeId(2)); - let mut schedule_context = ScheduleContext::default(); - schedule_context.avoid(&shard_a.intent.all_pageservers()); - schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); - schedule_context.avoid(&shard_b.intent.all_pageservers()); - schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context + } - let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context); - - // Either shard should recognize that it has the option to switch to a secondary location where there - // would be no other shards from the same tenant, and request to do so. + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context); assert_eq!( optimization_a, Some(ScheduleOptimization { sequence: shard_a.sequence, action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: NodeId(1), - new_attached_node_id: NodeId(2) + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) }) }) ); - - // Note that these optimizing two shards in the same tenant with the same ScheduleContext is - // mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility - // of [`Service::optimize_all`] to avoid trying - // to do optimizations for multiple shards in the same tenant at the same time. Generating - // both optimizations is just done for test purposes - let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context); - assert_eq!( - optimization_b, - Some(ScheduleOptimization { - sequence: shard_b.sequence, - action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { - old_attached_node_id: NodeId(1), - new_attached_node_id: NodeId(3) - }) - }) - ); - - // Applying these optimizations should result in the end state proposed shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap()); - assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2))); - assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]); - shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap()); - assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3))); - assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]); + + // // Either shard should recognize that it has the option to switch to a secondary location where there + // // would be no other shards from the same tenant, and request to do so. 
+ // assert_eq!( + // optimization_a_prepare, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::CreateSecondary(NodeId(2)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_migrate, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + // old_attached_node_id: NodeId(1), + // new_attached_node_id: NodeId(2) + // }) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + // assert_eq!( + // optimization_a_cleanup, + // Some(ScheduleOptimization { + // sequence: shard_a.sequence, + // action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1)) + // }) + // ); + // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); shard_a.intent.clear(&mut scheduler); shard_b.intent.clear(&mut scheduler); @@ -1822,6 +2103,190 @@ pub(crate) mod tests { Ok(()) } + #[test] + /// Complicated case: moving attachment to somewhere better where we do not have a secondary + /// already, creating one as needed. 
+ fn optimize_attachment_multistep() -> anyhow::Result<()> { + let nodes = make_test_nodes( + 3, + &[ + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); + let mut scheduler = Scheduler::new(nodes.values()); + + // Two shards of a tenant that wants to be in AZ A + let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1)); + shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string())); + + // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs + shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2))); + shard_a.intent.push_secondary(&mut scheduler, NodeId(3)); + shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3))); + shard_b.intent.push_secondary(&mut scheduler, NodeId(2)); + + fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + schedule_context.avoid(&shard_a.intent.all_pageservers()); + schedule_context.avoid(&shard_b.intent.all_pageservers()); + schedule_context + } + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(1)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) + }) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + let schedule_context = make_schedule_context(&shard_a, &shard_b); + let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shard_a.sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3)) + }) + ); + shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A + // let schedule_context = make_schedule_context(&shard_a, &shard_b); + // assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None); + + shard_a.intent.clear(&mut scheduler); + shard_b.intent.clear(&mut scheduler); + + Ok(()) + } + + #[test] + /// Check that multi-step migration works when moving to somewhere that is only better by + /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary + /// counting toward the affinity score such that it prevents the rest of the migration from happening. 
+ fn optimize_attachment_marginal() -> anyhow::Result<()> { + let nodes = make_test_nodes(2, &[]); + let mut scheduler = Scheduler::new(nodes.values()); + + // Multi-sharded tenant, we will craft a situation where affinity + // scores differ only slightly + let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None); + + // 1 attached on node 1 + shards[0] + .intent + .set_attached(&mut scheduler, Some(NodeId(1))); + // 3 attached on node 2 + shards[1] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + shards[2] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + shards[3] + .intent + .set_attached(&mut scheduler, Some(NodeId(2))); + + // The scheduler should figure out that we need to: + // - Create a secondary for shard 3 on node 1 + // - Migrate shard 3 to node 1 + // - Remove shard 3's location on node 2 + + fn make_schedule_context(shards: &Vec) -> ScheduleContext { + let mut schedule_context = ScheduleContext::default(); + for shard in shards { + schedule_context.avoid(&shard.intent.all_pageservers()); + } + schedule_context + } + + let schedule_context = make_schedule_context(&shards); + let optimization_a_prepare = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_prepare, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::CreateSecondary(NodeId(1)) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap()); + + let schedule_context = make_schedule_context(&shards); + let optimization_a_migrate = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_migrate, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment { + old_attached_node_id: NodeId(2), + new_attached_node_id: NodeId(1) + }) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap()); + + let schedule_context = make_schedule_context(&shards); + let optimization_a_cleanup = + shards[1].optimize_attachment(&mut scheduler, &schedule_context); + assert_eq!( + optimization_a_cleanup, + Some(ScheduleOptimization { + sequence: shards[1].sequence, + action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2)) + }) + ); + shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap()); + + // Everything should be stable now + let schedule_context = make_schedule_context(&shards); + assert_eq!( + shards[0].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[1].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[2].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + assert_eq!( + shards[3].optimize_attachment(&mut scheduler, &schedule_context), + None + ); + + for mut shard in shards { + shard.intent.clear(&mut scheduler); + } + + Ok(()) + } + #[test] fn optimize_secondary() -> anyhow::Result<()> { let nodes = make_test_nodes(4, &[]); @@ -1839,9 +2304,7 @@ pub(crate) mod tests { let mut schedule_context = ScheduleContext::default(); schedule_context.avoid(&shard_a.intent.all_pageservers()); - schedule_context.push_attached(shard_a.intent.get_attached().unwrap()); schedule_context.avoid(&shard_b.intent.all_pageservers()); - schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context); @@ -1872,7 
+2335,6 @@ pub(crate) mod tests { // called repeatedly in the background. // Returns the applied optimizations fn optimize_til_idle( - nodes: &HashMap, scheduler: &mut Scheduler, shards: &mut [TenantShard], ) -> Vec { @@ -1884,14 +2346,18 @@ pub(crate) mod tests { for shard in shards.iter() { schedule_context.avoid(&shard.intent.all_pageservers()); - if let Some(attached) = shard.intent.get_attached() { - schedule_context.push_attached(*attached); - } } for shard in shards.iter_mut() { - let optimization = shard.optimize_attachment(nodes, &schedule_context); + let optimization = shard.optimize_attachment(scheduler, &schedule_context); + tracing::info!( + "optimize_attachment({})={:?}", + shard.tenant_shard_id, + optimization + ); if let Some(optimization) = optimization { + // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist + assert!(shard.maybe_optimizable(scheduler, &schedule_context)); optimizations.push(optimization.clone()); shard.apply_optimization(scheduler, optimization); any_changed = true; @@ -1899,7 +2365,15 @@ pub(crate) mod tests { } let optimization = shard.optimize_secondary(scheduler, &schedule_context); + tracing::info!( + "optimize_secondary({})={:?}", + shard.tenant_shard_id, + optimization + ); if let Some(optimization) = optimization { + // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist + assert!(shard.maybe_optimizable(scheduler, &schedule_context)); + optimizations.push(optimization.clone()); shard.apply_optimization(scheduler, optimization); any_changed = true; @@ -1923,14 +2397,34 @@ pub(crate) mod tests { /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4, &[]); + let nodes = make_test_nodes( + 9, + &[ + // Initial 6 nodes + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + AvailabilityZone("az-c".to_string()), + // Three we will add later + AvailabilityZone("az-a".to_string()), + AvailabilityZone("az-b".to_string()), + AvailabilityZone("az-c".to_string()), + ], + ); - // Only show the scheduler a couple of nodes + // Only show the scheduler two nodes in each AZ to start with let mut scheduler = Scheduler::new([].iter()); - scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); + for i in 1..=6 { + scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap()); + } - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); + let mut shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(4), + Some(AvailabilityZone("az-a".to_string())), + ); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1938,30 +2432,50 @@ pub(crate) mod tests { .is_ok()); } - // We should see equal number of locations on the two nodes. - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4); + // Initial: attached locations land in the tenant's home AZ. 
+ assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2); - - assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4); + assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - // Add another two nodes: we should see the shards spread out when their optimize - // methods are called - scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap()); - scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap()); - optimize_til_idle(&nodes, &mut scheduler, &mut shards); + // Initial: secondary locations in a remote AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); - assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2); + // Add another three nodes: we should see the shards spread out when their optimize + // methods are called + scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap()); + scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap()); + optimize_til_idle(&mut scheduler, &mut shards); + + // We expect one attached location was moved to the new node in the tenant's home AZ + assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1); + // The original node has one less attached shard + assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1); + // One of the original nodes still has two attachments, since there are an odd number of nodes assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2); - assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1); - - assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2); - assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1); + // None of our secondaries moved, since we already had enough nodes for those to be + // scheduled perfectly + assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0); + assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1); + assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0); for shard in shards.iter_mut() { shard.intent.clear(&mut scheduler); @@ -2001,10 +2515,10 @@ pub(crate) mod tests { shard.schedule(&mut scheduler, context).unwrap(); } - let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a); + let applied_to_a = optimize_til_idle(&mut scheduler, &mut a); assert_eq!(applied_to_a, vec![]); - let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b); + let 
applied_to_b = optimize_til_idle(&mut scheduler, &mut b); assert_eq!(applied_to_b, vec![]); for shard in a.iter_mut().chain(b.iter_mut()) { diff --git a/test_runner/performance/test_sharding_autosplit.py b/test_runner/performance/test_sharding_autosplit.py index caa89955e3..76c3ad01a4 100644 --- a/test_runner/performance/test_sharding_autosplit.py +++ b/test_runner/performance/test_sharding_autosplit.py @@ -2,6 +2,7 @@ from __future__ import annotations import concurrent.futures import re +import threading from pathlib import Path import pytest @@ -188,7 +189,20 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): check_pgbench_output(out_path) - with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads: + stop_pump = threading.Event() + + def pump_controller(): + # Run a background loop to force the storage controller to run its + # background work faster than it otherwise would: this helps + # us: + # A) to create a test that runs in a shorter time + # B) to create a test that is more intensive by doing the shard migrations + # after splits happen more rapidly. + while not stop_pump.is_set(): + env.storage_controller.reconcile_all() + stop_pump.wait(0.1) + + with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count + 1) as pgbench_threads: pgbench_futs = [] for tenant_state in tenants.values(): fut = pgbench_threads.submit(run_pgbench_init, tenant_state.endpoint) @@ -198,6 +212,8 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): for fut in pgbench_futs: fut.result() + pump_fut = pgbench_threads.submit(pump_controller) + pgbench_futs = [] for tenant_state in tenants.values(): fut = pgbench_threads.submit(run_pgbench_main, tenant_state.endpoint) @@ -207,6 +223,9 @@ def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): for fut in pgbench_futs: fut.result() + stop_pump.set() + pump_fut.result() + def assert_all_split(): for tenant_id in tenants.keys(): shards = tenant_get_shards(env, tenant_id) diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index 49f41483ec..d45db28c78 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -13,11 +13,13 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + NeonPageserver, PageserverAvailability, PageserverSchedulingPolicy, ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pg_version import PgVersion +from fixtures.utils import wait_until def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]: @@ -85,8 +87,12 @@ def test_storage_controller_many_tenants( ) AZS = ["alpha", "bravo", "charlie"] + + def az_selector(node_id): + return f"az-{AZS[(node_id - 1) % len(AZS)]}" + neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update( - {"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"} + {"availability_zone": az_selector(ps_cfg["id"])} ) # A small sleep on each call into the notify hook, to simulate the latency of doing a database write @@ -168,6 +174,31 @@ def test_storage_controller_many_tenants( log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)") assert rss < expect_memory_per_shard * total_shards + def assert_all_tenants_scheduled_in_home_az(): + for tenant_id in tenant_ids: + desc = 
env.storage_controller.tenant_describe(tenant_id) + preferred_az = None + for shard in desc["shards"]: + # All shards in a tenant should have the same preferred AZ + if preferred_az is None: + preferred_az = shard["preferred_az_id"] + else: + assert preferred_az == shard["preferred_az_id"] + + # Attachment should be in the preferred AZ + assert shard["preferred_az_id"] == az_selector( + shard["node_attached"] + ), f"Shard {shard['tenant_shard_id']} not in {shard['preferred_az_id']}" + + # Secondary locations should not be in the preferred AZ + for node_secondary in shard["node_secondary"]: + assert ( + shard["preferred_az_id"] != az_selector(node_secondary) + ), f"Shard {shard['tenant_shard_id']} secondary should be in {shard['preferred_az_id']}" + + # There should only be one secondary location (i.e. no migrations in flight) + assert len(shard["node_secondary"]) == 1 + # Issue more concurrent operations than the storage controller's reconciler concurrency semaphore # permits, to ensure that we are exercising stressing that. api_concurrency = 135 @@ -242,6 +273,22 @@ def test_storage_controller_many_tenants( f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s" ) + # Check initial scheduling + assert_all_tenants_scheduled_in_home_az() + az_attached_counts: defaultdict[str, int] = defaultdict(int) + az_secondary_counts: defaultdict[str, int] = defaultdict(int) + node_attached_counts: defaultdict[str, int] = defaultdict(int) + for tenant_id in tenants.keys(): + desc = env.storage_controller.tenant_describe(tenant_id) + for shard in desc["shards"]: + az_attached_counts[az_selector(shard["node_attached"])] += 1 + node_attached_counts[shard["node_attached"]] += 1 + for node_secondary in shard["node_secondary"]: + az_secondary_counts[az_selector(node_secondary)] += 1 + + log.info(f"Initial node attached counts: {node_attached_counts}") + log.info(f"Initial AZ shard counts: {az_attached_counts}, {az_secondary_counts}") + # Plan operations: ensure each tenant with a timeline gets at least # one of each operation type. Then add other tenants to make up the # numbers. @@ -450,11 +497,77 @@ def test_storage_controller_many_tenants( env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) env.storage_controller.consistency_check() + # Since we did `reconcile_until_idle` during the above loop, the system should be left in + # an optimally scheduled state. Validate that this includes all the tenants being scheduled + # in their home AZ. + assert_all_tenants_scheduled_in_home_az() + # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn, # as they were not offline long enough to trigger any scheduling changes. 
env.storage_controller.consistency_check() check_memory() + # Simulate loss of an AZ + victim_az = "az-alpha" + killed_pageservers = [] + for ps in env.pageservers: + if az_selector(ps.id) == victim_az: + ps.stop(immediate=True) + killed_pageservers.append(ps) + log.info(f"Killed pageserver {ps.id}") + + assert killed_pageservers + + # Wait for the controller to notice the pageservers are dead + def assert_pageservers_availability( + pageservers: list[NeonPageserver], expected_availability: PageserverAvailability + ): + nodes = env.storage_controller.nodes() + checked_any = False + node_ids = [ps.id for ps in pageservers] + for node in nodes: + if node["id"] in node_ids: + checked_any = True + assert ( + node["availability"] == expected_availability + ), f"Node {node['id']} is not {expected_availability} yet: {node['availability']}" + + assert checked_any + + wait_until( + lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.OFFLINE), + timeout=60, + ) + + # Let the controller finish all its rescheduling + env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) + + # Check that all the tenants are rescheduled to the remaining pageservers + for tenant_id in tenant_ids: + desc = env.storage_controller.tenant_describe(tenant_id) + for shard in desc["shards"]: + # Attachment should be outside the AZ where we killed the pageservers + assert ( + az_selector(shard["node_attached"]) != victim_az + ), f"Shard {shard['tenant_shard_id']} still in {victim_az} (node {shard['node_attached']})" + + # Bring back the pageservers + for ps in killed_pageservers: + ps.start() + + wait_until( + lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.ACTIVE), + timeout=60, + ) + + # A very long timeout is required: we will be migrating all the tenants on all the pageservers + # in the region that we just restored. Assume it'll take up to twice as long as it took to fill + # a single node + env.storage_controller.reconcile_until_idle( + max_interval=0.1, timeout_secs=DRAIN_FILL_TIMEOUT * 4 + ) + assert_all_tenants_scheduled_in_home_az() + # Stop the storage controller before tearing down fixtures, because it otherwise might log # errors trying to call our `ComputeReconfigure`. env.storage_controller.stop() diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 673904a1cd..86a6b7428b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -520,14 +520,18 @@ def test_sharding_split_smoke( shard_count = 2 # Shard count we split into split_shard_count = 4 - # We will have 2 shards per pageserver once done (including secondaries) - neon_env_builder.num_pageservers = split_shard_count + # In preferred AZ & other AZ we will end up with one shard per pageserver + neon_env_builder.num_pageservers = split_shard_count * 2 # Two AZs def assign_az(ps_cfg): az = f"az-{(ps_cfg['id'] - 1) % 2}" ps_cfg["availability_zone"] = az + # We will run more pageservers than tests usually do, so give them tiny page caches + # in case we're on a test node under memory pressure. 
+ ps_cfg["page_cache_size"] = 128 + neon_env_builder.pageserver_config_override = assign_az # 1MiB stripes: enable getting some meaningful data distribution without @@ -679,8 +683,8 @@ def test_sharding_split_smoke( # - shard_count reconciles for the original setup of the tenant # - shard_count reconciles for detaching the original secondary locations during split # - split_shard_count reconciles during shard splitting, for setting up secondaries. - # - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move) - expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2 + # - split_shard_count/2 reconciles to migrate shards to their temporary secondaries + expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2) reconcile_ok = env.storage_controller.get_metric_value( "storage_controller_reconcile_complete_total", filter={"status": "ok"} @@ -745,10 +749,14 @@ def test_sharding_split_smoke( # dominated by shard count. log.info(f"total: {total}") assert total == { - 1: 2, - 2: 2, - 3: 2, - 4: 2, + 1: 1, + 2: 1, + 3: 1, + 4: 1, + 5: 1, + 6: 1, + 7: 1, + 8: 1, } # The controller is not required to lay out the attached locations in any particular way, but @@ -1387,13 +1395,7 @@ def test_sharding_split_failures( else: attached_count += 1 - if exclude_ps_id is not None: - # For a node failure case, we expect there to be a secondary location - # scheduled on the offline node, so expect one fewer secondary in total - assert secondary_count == initial_shard_count - 1 - else: - assert secondary_count == initial_shard_count - + assert secondary_count == initial_shard_count assert attached_count == initial_shard_count def assert_split_done(exclude_ps_id: int | None = None) -> None: diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 3a55e75589..8ffb6ba6b2 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3213,11 +3213,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool: @run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): def assign_az(ps_cfg): - az = f"az-{ps_cfg['id']}" + az = f"az-{ps_cfg['id'] % 2}" + log.info("Assigned AZ {az}") ps_cfg["availability_zone"] = az neon_env_builder.pageserver_config_override = assign_az - neon_env_builder.num_pageservers = 2 + neon_env_builder.num_pageservers = 4 env = neon_env_builder.init_configs() env.start() @@ -3232,8 +3233,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): assert shards[0]["preferred_az_id"] == expected_az + # When all other schedule scoring parameters are equal, tenants should round-robin on AZs + assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-0" + assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-1" + assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-0" + + # Try modifying preferred AZ updated = env.storage_controller.set_preferred_azs( - {TenantShardId(tid, 0, 0): "foo" for tid in tids} + {TenantShardId(tid, 0, 0): "az-0" for tid in tids} ) assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids]) @@ -3241,29 +3248,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder): for tid in tids: 
shards = env.storage_controller.tenant_describe(tid)["shards"] assert len(shards) == 1 - assert shards[0]["preferred_az_id"] == "foo" + assert shards[0]["preferred_az_id"] == "az-0" - # Generate a layer to avoid shard split handling on ps from tripping - # up on debug assert. - timeline_id = TimelineId.generate() - env.create_timeline("bar", tids[0], timeline_id) - - workload = Workload(env, tids[0], timeline_id, branch_name="bar") - workload.init() - workload.write_rows(256) - workload.validate() + # Having modified preferred AZ, we should get moved there + env.storage_controller.reconcile_until_idle(max_interval=0.1) + for tid in tids: + shard = env.storage_controller.tenant_describe(tid)["shards"][0] + attached_to = shard["node_attached"] + attached_in_az = env.get_pageserver(attached_to).az_id + assert shard["preferred_az_id"] == attached_in_az == "az-0" env.storage_controller.tenant_shard_split(tids[0], shard_count=2) + env.storage_controller.reconcile_until_idle(max_interval=0.1) shards = env.storage_controller.tenant_describe(tids[0])["shards"] assert len(shards) == 2 for shard in shards: attached_to = shard["node_attached"] - expected_az = env.get_pageserver(attached_to).az_id - - # The scheduling optimization logic is not yet AZ-aware, so doesn't succeed - # in putting the tenant shards in the preferred AZ. - # To be fixed in https://github.com/neondatabase/neon/pull/9916 - # assert shard["preferred_az_id"] == expected_az + attached_in_az = env.get_pageserver(attached_to).az_id + assert shard["preferred_az_id"] == attached_in_az == "az-0" @run_only_on_default_postgres("Postgres version makes no difference here")
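One way to read the round-robin assertion in test_shard_preferred_azs above: it only requires that the controller picks the least-loaded AZ with a deterministic tie-break when the tenants are otherwise identical. The AZ selection code itself is not part of this diff, so the sketch below is only a hypothetical illustration of how that behaviour yields alternating assignments; pick_home_az and the home-shard-count map are assumptions made for the example, not the controller's actual implementation.

use std::collections::BTreeMap;

// Hypothetical helper: choose a preferred AZ as the AZ with the fewest shards that
// already call it home, breaking ties by AZ name.
fn pick_home_az(home_shard_counts: &BTreeMap<String, usize>) -> Option<String> {
    home_shard_counts
        .iter()
        .min_by_key(|(az, count)| (**count, (*az).clone()))
        .map(|(az, _)| az.clone())
}

fn main() {
    let mut counts: BTreeMap<String, usize> = [
        ("az-0".to_string(), 0),
        ("az-1".to_string(), 0),
    ]
    .into_iter()
    .collect();

    // Creating three equally sized tenants in sequence alternates AZs: az-0, az-1, az-0,
    // which is the pattern the regression test asserts.
    let mut picks = Vec::new();
    for _ in 0..3 {
        let az = pick_home_az(&counts).unwrap();
        *counts.get_mut(&az).unwrap() += 1;
        picks.push(az);
    }
    assert_eq!(picks, vec!["az-0", "az-1", "az-0"]);
}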