diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 0ff7c10de9..b29c985a9e 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -5209,11 +5209,15 @@ impl Service {
                 }
             }

-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }

         while !waiters.is_empty() {
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }

         // At this point we have done the best we could to drain shards from this node.
@@ -5235,6 +5239,70 @@ impl Service {
         Ok(())
     }

+    /// Create a node fill plan (pick secondaries to promote) that meets the following requirements:
+    /// 1. The node should be filled until it reaches the expected cluster average of
+    ///    attached shards. If there are not enough secondaries on the node, the plan stops early.
+    /// 2. Select tenant shards to promote such that the number of attached shards is balanced
+    ///    throughout the cluster. We achieve this by picking tenant shards from each node,
+    ///    starting from the ones with the largest number of attached shards, until the node
+    ///    reaches the expected cluster average.
+    fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
+        let mut locked = self.inner.write().unwrap();
+        let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
+
+        let mut tids_by_node = locked
+            .tenants
+            .iter_mut()
+            .filter_map(|(tid, tenant_shard)| {
+                if tenant_shard.intent.get_secondary().contains(&node_id) {
+                    if let Some(primary) = tenant_shard.intent.get_attached() {
+                        return Some((*primary, *tid));
+                    }
+                }
+
+                None
+            })
+            .into_group_map();
+
+        let expected_attached = locked.scheduler.expected_attached_shard_count();
+        let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
+
+        let mut plan = Vec::new();
+        for (node_id, attached) in nodes_by_load {
+            if plan.len() >= fill_requirement
+                || tids_by_node.is_empty()
+                || attached <= expected_attached
+            {
+                break;
+            }
+
+            let can_take = attached - expected_attached;
+            let mut remove_node = false;
+            for _ in 0..can_take {
+                match tids_by_node.get_mut(&node_id) {
+                    Some(tids) => match tids.pop() {
+                        Some(tid) => {
+                            plan.push(tid);
+                        }
+                        None => {
+                            remove_node = true;
+                            break;
+                        }
+                    },
+                    None => {
+                        break;
+                    }
+                }
+            }
+
+            if remove_node {
+                tids_by_node.remove(&node_id);
+            }
+        }
+
+        plan
+    }
+
     pub(crate) async fn fill_node(
         &self,
         node_id: NodeId,
@@ -5246,69 +5314,7 @@ impl Service {

         tracing::info!(%node_id, "Starting fill background operation");

-        // Create a fill plan (pick secondaries to promote) that meets the following requirements:
-        // 1. The node should be filled until it reaches the expected cluster average of
-        //    attached shards. If there are not enough secondaries on the node, the plan stops early.
-        // 2. Select tenant shards to promote such that the number of attached shards is balanced
-        //    throughout the cluster. We achieve this by picking tenant shards from each node,
-        //    starting from the ones with the largest number of attached shards, until the node
-        //    reaches the expected cluster average.
-        let mut tids_to_promote = {
-            let mut locked = self.inner.write().unwrap();
-            let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
-
-            let mut tids_by_node = locked
-                .tenants
-                .iter_mut()
-                .filter_map(|(tid, tenant_shard)| {
-                    if tenant_shard.intent.get_secondary().contains(&node_id) {
-                        if let Some(primary) = tenant_shard.intent.get_attached() {
-                            return Some((*primary, *tid));
-                        }
-                    }
-
-                    None
-                })
-                .into_group_map();
-
-            let expected_attached = locked.scheduler.expected_attached_shard_count();
-            let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
-
-            let mut plan = Vec::new();
-            for (node_id, attached) in nodes_by_load {
-                if plan.len() >= fill_requirement
-                    || tids_by_node.is_empty()
-                    || attached <= expected_attached
-                {
-                    break;
-                }
-
-                let can_take = attached - expected_attached;
-                let mut remove_node = false;
-                for _ in 0..can_take {
-                    match tids_by_node.get_mut(&node_id) {
-                        Some(tids) => match tids.pop() {
-                            Some(tid) => {
-                                plan.push(tid);
-                            }
-                            None => {
-                                remove_node = true;
-                                break;
-                            }
-                        },
-                        None => {
-                            break;
-                        }
-                    }
-                }
-
-                if remove_node {
-                    tids_by_node.remove(&node_id);
-                }
-            }
-
-            plan
-        };
+        let mut tids_to_promote = self.fill_node_plan(node_id);

         let mut waiters = Vec::new();
         let mut schedule_context = ScheduleContext::default();
@@ -5366,11 +5372,15 @@ impl Service {
             }
         }

-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }

         while !waiters.is_empty() {
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }

         if let Err(err) = self
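For readers skimming the patch, the sketch below is a minimal, standalone model of the greedy selection that the new fill_node_plan helper performs: visit source nodes from most to least attached shards, and take promotion candidates from each only down to the cluster-wide average. It uses plain integers in place of NodeId/TenantShardId and takes the scheduler-derived inputs (fill requirement, expected attached count, per-node load) as parameters; names such as plan_fill and candidates_by_node are illustrative stand-ins, not part of the storage controller API. The real helper builds its candidate map with itertools' into_group_map over the in-memory tenant map and holds the service lock while planning; both are elided here.

use std::collections::HashMap;

// Illustrative sketch only: mirrors the shape of fill_node_plan's loop, not the real API.
fn plan_fill(
    fill_requirement: usize,
    expected_attached: usize,
    // Nodes sorted by attached shard count, most loaded first.
    nodes_by_load: &[(u64, usize)],
    // For each source node: shard ids that have a secondary on the fill target.
    mut candidates_by_node: HashMap<u64, Vec<u64>>,
) -> Vec<u64> {
    let mut plan = Vec::new();
    for &(node_id, attached) in nodes_by_load {
        if plan.len() >= fill_requirement
            || candidates_by_node.is_empty()
            || attached <= expected_attached
        {
            break;
        }

        // Take shards from this node only down to the cluster-wide average.
        let can_take = attached - expected_attached;
        let mut exhausted = false;
        if let Some(tids) = candidates_by_node.get_mut(&node_id) {
            for _ in 0..can_take {
                match tids.pop() {
                    Some(tid) => plan.push(tid),
                    None => {
                        exhausted = true;
                        break;
                    }
                }
            }
        }
        if exhausted {
            // This node has no more shards with a secondary on the fill target.
            candidates_by_node.remove(&node_id);
        }
    }
    plan
}

fn main() {
    // Hypothetical cluster: nodes with 6, 4 and 2 attached shards; the node
    // being filled needs 3 promotions to reach the expected average of 3.
    let nodes_by_load = [(1u64, 6usize), (2, 4), (3, 2)];
    // Candidate shards grouped by the node their attached location lives on.
    let candidates: HashMap<u64, Vec<u64>> =
        HashMap::from([(1, vec![10, 11, 12]), (2, vec![20, 21])]);
    let plan = plan_fill(3, 3, &nodes_by_load, candidates);
    println!("shards to promote: {plan:?}");
}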