From 27815678e76f8fbce52c3b01852d442e208b4bd1 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 4 Mar 2024 15:01:22 +0000 Subject: [PATCH] control_plane: basic balancer API --- control_plane/attachment_service/src/http.rs | 24 +++ .../attachment_service/src/service.rs | 148 ++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index 7e4030b221..4639aa5e78 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -438,6 +438,24 @@ async fn handle_tenants_dump(req: Request) -> Result, ApiEr state.service.tenants_dump() } +async fn handle_balance_all( + service: Arc, + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + service.balance_all()?; + json_response(StatusCode::OK, ()) +} + +async fn handle_balance_attached( + service: Arc, + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + service.balance_attached()?; + json_response(StatusCode::OK, ()) +} + async fn handle_scheduler_dump(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -572,6 +590,12 @@ pub fn make_router( .put("/control/v1/tenant/:tenant_id/shard_split", |r| { tenant_service_handler(r, handle_tenant_shard_split) }) + .post("/control/v1/balance/all", |r| { + tenant_service_handler(r, handle_balance_all) + }) + .post("/control/v1/balance/attached", |r| { + tenant_service_handler(r, handle_balance_attached) + }) // Tenant operations // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity. diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 024e4058aa..259551d144 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -2931,6 +2931,154 @@ impl Service { Ok(()) } + /// Switch shards between primary and secondary locations if it improves balance + /// of attachments across nodes. This is useful after e.g. a node goes offline + /// and returns, since its attachments will have moved away. + pub(crate) fn balance_attached(&self) -> Result<(), ApiError> { + let mut attachment_counts: HashMap = HashMap::new(); + { + let locked = self.inner.read().unwrap(); + for shard in locked.tenants.values() { + if let Some(node_id) = shard.intent.get_attached() { + let entry = attachment_counts.entry(*node_id).or_insert(0); + *entry += 1; + } + } + + // Exclude un-schedulable nodes from our set of those we consider overloaded: we + // will aim to balance the expect average shards/node among those elegible for scheduling. + for (node_id, node) in locked.nodes.iter() { + if !node.may_schedule() { + attachment_counts.remove(node_id); + } + } + } + + let avg: usize = attachment_counts.values().sum::() / attachment_counts.len(); + + // For any shards that are attached to a node which is more over-loaded than + // their secondary location, switch to the secondary location + { + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + for shard in tenants.values_mut() { + if let (Some(primary), Some(secondary)) = ( + *shard.intent.get_attached(), + shard.intent.get_secondary().iter().next().copied(), + ) { + if let (Some(primary_count), Some(secondary_count)) = ( + attachment_counts.get(&primary), + attachment_counts.get(&secondary), + ) { + if *primary_count > avg + 1 && primary_count > secondary_count { + shard.intent.promote_attached(scheduler, secondary); + shard.maybe_reconcile( + result_tx.clone(), + nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ); + + *attachment_counts.get_mut(&primary).unwrap() -= 1; + *attachment_counts.get_mut(&secondary).unwrap() -= 1; + } + } + } + } + } + + Ok(()) + } + + pub(crate) fn balance_all(&self) -> Result<(), ApiError> { + // Our to-do list is a map of nodes to how many excess locations they have. We will + // improve balance by de-scheduling secondary locations from nodes that have too many + // locations. When we latter call schedule() on these shards, they will implicitly + // be assigned to locations that are under-loaded. + let mut overloads: HashMap = HashMap::new(); + { + let locked = self.inner.read().unwrap(); + for shard in locked.tenants.values() { + for node_id in shard.intent.all_pageservers() { + let entry = overloads.entry(node_id).or_insert(0); + *entry += 1; + } + } + + // Exclude un-schedulable nodes from our set of those we consider overloaded: we + // will aim to balance the expect average shards/node among those elegible for scheduling. + for (node_id, node) in locked.nodes.iter() { + if !node.may_schedule() { + overloads.remove(node_id); + } + } + } + + let avg: usize = overloads.values().sum::() / overloads.len(); + for count in overloads.values_mut() { + *count = count.saturating_sub(avg - 1) + } + + // De-schedule from overloaded nodes + { + let mut locked = self.inner.write().unwrap(); + let (_nodes, tenants, scheduler) = locked.parts_mut(); + for shard in tenants.values_mut() { + // We will only consider one secondary location per shard, or the attached location + // if it has no secondaries. + if let Some(candidate) = shard.intent.get_secondary().iter().next() { + if let Some(node_count) = overloads.get_mut(candidate) { + if *node_count > 0 { + *node_count -= 1; + shard.intent.remove_secondary(scheduler, *candidate); + } + } + } else if let Some(candidate) = shard.intent.get_attached() { + if let Some(node_count) = overloads.get_mut(candidate) { + if *node_count > 0 { + *node_count -= 1; + shard.intent.set_attached(scheduler, None); + } + } + } + } + } + + // Call schedule on all shards to allocate new locations on under-loaded nodes + { + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + for (shard_id, shard) in tenants { + if let Err(e) = shard.schedule(scheduler) { + tracing::warn!("Could not schedule {shard_id}: {e}"); + // If we couldn't schedule one shard, we probably won't be able to schedule + // the rest: log warning once and drop out. + break; + } + + shard.maybe_reconcile( + result_tx.clone(), + nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ); + } + } + + Ok(()) + } + /// Helper for methods that will try and call pageserver APIs for /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant /// is attached somewhere.