From 0d6966b9cb6f0d60ae4e1a7e2253f9e4a0f2e427 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Sun, 1 Dec 2024 20:45:03 +0000
Subject: [PATCH] Take 2: Optimization, many_tenants now passes

---
 storage_controller/src/scheduler.rs    | 132 +++++++++++------------
 storage_controller/src/service.rs      |  73 +++++--------
 storage_controller/src/tenant_shard.rs | 140 +++++++++++++++++++++----
 3 files changed, 208 insertions(+), 137 deletions(-)

diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index 94514d6436..0e57c52029 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -51,7 +51,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
     /// Return a score that drops any components based on node utilization: this is useful
     /// for finding scores for scheduling optimisation, when we want to avoid rescheduling
     /// shards due to e.g. disk usage, to avoid flapping.
-    fn disregard_utilization(&self) -> Self;
+    fn for_optimization(&self) -> Self;
 
     fn is_overloaded(&self) -> bool;
     fn node_id(&self) -> NodeId;
@@ -149,10 +149,6 @@ pub(crate) struct NodeAttachmentSchedulingScore {
     /// The number of shards belonging to the tenant currently being
     /// scheduled that are attached to this node.
     affinity_score: AffinityScore,
-    /// Size of [`ScheduleContext::attached_nodes`] for the current node.
-    /// This normally tracks the number of attached shards belonging to the
-    /// tenant being scheduled that are already on this node.
-    attached_shards_in_context: usize,
     /// Utilisation score that combines shard count and disk utilisation
     utilization_score: u64,
     /// Total number of shards attached to this node. When nodes have identical utilisation, this
@@ -162,28 +158,6 @@ pub(crate) struct NodeAttachmentSchedulingScore {
     node_id: NodeId,
 }
 
-impl NodeAttachmentSchedulingScore {
-    /// For speculative scheduling: generate the score for this node as if one more tenant
-    /// shard was attached to it.
-    pub(crate) fn project_attachment(&self) -> Self {
-        Self {
-            total_attached_shard_count: self.total_attached_shard_count + 1,
-            ..*self
-        }
-    }
-
-    /// The literal equality and comparison operators include the node ID to provide a deterministic
-    /// ordering. However, when doing scheduling optimisation we of course don't want to regard
-    /// a difference in node ID as significant, so that code uses this method to exclude that case.
-    pub(crate) fn different(&self, other: &Self) -> bool {
-        self.az_match != other.az_match
-            || self.affinity_score != other.affinity_score
-            || self.attached_shards_in_context != other.attached_shards_in_context
-            || self.utilization_score != other.utilization_score
-            || self.total_attached_shard_count != other.total_attached_shard_count
-    }
-}
-
 impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
     fn generate(
         node_id: &NodeId,
@@ -205,7 +179,6 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
                 .copied()
                 .unwrap_or(AffinityScore::FREE),
             az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
-            attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
             utilization_score: utilization.cached_score(),
             total_attached_shard_count: node.attached_shard_count,
             node_id: *node_id,
@@ -216,10 +189,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
     /// of the score that can only be resolved by moving things (such as inter-shard affinity
     /// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
     /// can fluctuate for other reasons)
-    fn disregard_utilization(&self) -> Self {
+    fn for_optimization(&self) -> Self {
         Self {
             utilization_score: 0,
             total_attached_shard_count: 0,
+            node_id: NodeId(0),
             ..*self
         }
     }
@@ -255,17 +229,6 @@ pub(crate) struct NodeSecondarySchedulingScore {
     node_id: NodeId,
 }
 
-impl NodeSecondarySchedulingScore {
-    /// The literal equality and comparison operators include the node ID to provide a deterministic
-    /// ordering. However, when doing scheduling optimisation we of course don't want to regard
-    /// a difference in node ID as significant, so that code uses this method to exclude that case.
-    pub(crate) fn different(&self, other: &Self) -> bool {
-        self.az_match != other.az_match
-            || self.affinity_score != other.affinity_score
-            || self.utilization_score != other.utilization_score
-    }
-}
-
 impl NodeSchedulingScore for NodeSecondarySchedulingScore {
     fn generate(
         node_id: &NodeId,
@@ -293,9 +256,11 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
         })
     }
 
-    fn disregard_utilization(&self) -> Self {
+    fn for_optimization(&self) -> Self {
         Self {
             utilization_score: 0,
+            total_attached_shard_count: 0,
+            node_id: NodeId(0),
             ..*self
         }
     }
@@ -417,21 +382,29 @@ impl ScheduleContext {
     /// Imagine we migrated our attached location to the given node. Return a new context that
     /// reflects this.
-    pub(crate) fn project_detach(&self, source: NodeId) -> Self {
+    pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
         let mut new_context = self.clone();
-        if let Some(count) = new_context.attached_nodes.get_mut(&source) {
-            // It's unexpected that we get called in a context where the source of
-            // the migration is not already in the context.
+        if let Some(attached) = shard.intent.get_attached() {
+            if let Some(count) = new_context.attached_nodes.get_mut(attached) {
+                // It's unexpected that we get called in a context where the source of
+                // the migration is not already in the context.
+                debug_assert!(*count > 0);
 
-            if *count > 0 {
-                *count -= 1;
+                if *count > 0 {
+                    *count -= 1;
+                }
+            }
+
+            if let Some(score) = new_context.nodes.get_mut(attached) {
+                score.dec();
             }
         }
 
-        if let Some(score) = new_context.nodes.get_mut(&source) {
-            score.dec();
+        for secondary in shard.intent.get_secondary() {
+            if let Some(score) = new_context.nodes.get_mut(secondary) {
+                score.dec();
+            }
         }
 
         new_context
@@ -826,7 +799,7 @@ impl Scheduler {
             tracing::info!(
             "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
             scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
-            );
+        );
         }
 
         // Note that we do not update shard count here to reflect the scheduling: that
@@ -1181,20 +1154,25 @@ mod tests {
     use test_log::test;
 
     #[test]
-    /// Reproducer for https://github.com/neondatabase/neon/issues/8969 -- a case where
-    /// having an odd number of nodes can cause instability when scheduling even numbers of
-    /// shards with secondaries
+    /// Make sure that when we have an odd number of nodes and an even number of shards, we still
+    /// get scheduling stability.
     fn odd_nodes_stability() {
-        let az_tag = AvailabilityZone("az-a".to_string());
+        let az_a = AvailabilityZone("az-a".to_string());
+        let az_b = AvailabilityZone("az-b".to_string());
 
         let nodes = test_utils::make_test_nodes(
-            5,
+            10,
             &[
-                az_tag.clone(),
-                az_tag.clone(),
-                az_tag.clone(),
-                az_tag.clone(),
-                az_tag.clone(),
+                az_a.clone(),
+                az_a.clone(),
+                az_a.clone(),
+                az_a.clone(),
+                az_a.clone(),
+                az_b.clone(),
+                az_b.clone(),
+                az_b.clone(),
+                az_b.clone(),
+                az_b.clone(),
             ],
         );
         let mut scheduler = Scheduler::new(nodes.values());
@@ -1210,6 +1188,7 @@ mod tests {
         expect_secondary: NodeId,
         scheduled_shards: &mut Vec<TenantShard>,
         scheduler: &mut Scheduler,
+        preferred_az: Option<AvailabilityZone>,
         context: &mut ScheduleContext,
     ) {
         let shard_identity = ShardIdentity::new(
@@ -1222,6 +1201,7 @@ mod tests {
             tenant_shard_id,
             shard_identity,
             pageserver_api::controller_api::PlacementPolicy::Attached(1),
+            preferred_az,
         );
 
         shard.schedule(scheduler, context).unwrap();
@@ -1244,9 +1224,10 @@ mod tests {
                 shard_count: ShardCount(8),
             },
             NodeId(1),
-            NodeId(2),
+            NodeId(6),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1256,10 +1237,11 @@ mod tests {
                 shard_number: ShardNumber(1),
                 shard_count: ShardCount(8),
             },
-            NodeId(3),
-            NodeId(4),
+            NodeId(2),
+            NodeId(7),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1269,10 +1251,11 @@ mod tests {
                 shard_number: ShardNumber(2),
                 shard_count: ShardCount(8),
             },
-            NodeId(5),
-            NodeId(2),
+            NodeId(3),
+            NodeId(8),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1283,9 +1266,10 @@ mod tests {
                 shard_count: ShardCount(8),
             },
             NodeId(4),
-            NodeId(1),
+            NodeId(9),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1295,10 +1279,11 @@ mod tests {
                 shard_number: ShardNumber(4),
                 shard_count: ShardCount(8),
             },
-            NodeId(3),
             NodeId(5),
+            NodeId(10),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1308,10 +1293,11 @@ mod tests {
                 shard_number: ShardNumber(5),
                 shard_count: ShardCount(8),
             },
-            NodeId(2),
             NodeId(1),
+            NodeId(6),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1321,10 +1307,11 @@ mod tests {
                 shard_number: ShardNumber(6),
                 shard_count: ShardCount(8),
             },
-            NodeId(4),
-            NodeId(5),
+            NodeId(2),
+            NodeId(7),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
@@ -1335,9 +1322,10 @@ mod tests {
                 shard_count: ShardCount(8),
             },
             NodeId(3),
-            NodeId(1),
+            NodeId(8),
             &mut scheduled_shards,
             &mut scheduler,
+            Some(az_a.clone()),
             &mut context,
         );
 
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 77615eadc9..4d195093d7 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -29,7 +29,7 @@ use crate::{
         ShardGenerationState, TenantFilter,
     },
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
-    scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
+    scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
     tenant_shard::{
         MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
         ScheduleOptimization, ScheduleOptimizationAction,
@@ -1576,6 +1576,7 @@ impl Service {
                     attach_req.tenant_shard_id,
                     ShardIdentity::unsharded(),
                     PlacementPolicy::Attached(0),
+                    None,
                 ),
             );
             tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2103,6 +2104,21 @@ impl Service {
             )
         };
 
+        let preferred_az_id: Option<AvailabilityZone> = {
+            let mut locked = self.inner.write().unwrap();
+
+            // Idempotency: take the existing value if the tenant already exists
+            if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
+                shard.preferred_az().cloned()
+            } else {
+                locked
+                    .scheduler
+                    .schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
+                    .ok()
+                    .and_then(|n_id| locked.scheduler.get_node_az(&n_id))
+            }
+        };
+
         // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
         // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
         // during the creation, rather than risking leaving orphan objects in S3.
@@ -2122,7 +2138,7 @@ impl Service {
                 splitting: SplitState::default(),
                 scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
                     .unwrap(),
-                preferred_az_id: None,
+                preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
             })
             .collect();
 
@@ -2158,6 +2174,7 @@ impl Service {
                     &create_req.shard_parameters,
                     create_req.config.clone(),
                     placement_policy.clone(),
+                    preferred_az_id.as_ref(),
                     &mut schedule_context,
                 )
                 .await;
@@ -2171,44 +2188,6 @@ impl Service {
             }
         }
 
-        let preferred_azs = {
-            let locked = self.inner.read().unwrap();
-            response_shards
-                .iter()
-                .filter_map(|resp| {
-                    let az_id = locked
-                        .nodes
-                        .get(&resp.node_id)
-                        .map(|n| n.get_availability_zone_id().clone())?;
-
-                    Some((resp.shard_id, az_id))
-                })
-                .collect::<Vec<_>>()
-        };
-
-        // Note that we persist the preferred AZ for the new shards separately.
-        // In theory, we could "peek" the scheduler to determine where the shard will
-        // land, but the subsequent "real" call into the scheduler might select a different
-        // node. Hence, we do this awkward update to keep things consistent.
-        let updated = self
-            .persistence
-            .set_tenant_shard_preferred_azs(preferred_azs)
-            .await
-            .map_err(|err| {
-                ApiError::InternalServerError(anyhow::anyhow!(
-                    "Failed to persist preferred az ids: {err}"
-                ))
-            })?;
-
-        {
-            let mut locked = self.inner.write().unwrap();
-            for (tid, az_id) in updated {
-                if let Some(shard) = locked.tenants.get_mut(&tid) {
-                    shard.set_preferred_az(az_id);
-                }
-            }
-        }
-
         // If we failed to schedule shards, then they are still created in the controller,
         // but we return an error to the requester to avoid a silent failure when someone
         // tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2239,6 +2218,7 @@ impl Service {
 
     /// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
     /// case of a new tenant and a pre-existing one.
+    #[allow(clippy::too_many_arguments)]
     async fn do_initial_shard_scheduling(
         &self,
         tenant_shard_id: TenantShardId,
@@ -2246,6 +2226,7 @@ impl Service {
         shard_params: &ShardParameters,
        config: TenantConfig,
         placement_policy: PlacementPolicy,
+        preferred_az_id: Option<&AvailabilityZone>,
         schedule_context: &mut ScheduleContext,
     ) -> InitialShardScheduleOutcome {
         let mut locked = self.inner.write().unwrap();
@@ -2283,6 +2264,7 @@ impl Service {
                     tenant_shard_id,
                     ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
                     placement_policy,
+                    preferred_az_id.cloned(),
                 ));
 
                 state.generation = initial_generation;
@@ -4149,16 +4131,14 @@ impl Service {
                 },
             );
 
-            let mut child_state = TenantShard::new(child, child_shard, policy.clone());
+            let mut child_state =
+                TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
             child_state.intent = IntentState::single(scheduler, Some(pageserver));
             child_state.observed = ObservedState {
                 locations: child_observed,
             };
             child_state.generation = Some(generation);
             child_state.config = config.clone();
-            if let Some(preferred_az) = &preferred_az {
-                child_state.set_preferred_az(preferred_az.clone());
-            }
 
             // The child's TenantShard::splitting is intentionally left at the default value of Idle,
             // as at this point in the split process we have succeeded and this part is infallible:
@@ -5351,7 +5331,7 @@ impl Service {
             register_req.listen_http_port,
             register_req.listen_pg_addr,
             register_req.listen_pg_port,
-            register_req.availability_zone_id,
+            register_req.availability_zone_id.clone(),
         );
 
         // TODO: idempotency if the node already exists in the database
@@ -5371,8 +5351,9 @@ impl Service {
             .set(locked.nodes.len() as i64);
 
         tracing::info!(
-            "Registered pageserver {}, now have {} pageservers",
+            "Registered pageserver {} ({}), now have {} pageservers",
             register_req.node_id,
+            register_req.availability_zone_id,
             locked.nodes.len()
         );
         Ok(())
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 3a7d0e7cdb..2c445d6835 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -313,6 +313,7 @@ pub(crate) struct ObservedStateLocation {
     /// we know that we might have some state on this node.
     pub(crate) conf: Option<LocationConfig>,
 }
+
 pub(crate) struct ReconcilerWaiter {
     // For observability purposes, remember the ID of the shard we're
     // waiting for.
@@ -474,6 +475,7 @@ impl TenantShard {
         tenant_shard_id: TenantShardId,
         shard: ShardIdentity,
         policy: PlacementPolicy,
+        preferred_az_id: Option<AvailabilityZone>,
     ) -> Self {
         metrics::METRICS_REGISTRY
             .metrics_group
@@ -497,7 +499,7 @@ impl TenantShard {
             last_error: Arc::default(),
             pending_compute_notification: false,
             scheduling_policy: ShardSchedulingPolicy::default(),
-            preferred_az_id: None,
+            preferred_az_id,
         }
     }
 
@@ -734,7 +736,15 @@ impl TenantShard {
         // other shards from the same tenant on it, then skip doing any scheduling calculations.
         let attached = (*self.intent.get_attached())?;
 
-        let schedule_context = schedule_context.project_detach(attached);
+        tracing::info!(
+            "Initially: attached {attached} ({:?} vs {:?}), in context {schedule_context:?}",
+            scheduler.get_node_az(&attached),
+            self.preferred_az_id.as_ref()
+        );
+
+        // Construct a schedule context that excludes locations belonging to
+        // this shard: this simulates removing and re-scheduling the shard
+        let schedule_context = schedule_context.project_detach(self);
 
         // Look for a lower-scoring location to attach to
         let Ok(candidate_node) = scheduler.schedule_shard::<AttachedShardTag>(
@@ -789,16 +799,16 @@ impl TenantShard {
             // migrating for utilization requires a separate high level view of the system to
             // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
             // moving things around in the order that we hit this function.
-            let candidate_score = candidate_score.disregard_utilization();
-            let candidate_score = candidate_score.project_attachment();
+            let candidate_score = candidate_score.for_optimization();
+            let current_score = current_score.for_optimization();
 
-            let current_score = current_score.disregard_utilization();
-
-            if candidate_score < current_score && current_score.different(&candidate_score) {
+            if candidate_score < current_score {
                 tracing::info!("Found a lower scoring location! {candidate_score:?} is better than {current_score:?}");
             } else {
                 // The candidate node is no better than our current location, so don't migrate
-                tracing::debug!("Candidate node {candidate_node} is no better than our current location {attached}");
+                tracing::debug!(
+                    "Candidate node {candidate_node} is no better than our current location {attached} (candidate {candidate_score:?} vs attached {current_score:?})",
+                );
                 return None;
             }
         }
@@ -947,14 +957,14 @@ impl TenantShard {
             ) {
                 Some(current_score) => {
                     // Disregard utilization: we don't want to thrash around based on disk utilization
-                    let current_score = current_score.disregard_utilization();
-                    let candidate_score = candidate_score.disregard_utilization();
+                    let current_score = current_score.for_optimization();
+                    let candidate_score = candidate_score.for_optimization();
 
-                    if candidate_score < current_score && current_score.different(&candidate_score)
-                    {
+                    if candidate_score < current_score {
                         tracing::info!(
-                            "Identified optimization({}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
+                            "Identified optimization({}, home AZ {:?}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
                             self.tenant_shard_id,
+                            self.preferred_az_id,
                             self.intent.get_secondary(),
                             candidate_score,
                             current_score
@@ -1677,6 +1687,7 @@ pub(crate) mod tests {
             )
             .unwrap(),
             policy,
+            None,
         )
     }
 
@@ -1712,6 +1723,7 @@ pub(crate) mod tests {
             )
             .unwrap(),
             policy.clone(),
+            None,
         );
 
         if let Some(az) = &preferred_az {
@@ -1857,18 +1869,27 @@ pub(crate) mod tests {
     #[test]
     /// Simple case: moving attachment to somewhere better where we already have a secondary
     fn optimize_attachment_simple() -> anyhow::Result<()> {
-        let nodes = make_test_nodes(3, &[]);
+        let nodes = make_test_nodes(
+            3,
+            &[
+                AvailabilityZone("az-a".to_string()),
+                AvailabilityZone("az-b".to_string()),
+                AvailabilityZone("az-c".to_string()),
+            ],
+        );
         let mut scheduler = Scheduler::new(nodes.values());
 
         let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
+        shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
         let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
+        shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
 
         // Initially: both nodes attached on shard 1, and both have secondary locations
         // on different nodes.
-        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
-        shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
+        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
+        shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
         shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
-        shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
+        shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
 
         fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
             let mut schedule_context = ScheduleContext::default();
@@ -1886,8 +1907,8 @@ pub(crate) mod tests {
             Some(ScheduleOptimization {
                 sequence: shard_a.sequence,
                 action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
-                    old_attached_node_id: NodeId(1),
-                    new_attached_node_id: NodeId(2)
+                    old_attached_node_id: NodeId(2),
+                    new_attached_node_id: NodeId(1)
                 })
             })
         );
@@ -2020,6 +2041,87 @@ pub(crate) mod tests {
         Ok(())
     }
 
+    #[test]
+    /// Check that multi-step migration works when moving to somewhere that is only better by
+    /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
+    /// counting toward the affinity score such that it prevents the rest of the migration from happening.
+    fn optimize_attachment_marginal() -> anyhow::Result<()> {
+        let nodes = make_test_nodes(2, &[]);
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        // Multi-sharded tenant, we will craft a situation where affinity
+        // scores differ only slightly
+        let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
+
+        // 1 attached on node 1
+        shards[0]
+            .intent
+            .set_attached(&mut scheduler, Some(NodeId(1)));
+        // 2 attached and one secondary on node 2
+        shards[1]
+            .intent
+            .set_attached(&mut scheduler, Some(NodeId(2)));
+        shards[2]
+            .intent
+            .set_attached(&mut scheduler, Some(NodeId(2)));
+        shards[1].intent.push_secondary(&mut scheduler, NodeId(2));
+
+        fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
+            let mut schedule_context = ScheduleContext::default();
+            for shard in shards {
+                schedule_context.avoid(&shard.intent.all_pageservers());
+                if let Some(attached) = shard.intent.get_attached() {
+                    schedule_context.push_attached(*attached);
+                }
+            }
+            schedule_context
+        }
+
+        let schedule_context = make_schedule_context(&shards);
+        let optimization_a_prepare =
+            shards[2].optimize_attachment(&mut scheduler, &schedule_context);
+        assert_eq!(
+            optimization_a_prepare,
+            Some(ScheduleOptimization {
+                sequence: shards[2].sequence,
+                action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
+            })
+        );
+        shards[2].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
+
+        let schedule_context = make_schedule_context(&shards);
+        let optimization_a_migrate =
+            shards[2].optimize_attachment(&mut scheduler, &schedule_context);
+        assert_eq!(
+            optimization_a_migrate,
+            Some(ScheduleOptimization {
+                sequence: shards[2].sequence,
+                action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
+                    old_attached_node_id: NodeId(2),
+                    new_attached_node_id: NodeId(1)
+                })
+            })
+        );
+        shards[2].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
+
+        // let schedule_context = make_schedule_context(&shard_a, &shard_b);
+        // let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
+        // assert_eq!(
+        //     optimization_a_cleanup,
+        //     Some(ScheduleOptimization {
+        //         sequence: shard_a.sequence,
+        //         action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
+        //     })
+        // );
+        // shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
+
+        for mut shard in shards {
+            shard.intent.clear(&mut scheduler);
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn optimize_secondary() -> anyhow::Result<()> {
         let nodes = make_test_nodes(4, &[]);