diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 7e4030b221..4639aa5e78 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -438,6 +438,24 @@ async fn handle_tenants_dump(req: Request
) -> Result, ApiEr
state.service.tenants_dump()
}
+async fn handle_balance_all(
+ service: Arc,
+ req: Request,
+) -> Result, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+ service.balance_all()?;
+ json_response(StatusCode::OK, ())
+}
+
+async fn handle_balance_attached(
+ service: Arc,
+ req: Request,
+) -> Result, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+ service.balance_attached()?;
+ json_response(StatusCode::OK, ())
+}
+
async fn handle_scheduler_dump(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -572,6 +590,12 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
+ .post("/control/v1/balance/all", |r| {
+ tenant_service_handler(r, handle_balance_all)
+ })
+ .post("/control/v1/balance/attached", |r| {
+ tenant_service_handler(r, handle_balance_attached)
+ })
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 024e4058aa..259551d144 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -2931,6 +2931,154 @@ impl Service {
Ok(())
}
+ /// Switch shards between primary and secondary locations if it improves balance
+ /// of attachments across nodes. This is useful after e.g. a node goes offline
+ /// and returns, since its attachments will have moved away.
+ pub(crate) fn balance_attached(&self) -> Result<(), ApiError> {
+ let mut attachment_counts: HashMap = HashMap::new();
+ {
+ let locked = self.inner.read().unwrap();
+ for shard in locked.tenants.values() {
+ if let Some(node_id) = shard.intent.get_attached() {
+ let entry = attachment_counts.entry(*node_id).or_insert(0);
+ *entry += 1;
+ }
+ }
+
+ // Exclude un-schedulable nodes from our set of those we consider overloaded: we
+ // will aim to balance the expect average shards/node among those elegible for scheduling.
+ for (node_id, node) in locked.nodes.iter() {
+ if !node.may_schedule() {
+ attachment_counts.remove(node_id);
+ }
+ }
+ }
+
+ let avg: usize = attachment_counts.values().sum::() / attachment_counts.len();
+
+ // For any shards that are attached to a node which is more over-loaded than
+ // their secondary location, switch to the secondary location
+ {
+ let mut locked = self.inner.write().unwrap();
+ let result_tx = locked.result_tx.clone();
+ let compute_hook = locked.compute_hook.clone();
+ let (nodes, tenants, scheduler) = locked.parts_mut();
+
+ for shard in tenants.values_mut() {
+ if let (Some(primary), Some(secondary)) = (
+ *shard.intent.get_attached(),
+ shard.intent.get_secondary().iter().next().copied(),
+ ) {
+ if let (Some(primary_count), Some(secondary_count)) = (
+ attachment_counts.get(&primary),
+ attachment_counts.get(&secondary),
+ ) {
+ if *primary_count > avg + 1 && primary_count > secondary_count {
+ shard.intent.promote_attached(scheduler, secondary);
+ shard.maybe_reconcile(
+ result_tx.clone(),
+ nodes,
+ &compute_hook,
+ &self.config,
+ &self.persistence,
+ &self.gate,
+ &self.cancel,
+ );
+
+ *attachment_counts.get_mut(&primary).unwrap() -= 1;
+ *attachment_counts.get_mut(&secondary).unwrap() -= 1;
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(crate) fn balance_all(&self) -> Result<(), ApiError> {
+ // Our to-do list is a map of nodes to how many excess locations they have. We will
+ // improve balance by de-scheduling secondary locations from nodes that have too many
+ // locations. When we latter call schedule() on these shards, they will implicitly
+ // be assigned to locations that are under-loaded.
+ let mut overloads: HashMap = HashMap::new();
+ {
+ let locked = self.inner.read().unwrap();
+ for shard in locked.tenants.values() {
+ for node_id in shard.intent.all_pageservers() {
+ let entry = overloads.entry(node_id).or_insert(0);
+ *entry += 1;
+ }
+ }
+
+ // Exclude un-schedulable nodes from our set of those we consider overloaded: we
+ // will aim to balance the expect average shards/node among those elegible for scheduling.
+ for (node_id, node) in locked.nodes.iter() {
+ if !node.may_schedule() {
+ overloads.remove(node_id);
+ }
+ }
+ }
+
+ let avg: usize = overloads.values().sum::() / overloads.len();
+ for count in overloads.values_mut() {
+ *count = count.saturating_sub(avg - 1)
+ }
+
+ // De-schedule from overloaded nodes
+ {
+ let mut locked = self.inner.write().unwrap();
+ let (_nodes, tenants, scheduler) = locked.parts_mut();
+ for shard in tenants.values_mut() {
+ // We will only consider one secondary location per shard, or the attached location
+ // if it has no secondaries.
+ if let Some(candidate) = shard.intent.get_secondary().iter().next() {
+ if let Some(node_count) = overloads.get_mut(candidate) {
+ if *node_count > 0 {
+ *node_count -= 1;
+ shard.intent.remove_secondary(scheduler, *candidate);
+ }
+ }
+ } else if let Some(candidate) = shard.intent.get_attached() {
+ if let Some(node_count) = overloads.get_mut(candidate) {
+ if *node_count > 0 {
+ *node_count -= 1;
+ shard.intent.set_attached(scheduler, None);
+ }
+ }
+ }
+ }
+ }
+
+ // Call schedule on all shards to allocate new locations on under-loaded nodes
+ {
+ let mut locked = self.inner.write().unwrap();
+ let result_tx = locked.result_tx.clone();
+ let compute_hook = locked.compute_hook.clone();
+ let (nodes, tenants, scheduler) = locked.parts_mut();
+ for (shard_id, shard) in tenants {
+ if let Err(e) = shard.schedule(scheduler) {
+ tracing::warn!("Could not schedule {shard_id}: {e}");
+ // If we couldn't schedule one shard, we probably won't be able to schedule
+ // the rest: log warning once and drop out.
+ break;
+ }
+
+ shard.maybe_reconcile(
+ result_tx.clone(),
+ nodes,
+ &compute_hook,
+ &self.config,
+ &self.persistence,
+ &self.gate,
+ &self.cancel,
+ );
+ }
+ }
+
+ Ok(())
+ }
+
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.