diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index a50707a1b8..a9a57d77ce 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId}; +use crate::models::PageserverUtilization; use crate::{ models::{ShardParameters, TenantConfig}, shard::{ShardStripeSize, TenantShardId}, @@ -140,23 +141,11 @@ pub struct TenantShardMigrateRequest { pub node_id: NodeId, } -/// Utilisation score indicating how good a candidate a pageserver -/// is for scheduling the next tenant. See [`crate::models::PageserverUtilization`]. -/// Lower values are better. -#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)] -pub struct UtilizationScore(pub u64); - -impl UtilizationScore { - pub fn worst() -> Self { - UtilizationScore(u64::MAX) - } -} - -#[derive(Serialize, Clone, Copy, Debug)] +#[derive(Serialize, Clone, Debug)] #[serde(into = "NodeAvailabilityWrapper")] pub enum NodeAvailability { // Normal, happy state - Active(UtilizationScore), + Active(PageserverUtilization), // Node is warming up, but we expect it to become available soon. Covers // the time span between the re-attach response being composed on the storage controller // and the first successful heartbeat after the processing of the re-attach response @@ -195,7 +184,9 @@ impl From for NodeAvailability { match val { // Assume the worst utilisation score to begin with. It will later be updated by // the heartbeats. - NodeAvailabilityWrapper::Active => NodeAvailability::Active(UtilizationScore::worst()), + NodeAvailabilityWrapper::Active => { + NodeAvailability::Active(PageserverUtilization::full()) + } NodeAvailabilityWrapper::WarmingUp => NodeAvailability::WarmingUp(Instant::now()), NodeAvailabilityWrapper::Offline => NodeAvailability::Offline, } diff --git a/libs/pageserver_api/src/models/utilization.rs b/libs/pageserver_api/src/models/utilization.rs index 0fec221276..844a0cda5d 100644 --- a/libs/pageserver_api/src/models/utilization.rs +++ b/libs/pageserver_api/src/models/utilization.rs @@ -38,7 +38,7 @@ pub struct PageserverUtilization { pub max_shard_count: u32, /// Cached result of [`Self::score`] - pub utilization_score: u64, + pub utilization_score: Option, /// When was this snapshot captured, pageserver local time. /// @@ -50,6 +50,8 @@ fn unity_percent() -> Percent { Percent::new(0).unwrap() } +pub type RawScore = u64; + impl PageserverUtilization { const UTILIZATION_FULL: u64 = 1000000; @@ -62,7 +64,7 @@ impl PageserverUtilization { /// - Negative values are forbidden /// - Values over UTILIZATION_FULL indicate an overloaded node, which may show degraded performance due to /// layer eviction. - pub fn score(&self) -> u64 { + pub fn score(&self) -> RawScore { let disk_usable_capacity = ((self.disk_usage_bytes + self.free_space_bytes) * self.disk_usable_pct.get() as u64) / 100; @@ -74,8 +76,30 @@ impl PageserverUtilization { std::cmp::max(disk_utilization_score, shard_utilization_score) } - pub fn refresh_score(&mut self) { - self.utilization_score = self.score(); + pub fn cached_score(&mut self) -> RawScore { + match self.utilization_score { + None => { + let s = self.score(); + self.utilization_score = Some(s); + s + } + Some(s) => s, + } + } + + /// If a node is currently hosting more work than it can comfortably handle. 
This does not indicate that + /// it will fail, but it is a strong signal that more work should not be added unless there is no alternative. + pub fn is_overloaded(score: RawScore) -> bool { + score >= Self::UTILIZATION_FULL + } + + pub fn adjust_shard_count_max(&mut self, shard_count: u32) { + if self.shard_count < shard_count { + self.shard_count = shard_count; + + // Dirty cache: this will be calculated next time someone retrives the score + self.utilization_score = None; + } } /// A utilization structure that has a full utilization score: use this as a placeholder when @@ -88,7 +112,38 @@ impl PageserverUtilization { disk_usable_pct: Percent::new(100).unwrap(), shard_count: 1, max_shard_count: 1, - utilization_score: Self::UTILIZATION_FULL, + utilization_score: Some(Self::UTILIZATION_FULL), + captured_at: serde_system_time::SystemTime(SystemTime::now()), + } + } +} + +/// Test helper +pub mod test_utilization { + use super::PageserverUtilization; + use std::time::SystemTime; + use utils::{ + serde_percent::Percent, + serde_system_time::{self}, + }; + + // Parameters of the imaginary node used for test utilization instances + const TEST_DISK_SIZE: u64 = 1024 * 1024 * 1024 * 1024; + const TEST_SHARDS_MAX: u32 = 1000; + + /// Unit test helper. Unconditionally compiled because cfg(test) doesn't carry across crates. Do + /// not abuse this function from non-test code. + /// + /// Emulates a node with a 1000 shard limit and a 1TB disk. + pub fn simple(shard_count: u32, disk_wanted_bytes: u64) -> PageserverUtilization { + PageserverUtilization { + disk_usage_bytes: disk_wanted_bytes, + free_space_bytes: TEST_DISK_SIZE - std::cmp::min(disk_wanted_bytes, TEST_DISK_SIZE), + disk_wanted_bytes, + disk_usable_pct: Percent::new(100).unwrap(), + shard_count, + max_shard_count: TEST_SHARDS_MAX, + utilization_score: None, captured_at: serde_system_time::SystemTime(SystemTime::now()), } } @@ -120,7 +175,7 @@ mod tests { disk_usage_bytes: u64::MAX, free_space_bytes: 0, disk_wanted_bytes: u64::MAX, - utilization_score: 13, + utilization_score: Some(13), disk_usable_pct: Percent::new(90).unwrap(), shard_count: 100, max_shard_count: 200, diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 0a1a22b6e8..1f8634df93 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1803,6 +1803,14 @@ pub(crate) static SECONDARY_RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::n .expect("failed to define a metric") }); +pub(crate) static NODE_UTILIZATION_SCORE: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_utilization_score", + "The utilization score we report to the storage controller for scheduling, where 0 is empty, 1000000 is full, and anything above is considered overloaded", + ) + .expect("failed to define a metric") +}); + pub(crate) static SECONDARY_HEATMAP_TOTAL_SIZE: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_secondary_heatmap_total_size", diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2e19a46ac8..3a7afff211 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3741,13 +3741,21 @@ impl Tenant { /// less than this (via eviction and on-demand downloads), but this function enables /// the Tenant to advertise how much storage it would prefer to have to provide fast I/O /// by keeping important things on local disk. + /// + /// This is a heuristic, not a guarantee: tenants that are long-idle will actually use less + /// than they report here, due to layer eviction. 
Tenants with many active branches may + /// actually use more than they report here. pub(crate) fn local_storage_wanted(&self) -> u64 { - let mut wanted = 0; let timelines = self.timelines.lock().unwrap(); - for timeline in timelines.values() { - wanted += timeline.metrics.visible_physical_size_gauge.get(); - } - wanted + + // Heuristic: we use the max() of the timelines' visible sizes, rather than the sum. This + // reflects the observation that on tenants with multiple large branches, typically only one + // of them is used actively enough to occupy space on disk. + timelines + .values() + .map(|t| t.metrics.visible_physical_size_gauge.get()) + .max() + .unwrap_or(0) } } diff --git a/pageserver/src/utilization.rs b/pageserver/src/utilization.rs index 3c48c84598..a0223f3bce 100644 --- a/pageserver/src/utilization.rs +++ b/pageserver/src/utilization.rs @@ -9,7 +9,7 @@ use utils::serde_percent::Percent; use pageserver_api::models::PageserverUtilization; -use crate::{config::PageServerConf, tenant::mgr::TenantManager}; +use crate::{config::PageServerConf, metrics::NODE_UTILIZATION_SCORE, tenant::mgr::TenantManager}; pub(crate) fn regenerate( conf: &PageServerConf, @@ -58,13 +58,13 @@ pub(crate) fn regenerate( disk_usable_pct, shard_count, max_shard_count: MAX_SHARDS, - utilization_score: 0, + utilization_score: None, captured_at: utils::serde_system_time::SystemTime(captured_at), }; - doc.refresh_score(); - - // TODO: make utilization_score into a metric + // Initialize `PageserverUtilization::utilization_score` + let score = doc.cached_score(); + NODE_UTILIZATION_SCORE.set(score); Ok(doc) } diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index c0e27bafdb..b7e66d33eb 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -6,10 +6,7 @@ use std::{ }; use tokio_util::sync::CancellationToken; -use pageserver_api::{ - controller_api::{NodeAvailability, UtilizationScore}, - models::PageserverUtilization, -}; +use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization}; use thiserror::Error; use utils::id::NodeId; @@ -147,7 +144,8 @@ impl HeartbeaterTask { // goes through to the pageserver even when the node is marked offline. // This doesn't impact the availability observed by [`crate::service::Service`]. let mut node_clone = node.clone(); - node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + node_clone + .set_availability(NodeAvailability::Active(PageserverUtilization::full())); async move { let response = node_clone @@ -179,7 +177,7 @@ impl HeartbeaterTask { node.get_availability() { PageserverState::WarmingUp { - started_at: last_seen_at, + started_at: *last_seen_at, } } else { PageserverState::Offline diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index ea765ca123..61a44daca9 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -92,15 +92,15 @@ impl Node { } } - pub(crate) fn get_availability(&self) -> NodeAvailability { - self.availability + pub(crate) fn get_availability(&self) -> &NodeAvailability { + &self.availability } pub(crate) fn set_availability(&mut self, availability: NodeAvailability) { use AvailabilityTransition::*; use NodeAvailability::WarmingUp; - match self.get_availability_transition(availability) { + match self.get_availability_transition(&availability) { ToActive => { // Give the node a new cancellation token, effectively resetting it to un-cancelled. 
Any
                // users of previously-cloned copies of the node will still see the old cancellation
@@ -115,8 +115,8 @@
             Unchanged | ToWarmingUpFromOffline => {}
         }
 
