From 2cf47b1477d281a868deab4914aceb53e37a22e9 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 25 Sep 2024 14:31:04 +0100 Subject: [PATCH] storcon: do az aware scheduling (#9083) ## Problem Storage controller didn't previously consider AZ locality between compute and pageservers when scheduling nodes. Control plane has this feature, and, since we are migrating tenants away from it, we need feature parity to avoid perf degradations. ## Summary of changes The change itself is fairly simple: 1. Thread az info into the scheduler 2. Add an extra member to the scheduling scores Step (2) deserves some more discussion. Let's break it down by the shard type being scheduled: **Attached Shards** We wish for attached shards of a tenant to end up in the preferred AZ of the tenant since that is where the compute is like to be. The AZ member for `NodeAttachmentSchedulingScore` has been placed below the affinity score (so it's got the second biggest weight for picking the node). The rationale for going below the affinity score is to avoid having all shards of a single tenant placed on the same node in 2 node regions, since that would mean that one tenant can drive the general workload of an entire pageserver. I'm not 100% sure this is the right decision, so open to discussing hoisting the AZ up to first place. **Secondary Shards** We wish for secondary shards of a tenant to be scheduled in a different AZ from the preferred one for HA purposes. The AZ member for `NodeSecondarySchedulingScore` has been placed first, so nodes in different AZs from the preferred one will always be considered first. On small clusters, this can mean that all the secondaries of a tenant are scheduled to the same pageserver, but secondaries don't use up as many resources as the attached location, so IMO the argument made for attached shards doesn't hold. Related: https://github.com/neondatabase/neon/issues/8848 --- control_plane/storcon_cli/src/main.rs | 6 +- libs/pageserver_api/src/controller_api.rs | 14 +- pageserver/src/control_plane_client.rs | 6 +- storage_controller/src/node.rs | 16 +- storage_controller/src/persistence.rs | 7 +- storage_controller/src/scheduler.rs | 230 ++++++++++++++++++++-- storage_controller/src/service.rs | 16 +- storage_controller/src/tenant_shard.rs | 215 +++++++++++++++++--- 8 files changed, 445 insertions(+), 65 deletions(-) diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 651fcda8db..73d89699ed 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -4,8 +4,8 @@ use std::{str::FromStr, time::Duration}; use clap::{Parser, Subcommand}; use pageserver_api::{ controller_api::{ - NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, ShardSchedulingPolicy, - TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, + AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse, + ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, }, models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, @@ -339,7 +339,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, - availability_zone_id, + availability_zone_id: AvailabilityZone(availability_zone_id), }), ) .await?; diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 40b7dbbbc2..0ea30ce54f 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; use std::str::FromStr; use std::time::{Duration, Instant}; @@ -57,7 +58,7 @@ pub struct NodeRegisterRequest { pub listen_http_addr: String, pub listen_http_port: u16, - pub availability_zone_id: String, + pub availability_zone_id: AvailabilityZone, } #[derive(Serialize, Deserialize)] @@ -74,10 +75,19 @@ pub struct TenantPolicyRequest { pub scheduling: Option, } +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct AvailabilityZone(pub String); + +impl Display for AvailabilityZone { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[derive(Serialize, Deserialize)] pub struct ShardsPreferredAzsRequest { #[serde(flatten)] - pub preferred_az_ids: HashMap, + pub preferred_az_ids: HashMap, } #[derive(Serialize, Deserialize)] diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index f6d1c35a8c..d0a967b920 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use futures::Future; use pageserver_api::{ - controller_api::NodeRegisterRequest, + controller_api::{AvailabilityZone, NodeRegisterRequest}, shard::TenantShardId, upcall_api::{ ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, @@ -148,10 +148,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { .and_then(|jv| jv.as_str().map(|str| str.to_owned())); match az_id_from_metadata { - Some(az_id) => Some(az_id), + Some(az_id) => Some(AvailabilityZone(az_id)), None => { tracing::warn!("metadata.json does not contain an 'availability_zone_id' field"); - conf.availability_zone.clone() + conf.availability_zone.clone().map(AvailabilityZone) } } }; diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index cb9ce10d23..4cc9b0070d 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration}; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, - TenantLocateResponseShard, + AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, + NodeSchedulingPolicy, TenantLocateResponseShard, }, shard::TenantShardId, }; @@ -36,7 +36,7 @@ pub(crate) struct Node { listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, // This cancellation token means "stop any RPCs in flight to this node, and don't start // any more". It is not related to process shutdown. @@ -64,8 +64,8 @@ impl Node { } #[allow(unused)] - pub(crate) fn get_availability_zone_id(&self) -> &str { - self.availability_zone_id.as_str() + pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone { + &self.availability_zone_id } pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy { @@ -181,7 +181,7 @@ impl Node { listen_http_port: u16, listen_pg_addr: String, listen_pg_port: u16, - availability_zone_id: String, + availability_zone_id: AvailabilityZone, ) -> Self { Self { id, @@ -204,7 +204,7 @@ impl Node { listen_http_port: self.listen_http_port as i32, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, - availability_zone_id: self.availability_zone_id.clone(), + availability_zone_id: self.availability_zone_id.0.clone(), } } @@ -219,7 +219,7 @@ impl Node { listen_http_port: np.listen_http_port as u16, listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, - availability_zone_id: np.availability_zone_id, + availability_zone_id: AvailabilityZone(np.availability_zone_id), cancel: CancellationToken::new(), } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 1dc1040d96..14cc51240d 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -9,6 +9,7 @@ use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::Connection; use itertools::Itertools; +use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::controller_api::MetadataHealthRecord; use pageserver_api::controller_api::ShardSchedulingPolicy; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; @@ -667,8 +668,8 @@ impl Persistence { pub(crate) async fn set_tenant_shard_preferred_azs( &self, - preferred_azs: Vec<(TenantShardId, String)>, - ) -> DatabaseResult> { + preferred_azs: Vec<(TenantShardId, AvailabilityZone)>, + ) -> DatabaseResult> { use crate::schema::tenant_shards::dsl::*; self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| { @@ -679,7 +680,7 @@ impl Persistence { .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) - .set(preferred_az_id.eq(preferred_az)) + .set(preferred_az_id.eq(preferred_az.0.clone())) .execute(conn)?; if updated == 1 { diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 1cb1fb104d..2414d95eb8 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -1,6 +1,6 @@ use crate::{node::Node, tenant_shard::TenantShard}; use itertools::Itertools; -use pageserver_api::models::PageserverUtilization; +use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization}; use serde::Serialize; use std::{collections::HashMap, fmt::Debug}; use utils::{http::error::ApiError, id::NodeId}; @@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode { shard_count: usize, /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. attached_shard_count: usize, + /// Availability zone id in which the node resides + az: AvailabilityZone, /// Whether this node is currently elegible to have new shards scheduled (this is derived /// from a node's availability state and scheduling policy). @@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option; fn is_overloaded(&self) -> bool; @@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag { type Score = NodeSecondarySchedulingScore; } +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +enum AzMatch { + Yes, + No, + Unknown, +} + +impl AzMatch { + fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self { + match shard_preferred_az { + Some(preferred_az) if preferred_az == node_az => Self::Yes, + Some(_preferred_az) => Self::No, + None => Self::Unknown, + } + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct AttachmentAzMatch(AzMatch); + +impl Ord for AttachmentAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // Note that we prefer a node for which we don't have + // info to a node which we are certain doesn't match the + // preferred AZ of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::Yes => 0, + AzMatch::Unknown => 1, + AzMatch::No => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for AttachmentAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +struct SecondaryAzMatch(AzMatch); + +impl Ord for SecondaryAzMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower scores indicate a more suitable node. + // For secondary locations we wish to avoid the preferred AZ + // of the shard. + let az_match_score = |az_match: &AzMatch| match az_match { + AzMatch::No => 0, + AzMatch::Unknown => 1, + AzMatch::Yes => 2, + }; + + az_match_score(&self.0).cmp(&az_match_score(&other.0)) + } +} + +impl PartialOrd for SecondaryAzMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Scheduling score of a given node for shard attachments. /// Lower scores indicate more suitable nodes. /// Ordering is given by member declaration order (top to bottom). @@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore { /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For equal affinity scores, nodes in the matching AZ + /// are considered first. + az_match: AttachmentAzMatch, /// Size of [`ScheduleContext::attached_nodes`] for the current node. /// This normally tracks the number of attached shards belonging to the /// tenant being scheduled that are already on this node. @@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { .get(node_id) .copied() .unwrap_or(AffinityScore::FREE), + az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0), utilization_score: utilization.cached_score(), total_attached_shard_count: node.attached_shard_count, @@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore { /// Ordering is given by member declaration order (top to bottom). #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub(crate) struct NodeSecondarySchedulingScore { + /// Flag indicating whether this node matches the preferred AZ + /// of the shard. For secondary locations we wish to avoid nodes in. + /// the preferred AZ of the shard, since that's where the attached location + /// should be scheduled and having the secondary in the same AZ is bad for HA. + az_match: SecondaryAzMatch, /// The number of shards belonging to the tenant currently being /// scheduled that are attached to this node. affinity_score: AffinityScore, @@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { fn generate( node_id: &NodeId, node: &mut SchedulerNode, + preferred_az: &Option, context: &ScheduleContext, ) -> Option { let utilization = match &mut node.may_schedule { @@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore { }; Some(Self { + az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())), affinity_score: context .nodes .get(node_id) @@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode { may_schedule_matches && self.shard_count == other.shard_count && self.attached_shard_count == other.attached_shard_count + && self.az == other.az } } @@ -293,6 +376,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -319,6 +403,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }, ); } @@ -497,6 +582,7 @@ impl Scheduler { shard_count: 0, attached_shard_count: 0, may_schedule: node.may_schedule(), + az: node.get_availability_zone_id().clone(), }); } } @@ -542,6 +628,7 @@ impl Scheduler { fn compute_node_scores( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Vec where @@ -553,7 +640,7 @@ impl Scheduler { if hard_exclude.contains(k) { None } else { - Score::generate(k, v, context) + Score::generate(k, v, preferred_az, context) } }) .collect() @@ -571,13 +658,15 @@ impl Scheduler { pub(crate) fn schedule_shard( &mut self, hard_exclude: &[NodeId], + preferred_az: &Option, context: &ScheduleContext, ) -> Result { if self.nodes.is_empty() { return Err(ScheduleError::NoPageservers); } - let mut scores = self.compute_node_scores::(hard_exclude, context); + let mut scores = + self.compute_node_scores::(hard_exclude, preferred_az, context); // Exclude nodes whose utilization is critically high, if there are alternatives available. This will // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example @@ -634,6 +723,12 @@ impl Scheduler { Ok(node_id) } + /// Selects any available node. This is suitable for performing background work (e.g. S3 + /// deletions). + pub(crate) fn any_available_node(&mut self) -> Result { + self.schedule_shard::(&[], &None, &ScheduleContext::default()) + } + /// Unit test access to internal state #[cfg(test)] pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize { @@ -650,13 +745,22 @@ impl Scheduler { pub(crate) mod test_utils { use crate::node::Node; - use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization}; + use pageserver_api::{ + controller_api::{AvailabilityZone, NodeAvailability}, + models::utilization::test_utilization, + }; use std::collections::HashMap; use utils::id::NodeId; + /// Test helper: synthesize the requested number of nodes, all in active state. /// /// Node IDs start at one. - pub(crate) fn make_test_nodes(n: u64) -> HashMap { + /// + /// The `azs` argument specifies the list of availability zones which will be assigned + /// to nodes in round-robin fashion. If empy, a default AZ is assigned. + pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap { + let mut az_iter = azs.iter().cycle(); + (1..n + 1) .map(|i| { (NodeId(i), { @@ -666,7 +770,10 @@ pub(crate) mod test_utils { 80 + i as u16, format!("pghost-{i}"), 5432 + i as u16, - "test-az".to_string(), + az_iter + .next() + .cloned() + .unwrap_or(AvailabilityZone("test-az".to_string())), ); node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); assert!(node.is_available()); @@ -686,7 +793,7 @@ mod tests { use crate::tenant_shard::IntentState; #[test] fn scheduler_basic() -> anyhow::Result<()> { - let nodes = test_utils::make_test_nodes(2); + let nodes = test_utils::make_test_nodes(2, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut t1_intent = IntentState::new(); @@ -694,9 +801,9 @@ mod tests { let context = ScheduleContext::default(); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t1_intent.set_attached(&mut scheduler, Some(scheduled)); - let scheduled = scheduler.schedule_shard::(&[], &context)?; + let scheduled = scheduler.schedule_shard::(&[], &None, &context)?; t2_intent.set_attached(&mut scheduler, Some(scheduled)); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -705,8 +812,11 @@ mod tests { assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1); assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1); - let scheduled = - scheduler.schedule_shard::(&t1_intent.all_pageservers(), &context)?; + let scheduled = scheduler.schedule_shard::( + &t1_intent.all_pageservers(), + &None, + &context, + )?; t1_intent.push_secondary(&mut scheduler, scheduled); assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1); @@ -746,7 +856,7 @@ mod tests { #[test] /// Test the PageserverUtilization's contribution to scheduling algorithm fn scheduler_utilization() { - let mut nodes = test_utils::make_test_nodes(3); + let mut nodes = test_utils::make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); // Need to keep these alive because they contribute to shard counts via RAII @@ -761,7 +871,7 @@ mod tests { context: &ScheduleContext, ) { let scheduled = scheduler - .schedule_shard::(&[], context) + .schedule_shard::(&[], &None, context) .unwrap(); let mut intent = IntentState::new(); intent.set_attached(scheduler, Some(scheduled)); @@ -870,4 +980,98 @@ mod tests { intent.clear(&mut scheduler); } } + + #[test] + /// A simple test that showcases AZ-aware scheduling and its interaction with + /// affinity scores. + fn az_scheduling() { + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + + let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]); + let mut scheduler = Scheduler::new(nodes.values()); + + // Need to keep these alive because they contribute to shard counts via RAII + let mut scheduled_intents = Vec::new(); + + let mut context = ScheduleContext::default(); + + fn assert_scheduler_chooses( + expect_node: NodeId, + preferred_az: Option, + scheduled_intents: &mut Vec, + scheduler: &mut Scheduler, + context: &mut ScheduleContext, + ) { + let scheduled = scheduler + .schedule_shard::(&[], &preferred_az, context) + .unwrap(); + let mut intent = IntentState::new(); + intent.set_attached(scheduler, Some(scheduled)); + scheduled_intents.push(intent); + assert_eq!(scheduled, expect_node); + + context.avoid(&[scheduled]); + } + + assert_scheduler_chooses::( + NodeId(1), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 and 3 have affinity score equal to 0, but node 3 + // is in "az-a" so we prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Node 2 is not in "az-a", but it has the lowest affinity so we prefer that. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-a" for the secondary location. + assert_scheduler_chooses::( + NodeId(2), + Some(az_a_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Nodes 1 and 3 are identically loaded, so prefer the lowest node id. + assert_scheduler_chooses::( + NodeId(1), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + // Avoid nodes in "az-b" for the secondary location. + // Node 3 has lower affinity score than 1, so prefer that. + assert_scheduler_chooses::( + NodeId(3), + Some(az_b_tag.clone()), + &mut scheduled_intents, + &mut scheduler, + &mut context, + ); + + for mut intent in scheduled_intents { + intent.clear(&mut scheduler); + } + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 5555505b81..6a11e9650c 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -26,7 +26,7 @@ use crate::{ ShardGenerationState, TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, - scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, + scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, tenant_shard::{ MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, ScheduleOptimizationAction, @@ -1265,6 +1265,8 @@ impl Service { #[cfg(feature = "testing")] { + use pageserver_api::controller_api::AvailabilityZone; + // Hack: insert scheduler state for all nodes referenced by shards, as compatibility // tests only store the shards, not the nodes. The nodes will be loaded shortly // after when pageservers start up and register. @@ -1282,7 +1284,7 @@ impl Service { 123, "".to_string(), 123, - "test_az".to_string(), + AvailabilityZone("test_az".to_string()), ); scheduler.node_upsert(&node); @@ -2099,7 +2101,7 @@ impl Service { let az_id = locked .nodes .get(&resp.node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((resp.shard_id, az_id)) }) @@ -2629,8 +2631,7 @@ impl Service { let scheduler = &mut locked.scheduler; // Right now we only perform the operation on a single node without parallelization // TODO fan out the operation to multiple nodes for better performance - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = locked .nodes .get(&node_id) @@ -2816,8 +2817,7 @@ impl Service { // Pick an arbitrary node to use for remote deletions (does not have to be where the tenant // was attached, just has to be able to see the S3 content) - let node_id = - scheduler.schedule_shard::(&[], &ScheduleContext::default())?; + let node_id = scheduler.any_available_node()?; let node = nodes .get(&node_id) .expect("Pageservers may not be deleted while lock is active"); @@ -4481,7 +4481,7 @@ impl Service { let az_id = locked .nodes .get(node_id) - .map(|n| n.get_availability_zone_id().to_string())?; + .map(|n| n.get_availability_zone_id().clone())?; Some((*tid, az_id)) }) diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index eccde0e3ab..afc89eae00 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -15,7 +15,7 @@ use crate::{ service::ReconcileResultRequest, }; use pageserver_api::controller_api::{ - NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, + AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, }; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, @@ -146,7 +146,7 @@ pub(crate) struct TenantShard { // We should attempt to schedule this shard in the provided AZ to // decrease chances of cross-AZ compute. - preferred_az_id: Option, + preferred_az_id: Option, } #[derive(Default, Clone, Debug, Serialize)] @@ -540,8 +540,11 @@ impl TenantShard { Ok((true, promote_secondary)) } else { // Pick a fresh node: either we had no secondaries or none were schedulable - let node_id = - scheduler.schedule_shard::(&self.intent.secondary, context)?; + let node_id = scheduler.schedule_shard::( + &self.intent.secondary, + &self.preferred_az_id, + context, + )?; tracing::debug!("Selected {} as attached", node_id); self.intent.set_attached(scheduler, Some(node_id)); Ok((true, node_id)) @@ -622,8 +625,11 @@ impl TenantShard { let mut used_pageservers = vec![attached_node_id]; while self.intent.secondary.len() < secondary_count { - let node_id = scheduler - .schedule_shard::(&used_pageservers, context)?; + let node_id = scheduler.schedule_shard::( + &used_pageservers, + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); used_pageservers.push(node_id); modified = true; @@ -636,7 +642,11 @@ impl TenantShard { modified = true; } else if self.intent.secondary.is_empty() { // Populate secondary by scheduling a fresh node - let node_id = scheduler.schedule_shard::(&[], context)?; + let node_id = scheduler.schedule_shard::( + &[], + &self.preferred_az_id, + context, + )?; self.intent.push_secondary(scheduler, node_id); modified = true; } @@ -815,6 +825,7 @@ impl TenantShard { // with lower utilization. let Ok(candidate_node) = scheduler.schedule_shard::( &self.intent.all_pageservers(), + &self.preferred_az_id, schedule_context, ) else { // A scheduling error means we have no possible candidate replacements @@ -1313,7 +1324,7 @@ impl TenantShard { pending_compute_notification: false, delayed_reconcile: false, scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(), - preferred_az_id: tsp.preferred_az_id, + preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone), }) } @@ -1329,15 +1340,15 @@ impl TenantShard { config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(), - preferred_az_id: self.preferred_az_id.clone(), + preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()), } } - pub(crate) fn preferred_az(&self) -> Option<&str> { - self.preferred_az_id.as_deref() + pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> { + self.preferred_az_id.as_ref() } - pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) { + pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) { self.preferred_az_id = Some(preferred_az_id); } } @@ -1350,6 +1361,7 @@ pub(crate) mod tests { controller_api::NodeAvailability, shard::{ShardCount, ShardNumber}, }; + use rand::{rngs::StdRng, SeedableRng}; use utils::id::TenantId; use crate::scheduler::test_utils::make_test_nodes; @@ -1378,7 +1390,11 @@ pub(crate) mod tests { ) } - fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec { + fn make_test_tenant( + policy: PlacementPolicy, + shard_count: ShardCount, + preferred_az: Option, + ) -> Vec { let tenant_id = TenantId::generate(); (0..shard_count.count()) @@ -1390,7 +1406,7 @@ pub(crate) mod tests { shard_number, shard_count, }; - TenantShard::new( + let mut ts = TenantShard::new( tenant_shard_id, ShardIdentity::new( shard_number, @@ -1399,7 +1415,13 @@ pub(crate) mod tests { ) .unwrap(), policy.clone(), - ) + ); + + if let Some(az) = &preferred_az { + ts.set_preferred_az(az.clone()); + } + + ts }) .collect() } @@ -1410,7 +1432,7 @@ pub(crate) mod tests { fn tenant_ha_scheduling() -> anyhow::Result<()> { // Start with three nodes. Our tenant will only use two. The third one is // expected to remain unused. - let mut nodes = make_test_nodes(3); + let mut nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut context = ScheduleContext::default(); @@ -1462,7 +1484,7 @@ pub(crate) mod tests { #[test] fn intent_from_observed() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1512,7 +1534,7 @@ pub(crate) mod tests { #[test] fn scheduling_mode() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1537,7 +1559,7 @@ pub(crate) mod tests { #[test] fn optimize_attachment() -> anyhow::Result<()> { - let nodes = make_test_nodes(3); + let nodes = make_test_nodes(3, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1604,7 +1626,7 @@ pub(crate) mod tests { #[test] fn optimize_secondary() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); let mut scheduler = Scheduler::new(nodes.values()); let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1)); @@ -1703,14 +1725,14 @@ pub(crate) mod tests { /// that it converges. #[test] fn optimize_add_nodes() -> anyhow::Result<()> { - let nodes = make_test_nodes(4); + let nodes = make_test_nodes(4, &[]); // Only show the scheduler a couple of nodes let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let mut schedule_context = ScheduleContext::default(); for shard in &mut shards { assert!(shard @@ -1759,16 +1781,16 @@ pub(crate) mod tests { fn initial_scheduling_is_optimal() -> anyhow::Result<()> { use itertools::Itertools; - let nodes = make_test_nodes(2); + let nodes = make_test_nodes(2, &[]); let mut scheduler = Scheduler::new([].iter()); scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap()); - let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let a_context = Rc::new(RefCell::new(ScheduleContext::default())); - let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4)); + let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None); let b_context = Rc::new(RefCell::new(ScheduleContext::default())); let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone())); @@ -1793,4 +1815,147 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn random_az_shard_scheduling() -> anyhow::Result<()> { + use rand::seq::SliceRandom; + + for seed in 0..50 { + eprintln!("Running test with seed {seed}"); + let mut rng = StdRng::seed_from_u64(seed); + + let az_a_tag = AvailabilityZone("az-a".to_string()); + let az_b_tag = AvailabilityZone("az-b".to_string()); + let azs = [az_a_tag, az_b_tag]; + let nodes = make_test_nodes(4, &azs); + let mut shards_per_az: HashMap = HashMap::new(); + + let mut scheduler = Scheduler::new([].iter()); + for node in nodes.values() { + scheduler.node_upsert(node); + } + + let mut shards = Vec::default(); + let mut contexts = Vec::default(); + let mut az_picker = azs.iter().cycle().cloned(); + for i in 0..100 { + let az = az_picker.next().unwrap(); + let shard_count = i % 4 + 1; + *shards_per_az.entry(az.clone()).or_default() += shard_count; + + let tenant_shards = make_test_tenant( + PlacementPolicy::Attached(1), + ShardCount::new(shard_count.try_into().unwrap()), + Some(az), + ); + let context = Rc::new(RefCell::new(ScheduleContext::default())); + + contexts.push(context.clone()); + let with_ctx = tenant_shards + .into_iter() + .map(|shard| (shard, context.clone())); + for shard_with_ctx in with_ctx { + shards.push(shard_with_ctx); + } + } + + shards.shuffle(&mut rng); + + #[derive(Default, Debug)] + struct NodeStats { + attachments: u32, + secondaries: u32, + } + + let mut node_stats: HashMap = HashMap::default(); + let mut attachments_in_wrong_az = 0; + let mut secondaries_in_wrong_az = 0; + + for (shard, context) in &mut shards { + let context = &mut *context.borrow_mut(); + shard.schedule(&mut scheduler, context).unwrap(); + + let attached_node = shard.intent.get_attached().unwrap(); + let stats = node_stats.entry(attached_node).or_default(); + stats.attachments += 1; + + let secondary_node = *shard.intent.get_secondary().first().unwrap(); + let stats = node_stats.entry(secondary_node).or_default(); + stats.secondaries += 1; + + let attached_node_az = nodes + .get(&attached_node) + .unwrap() + .get_availability_zone_id(); + let secondary_node_az = nodes + .get(&secondary_node) + .unwrap() + .get_availability_zone_id(); + let preferred_az = shard.preferred_az().unwrap(); + + if attached_node_az != preferred_az { + eprintln!( + "{} attachment was scheduled in AZ {} but preferred AZ {}", + shard.tenant_shard_id, attached_node_az, preferred_az + ); + attachments_in_wrong_az += 1; + } + + if secondary_node_az == preferred_az { + eprintln!( + "{} secondary was scheduled in AZ {} which matches preference", + shard.tenant_shard_id, attached_node_az + ); + secondaries_in_wrong_az += 1; + } + } + + let mut violations = Vec::default(); + + if attachments_in_wrong_az > 0 { + violations.push(format!( + "{} attachments scheduled to the incorrect AZ", + attachments_in_wrong_az + )); + } + + if secondaries_in_wrong_az > 0 { + violations.push(format!( + "{} secondaries scheduled to the incorrect AZ", + secondaries_in_wrong_az + )); + } + + eprintln!( + "attachments_in_wrong_az={} secondaries_in_wrong_az={}", + attachments_in_wrong_az, secondaries_in_wrong_az + ); + + for (node_id, stats) in &node_stats { + let node_az = nodes.get(node_id).unwrap().get_availability_zone_id(); + let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2; + let allowed_attachment_load = + (ideal_attachment_load - 1)..(ideal_attachment_load + 2); + + if !allowed_attachment_load.contains(&stats.attachments) { + violations.push(format!( + "Found {} attachments on node {}, but expected {}", + stats.attachments, node_id, ideal_attachment_load + )); + } + + eprintln!( + "{}: attachments={} secondaries={} ideal_attachment_load={}", + node_id, stats.attachments, stats.secondaries, ideal_attachment_load + ); + } + + assert!(violations.is_empty(), "{violations:?}"); + + for (mut shard, _ctx) in shards { + shard.intent.clear(&mut scheduler); + } + } + Ok(()) + } }