-        if let (WarmingUp(crnt), WarmingUp(proposed)) = (self.availability, availability) {
-            self.availability = WarmingUp(std::cmp::max(crnt, proposed));
+        if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
+            self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
         } else {
             self.availability = availability;
         }
@@ -126,12 +126,12 @@
     /// into a description of the transition.
     pub(crate) fn get_availability_transition(
         &self,
-        availability: NodeAvailability,
+        availability: &NodeAvailability,
     ) -> AvailabilityTransition {
         use AvailabilityTransition::*;
         use NodeAvailability::*;
 
-        match (self.availability, availability) {
+        match (&self.availability, availability) {
             (Offline, Active(_)) => ToActive,
             (Active(_), Offline) => ToOffline,
             (Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
@@ -153,15 +153,15 @@
     /// Is this node elegible to have work scheduled onto it?
     pub(crate) fn may_schedule(&self) -> MaySchedule {
-        let score = match self.availability {
-            NodeAvailability::Active(score) => score,
+        let utilization = match &self.availability {
+            NodeAvailability::Active(u) => u.clone(),
             NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
         };
 
         match self.scheduling {
-            NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
+            NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
             NodeSchedulingPolicy::Draining => MaySchedule::No,
-            NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
+            NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
             NodeSchedulingPolicy::Pause => MaySchedule::No,
             NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
         }
@@ -285,7 +285,7 @@
     pub(crate) fn describe(&self) -> NodeDescribeResponse {
         NodeDescribeResponse {
             id: self.id,
-            availability: self.availability.into(),
+            availability: self.availability.clone().into(),
             scheduling: self.scheduling,
             listen_http_addr: self.listen_http_addr.clone(),
             listen_http_port: self.listen_http_port,
diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs
index 843159010d..060e3cc6ca 100644
--- a/storage_controller/src/scheduler.rs
+++ b/storage_controller/src/scheduler.rs
@@ -1,6 +1,6 @@
 use crate::{node::Node, tenant_shard::TenantShard};
 use itertools::Itertools;
-use pageserver_api::controller_api::UtilizationScore;
+use pageserver_api::models::PageserverUtilization;
 use serde::Serialize;
 use std::collections::HashMap;
 use utils::{http::error::ApiError, id::NodeId};
@@ -20,9 +20,9 @@ impl From<ScheduleError> for ApiError {
     }
 }
 
-#[derive(Serialize, Eq, PartialEq)]
+#[derive(Serialize)]
 pub enum MaySchedule {
-    Yes(UtilizationScore),
+    Yes(PageserverUtilization),
     No,
 }
 
@@ -282,6 +282,28 @@
                 node.shard_count -= 1;
             }
         }
+
+        // Maybe update PageserverUtilization
+        match update {
+            RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
+                // Referencing the node: if this takes our shard_count above the utilization structure's
+                // shard count, then artificially bump it: this ensures that the scheduler immediately
+                // recognizes that this node has more work on it, without waiting for the next heartbeat
+                // to update the utilization.
+                if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
+                    utilization.adjust_shard_count_max(node.shard_count as u32);
+                }
+            }
+            RefCountUpdate::PromoteSecondary
+            | RefCountUpdate::Detach
+            | RefCountUpdate::RemoveSecondary
+            | RefCountUpdate::DemoteAttached => {
+                // De-referencing the node: leave the utilization's shard_count at a stale higher
+                // value until some future heartbeat after we have physically removed this shard
+                // from the node: this prevents the scheduler over-optimistically trying to schedule
+                // more work onto the node before earlier detaches are done.
+            }
+        }
     }
 
     // Check if the number of shards attached to a given node is lagging below
@@ -326,7 +348,18 @@
         use std::collections::hash_map::Entry::*;
         match self.nodes.entry(node.get_id()) {
             Occupied(mut entry) => {
-                entry.get_mut().may_schedule = node.may_schedule();
+                // Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
+                // to account for any shards scheduled on the controller but not yet visible to the pageserver.
+                let mut may_schedule = node.may_schedule();
+                match &mut may_schedule {
+                    MaySchedule::Yes(utilization) => {
+                        utilization.adjust_shard_count_max(entry.get().shard_count as u32);
+                    }
+                    MaySchedule::No => { // Nothing to tweak
+                    }
+                }
+
+                entry.get_mut().may_schedule = may_schedule;
             }
             Vacant(entry) => {
                 entry.insert(SchedulerNode {
@@ -363,7 +396,7 @@
                 let may_schedule = self
                     .nodes
                     .get(node_id)
-                    .map(|n| n.may_schedule != MaySchedule::No)
+                    .map(|n| !matches!(n.may_schedule, MaySchedule::No))
                     .unwrap_or(false);
                 (*node_id, may_schedule)
             })
@@ -383,7 +416,7 @@
     /// the same tenant on the same node. This is a soft constraint: the context will never
     /// cause us to fail to schedule a shard.
     pub(crate) fn schedule_shard(
-        &self,
+        &mut self,
         hard_exclude: &[NodeId],
         context: &ScheduleContext,
     ) -> Result<NodeId, ScheduleError> {
@@ -391,31 +424,41 @@
             return Err(ScheduleError::NoPageservers);
         }
 
-        let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
+        let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
             .nodes
-            .iter()
-            .filter_map(|(k, v)| {
-                if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
-                    None
-                } else {
-                    Some((
-                        *k,
-                        context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
-                        v.shard_count,
-                        v.attached_shard_count,
-                    ))
-                }
+            .iter_mut()
+            .filter_map(|(k, v)| match &mut v.may_schedule {
+                MaySchedule::No => None,
+                MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
+                MaySchedule::Yes(utilization) => Some((
+                    *k,
+                    context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
+                    utilization.cached_score(),
+                    v.attached_shard_count,
+                )),
             })
             .collect();
 
+        // Exclude nodes whose utilization is critically high, if there are alternatives available. This will
+        // cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
+        // we may place shards in the same tenant together on the same pageserver if all other pageservers are
+        // overloaded.
+        let non_overloaded_scores = scores
+            .iter()
+            .filter(|i| !PageserverUtilization::is_overloaded(i.2))
+            .copied()
+            .collect::<Vec<_>>();
+        if !non_overloaded_scores.is_empty() {
+            scores = non_overloaded_scores;
+        }
+
         // Sort by, in order of precedence:
         //  1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
-        //  2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
-        //      the least number of attached shards.
-        //  3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
-        //      with the lower total shard count.
+        //  2nd: Utilization score (this combines shard count and disk utilization)
+        //  3rd: Attached shard count. When nodes have identical utilization (e.g. when populating some
+        //       empty nodes), this acts as an anti-affinity between attached shards.
         //  4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
-        scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
+        scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));
 
         if scores.is_empty() {
             // After applying constraints, no pageservers were left.
@@ -429,7 +472,7 @@
             for (node_id, node) in &self.nodes {
                 tracing::info!(
                     "Node {node_id}: may_schedule={} shards={}",
-                    node.may_schedule != MaySchedule::No,
+                    !matches!(node.may_schedule, MaySchedule::No),
                     node.shard_count
                 );
             }
@@ -469,7 +512,7 @@
 pub(crate) mod test_utils {
     use crate::node::Node;
-    use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
+    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
     use std::collections::HashMap;
     use utils::id::NodeId;
 
     /// Test helper: synthesize the requested number of nodes, all in active state.
@@ -486,7 +529,7 @@
                 format!("pghost-{i}"),
                 5432 + i as u16,
             );
-            node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
+            node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
             assert!(node.is_available());
             node
         })
@@ -497,6 +540,8 @@
 #[cfg(test)]
 mod tests {
+    use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
+
     use super::*;
     use crate::tenant_shard::IntentState;
@@ -557,4 +602,130 @@
         Ok(())
     }
+
+    #[test]
+    /// Test the PageserverUtilization's contribution to the scheduling algorithm
+    fn scheduler_utilization() {
+        let mut nodes = test_utils::make_test_nodes(3);
+        let mut scheduler = Scheduler::new(nodes.values());
+
+        // Need to keep these alive because they contribute to shard counts via RAII
+        let mut scheduled_intents = Vec::new();
+
+        let empty_context = ScheduleContext::default();
+
+        fn assert_scheduler_chooses(
+            expect_node: NodeId,
+            scheduled_intents: &mut Vec<IntentState>,
+            scheduler: &mut Scheduler,
+            context: &ScheduleContext,
+        ) {
+            let scheduled = scheduler.schedule_shard(&[], context).unwrap();
+            let mut intent = IntentState::new();
+            intent.set_attached(scheduler, Some(scheduled));
+            scheduled_intents.push(intent);
+            assert_eq!(scheduled, expect_node);
+        }
+
+        // Independent schedule calls onto empty nodes should round-robin, because each node's
+        // utilization's shard count is updated inline. The order is deterministic because when all other factors are
+        // equal, we order by node ID.
+        assert_scheduler_chooses(
+            NodeId(1),
+            &mut scheduled_intents,
+            &mut scheduler,
+            &empty_context,
+        );
+        assert_scheduler_chooses(
+            NodeId(2),
+            &mut scheduled_intents,
+            &mut scheduler,
+            &empty_context,
+        );
+        assert_scheduler_chooses(
+            NodeId(3),
+            &mut scheduled_intents,
+            &mut scheduler,
+            &empty_context,
+        );
+
+        // Manually setting utilization higher should cause schedule calls to round-robin the other nodes
+        // which have equal utilization.
+ nodes + .get_mut(&NodeId(1)) + .unwrap() + .set_availability(NodeAvailability::Active(test_utilization::simple( + 10, + 1024 * 1024 * 1024, + ))); + scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); + + assert_scheduler_chooses( + NodeId(2), + &mut scheduled_intents, + &mut scheduler, + &empty_context, + ); + assert_scheduler_chooses( + NodeId(3), + &mut scheduled_intents, + &mut scheduler, + &empty_context, + ); + assert_scheduler_chooses( + NodeId(2), + &mut scheduled_intents, + &mut scheduler, + &empty_context, + ); + assert_scheduler_chooses( + NodeId(3), + &mut scheduled_intents, + &mut scheduler, + &empty_context, + ); + + // The scheduler should prefer nodes with lower affinity score, + // even if they have higher utilization (as long as they aren't utilized at >100%) + let mut context_prefer_node1 = ScheduleContext::default(); + context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]); + assert_scheduler_chooses( + NodeId(1), + &mut scheduled_intents, + &mut scheduler, + &context_prefer_node1, + ); + assert_scheduler_chooses( + NodeId(1), + &mut scheduled_intents, + &mut scheduler, + &context_prefer_node1, + ); + + // If a node is over-utilized, it will not be used even if affinity scores prefer it + nodes + .get_mut(&NodeId(1)) + .unwrap() + .set_availability(NodeAvailability::Active(test_utilization::simple( + 20000, + 1024 * 1024 * 1024, + ))); + scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap()); + assert_scheduler_chooses( + NodeId(2), + &mut scheduled_intents, + &mut scheduler, + &context_prefer_node1, + ); + assert_scheduler_chooses( + NodeId(3), + &mut scheduled_intents, + &mut scheduler, + &context_prefer_node1, + ); + + for mut intent in scheduled_intents { + intent.clear(&mut scheduler); + } + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 453e96bad3..4b0c556824 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -44,7 +44,7 @@ use pageserver_api::{ NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest, - TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore, + TenantShardMigrateRequest, TenantShardMigrateResponse, }, models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest}, }; @@ -542,7 +542,7 @@ impl Service { let locked = self.inner.read().unwrap(); locked.nodes.clone() }; - let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await; + let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await; // List of tenants for which we will attempt to notify compute of their location at startup let mut compute_notifications = Vec::new(); @@ -556,10 +556,8 @@ impl Service { // Mark nodes online if they responded to us: nodes are offline by default after a restart. let mut new_nodes = (**nodes).clone(); for (node_id, node) in new_nodes.iter_mut() { - if let Some(utilization) = nodes_online.get(node_id) { - node.set_availability(NodeAvailability::Active(UtilizationScore( - utilization.utilization_score, - ))); + if let Some(utilization) = nodes_online.remove(node_id) { + node.set_availability(NodeAvailability::Active(utilization)); scheduler.node_upsert(node); } } @@ -925,9 +923,9 @@ impl Service { if let Ok(deltas) = res { for (node_id, state) in deltas.0 { let new_availability = match state { - PageserverState::Available { utilization, .. 
} => NodeAvailability::Active( - UtilizationScore(utilization.utilization_score), - ), + PageserverState::Available { utilization, .. } => { + NodeAvailability::Active(utilization) + } PageserverState::WarmingUp { started_at } => { NodeAvailability::WarmingUp(started_at) } @@ -936,14 +934,17 @@ impl Service { // while the heartbeat round was on-going. Hence, filter out // offline transitions for WarmingUp nodes that are still within // their grace period. - if let Ok(NodeAvailability::WarmingUp(started_at)) = - self.get_node(node_id).await.map(|n| n.get_availability()) + if let Ok(NodeAvailability::WarmingUp(started_at)) = self + .get_node(node_id) + .await + .as_ref() + .map(|n| n.get_availability()) { let now = Instant::now(); - if now - started_at >= self.config.max_warming_up_interval { + if now - *started_at >= self.config.max_warming_up_interval { NodeAvailability::Offline } else { - NodeAvailability::WarmingUp(started_at) + NodeAvailability::WarmingUp(*started_at) } } else { NodeAvailability::Offline @@ -1625,7 +1626,7 @@ impl Service { // This Node is a mutable local copy: we will set it active so that we can use its // API client to reconcile with the node. The Node in [`Self::nodes`] will get updated // later. - node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + node.set_availability(NodeAvailability::Active(PageserverUtilization::full())); let configs = match node .with_client_retries( @@ -2473,7 +2474,7 @@ impl Service { .await; let node = { - let locked = self.inner.read().unwrap(); + let mut locked = self.inner.write().unwrap(); // Just a sanity check to prevent misuse: the API expects that the tenant is fully // detached everywhere, and nothing writes to S3 storage. Here, we verify that, // but only at the start of the process, so it's really just to prevent operator @@ -2500,7 +2501,7 @@ impl Service { return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}"))); } } - let scheduler = &locked.scheduler; + let scheduler = &mut locked.scheduler; // Right now we only perform the operation on a single node without parallelization // TODO fan out the operation to multiple nodes for better performance let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?; @@ -4761,7 +4762,7 @@ impl Service { // // The transition we calculate here remains valid later in the function because we hold the op lock on the node: // nothing else can mutate its availability while we run. 
- let availability_transition = if let Some(input_availability) = availability { + let availability_transition = if let Some(input_availability) = availability.as_ref() { let (activate_node, availability_transition) = { let locked = self.inner.read().unwrap(); let Some(node) = locked.nodes.get(&node_id) else { @@ -4797,8 +4798,8 @@ impl Service { )); }; - if let Some(availability) = &availability { - node.set_availability(*availability); + if let Some(availability) = availability.as_ref() { + node.set_availability(availability.clone()); } if let Some(scheduling) = scheduling { diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 1fcc3c8547..30723a3b36 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -779,7 +779,7 @@ impl TenantShard { #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))] pub(crate) fn optimize_secondary( &self, - scheduler: &Scheduler, + scheduler: &mut Scheduler, schedule_context: &ScheduleContext, ) -> Option { if self.intent.secondary.is_empty() { @@ -1595,7 +1595,7 @@ pub(crate) mod tests { schedule_context.avoid(&shard_b.intent.all_pageservers()); schedule_context.push_attached(shard_b.intent.get_attached().unwrap()); - let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context); + let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context); // Since there is a node with no locations available, the node with two locations for the // same tenant should generate an optimization to move one away diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 1011a6fd22..bfd82242e9 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -394,6 +394,7 @@ def test_sharding_split_smoke( # Note which pageservers initially hold a shard after tenant creation pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)] + log.info("Pre-split pageservers: {pre_split_pageserver_ids}") # For pageservers holding a shard, validate their ingest statistics # reflect a proper splitting of the WAL. @@ -555,9 +556,9 @@ def test_sharding_split_smoke( assert sum(total.values()) == split_shard_count * 2 check_effective_tenant_config() - # More specific check: that we are fully balanced. This is deterministic because - # the order in which we consider shards for optimization is deterministic, and the - # order of preference of nodes is also deterministic (lower node IDs win). + # More specific check: that we are fully balanced. It is deterministic that we will get exactly + # one shard on each pageserver, because for these small shards the utilization metric is + # dominated by shard count. log.info(f"total: {total}") assert total == { 1: 1, @@ -577,8 +578,14 @@ def test_sharding_split_smoke( 15: 1, 16: 1, } + + # The controller is not required to lay out the attached locations in any particular way, but + # all the pageservers that originally held an attached shard should still hold one, otherwise + # it would indicate that we had done some unnecessary migration. log.info(f"attached: {attached}") - assert attached == {1: 1, 2: 1, 3: 1, 5: 1, 6: 1, 7: 1, 9: 1, 11: 1} + for ps_id in pre_split_pageserver_ids: + log.info("Pre-split pageserver {ps_id} should still hold an attached location") + assert ps_id in attached # Ensure post-split pageserver locations survive a restart (i.e. 
the child shards # correctly wrote config to disk, and the storage controller responds correctly