review: lift fill plan to a separate function

Author: Vlad Lazar
Date:   2024-06-17 12:26:08 +01:00
Parent: 2a7f224306
Commit: e200a2b01e


@@ -5209,11 +5209,15 @@ impl Service {
                 }
             }
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }
         while !waiters.is_empty() {
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }
         // At this point we have done the best we could to drain shards from this node.
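
Both the drain loop above and the fill loop later in this diff rely on the same polling pattern: await a batch of reconcile waiters for at most `SHORT_RECONCILE_TIMEOUT`, keep whatever did not complete, and repeat until the batch is empty. The sketch below is a minimal model of that pattern, not the controller's actual helper; the `Waiter` type and the body of `await_waiters_remainder` are assumptions, since only the call sites appear in this diff.

```rust
use std::time::Duration;
use tokio::time::{timeout, Instant};

/// Hypothetical stand-in for a reconcile waiter; the real type lives elsewhere in the service.
struct Waiter;

impl Waiter {
    /// Resolves once the reconciliation tracked by this waiter has finished.
    async fn wait(&mut self) {
        // Placeholder body for the sketch.
    }
}

/// Await each waiter until a shared deadline expires and return the ones that did not
/// complete in time (the "remainder"), which is how the loops above shrink `waiters`
/// on every iteration.
async fn await_waiters_remainder(waiters: Vec<Waiter>, limit: Duration) -> Vec<Waiter> {
    let deadline = Instant::now() + limit;
    let mut remainder = Vec::new();

    for mut waiter in waiters {
        let left = deadline.saturating_duration_since(Instant::now());
        if timeout(left, waiter.wait()).await.is_err() {
            // Timed out: carry this waiter over to the next polling round.
            remainder.push(waiter);
        }
    }

    remainder
}
```

Under this model the trailing `while !waiters.is_empty()` loop simply keeps polling until every reconciliation has finished, so slow reconciles delay the operation but are never dropped.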
@@ -5235,6 +5239,70 @@ impl Service {
         Ok(())
     }
+    /// Create a node fill plan (pick secondaries to promote) that meets the following requirements:
+    /// 1. The node should be filled until it reaches the expected cluster average of
+    /// attached shards. If there are not enough secondaries on the node, the plan stops early.
+    /// 2. Select tenant shards to promote such that the number of attached shards is balanced
+    /// throughout the cluster. We achieve this by picking tenant shards from each node,
+    /// starting from the ones with the largest number of attached shards, until the node
+    /// reaches the expected cluster average.
+    fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
+        let mut locked = self.inner.write().unwrap();
+        let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
+        let mut tids_by_node = locked
+            .tenants
+            .iter_mut()
+            .filter_map(|(tid, tenant_shard)| {
+                if tenant_shard.intent.get_secondary().contains(&node_id) {
+                    if let Some(primary) = tenant_shard.intent.get_attached() {
+                        return Some((*primary, *tid));
+                    }
+                }
+                None
+            })
+            .into_group_map();
+        let expected_attached = locked.scheduler.expected_attached_shard_count();
+        let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
+        let mut plan = Vec::new();
+        for (node_id, attached) in nodes_by_load {
+            if plan.len() >= fill_requirement
+                || tids_by_node.is_empty()
+                || attached <= expected_attached
+            {
+                break;
+            }
+            let can_take = attached - expected_attached;
+            let mut remove_node = false;
+            for _ in 0..can_take {
+                match tids_by_node.get_mut(&node_id) {
+                    Some(tids) => match tids.pop() {
+                        Some(tid) => {
+                            plan.push(tid);
+                        }
+                        None => {
+                            remove_node = true;
+                            break;
+                        }
+                    },
+                    None => {
+                        break;
+                    }
+                }
+            }
+            if remove_node {
+                tids_by_node.remove(&node_id);
+            }
+        }
+        plan
+    }
     pub(crate) async fn fill_node(
         &self,
         node_id: NodeId,
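
The new helper is a grouping step followed by a greedy selection: bucket every candidate shard (one that already has a secondary on the node being filled) by the node currently holding its attachment, then walk donor nodes from most to least loaded, taking shards until either the fill requirement is met or a donor would drop below the expected cluster average. The sketch below is a self-contained model of that idea with toy types; plain integers stand in for `NodeId`/`TenantShardId`, and the scheduler queries are replaced by explicit arguments, so it illustrates the technique rather than the controller's real code.

```rust
use std::collections::HashMap;

use itertools::Itertools;

// Toy stand-ins for the controller types, just for this sketch.
type NodeId = u64;
type ShardId = u32;

/// Greedy fill plan: promote at most `fill_requirement` secondaries, always taking from the
/// currently most-loaded donor nodes and never pulling a donor below the cluster average.
fn fill_plan(
    // (node currently holding the attachment, shard with a secondary on the node being filled)
    candidates: Vec<(NodeId, ShardId)>,
    // donor nodes sorted by attached shard count, most loaded first
    nodes_by_load: Vec<(NodeId, usize)>,
    expected_attached: usize,
    fill_requirement: usize,
) -> Vec<ShardId> {
    // Group candidate shards by the node their attachment currently lives on.
    let mut by_node: HashMap<NodeId, Vec<ShardId>> = candidates.into_iter().into_group_map();

    let mut plan = Vec::new();
    for (node, attached) in nodes_by_load {
        if plan.len() >= fill_requirement || by_node.is_empty() || attached <= expected_attached {
            break;
        }

        // Take at most enough shards to bring this donor down to the average.
        let can_take = attached - expected_attached;
        let mut exhausted = false;
        if let Some(tids) = by_node.get_mut(&node) {
            for _ in 0..can_take {
                match tids.pop() {
                    Some(tid) => plan.push(tid),
                    None => break,
                }
            }
            exhausted = tids.is_empty();
        }
        // Drop donors with no candidates left so `by_node.is_empty()` can end the loop early.
        if exhausted {
            by_node.remove(&node);
        }
    }
    plan
}

fn main() {
    // Node 1 holds 6 attachments and node 2 holds 4; the cluster average is 3.
    let candidates = vec![(1, 10), (1, 11), (1, 12), (2, 20), (2, 21)];
    let plan = fill_plan(candidates, vec![(1, 6), (2, 4)], 3, 4);
    // Three shards come from node 1 and one from node 2: [12, 11, 10, 21].
    println!("{plan:?}");
}
```

Running out of candidates on a donor removes it from the map, and running out of candidates everywhere ends the loop, which is how the plan "stops early" when the node being filled does not have enough secondaries.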
@@ -5246,69 +5314,7 @@ impl Service {
         tracing::info!(%node_id, "Starting fill background operation");
-        // Create a fill plan (pick secondaries to promote) that meets the following requirements:
-        // 1. The node should be filled until it reaches the expected cluster average of
-        // attached shards. If there are not enough secondaries on the node, the plan stops early.
-        // 2. Select tenant shards to promote such that the number of attached shards is balanced
-        // throughout the cluster. We achieve this by picking tenant shards from each node,
-        // starting from the ones with the largest number of attached shards, until the node
-        // reaches the expected cluster average.
-        let mut tids_to_promote = {
-            let mut locked = self.inner.write().unwrap();
-            let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
-            let mut tids_by_node = locked
-                .tenants
-                .iter_mut()
-                .filter_map(|(tid, tenant_shard)| {
-                    if tenant_shard.intent.get_secondary().contains(&node_id) {
-                        if let Some(primary) = tenant_shard.intent.get_attached() {
-                            return Some((*primary, *tid));
-                        }
-                    }
-                    None
-                })
-                .into_group_map();
-            let expected_attached = locked.scheduler.expected_attached_shard_count();
-            let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();
-            let mut plan = Vec::new();
-            for (node_id, attached) in nodes_by_load {
-                if plan.len() >= fill_requirement
-                    || tids_by_node.is_empty()
-                    || attached <= expected_attached
-                {
-                    break;
-                }
-                let can_take = attached - expected_attached;
-                let mut remove_node = false;
-                for _ in 0..can_take {
-                    match tids_by_node.get_mut(&node_id) {
-                        Some(tids) => match tids.pop() {
-                            Some(tid) => {
-                                plan.push(tid);
-                            }
-                            None => {
-                                remove_node = true;
-                                break;
-                            }
-                        },
-                        None => {
-                            break;
-                        }
-                    }
-                }
-                if remove_node {
-                    tids_by_node.remove(&node_id);
-                }
-            }
-            plan
-        };
+        let mut tids_to_promote = self.fill_node_plan(node_id);
         let mut waiters = Vec::new();
         let mut schedule_context = ScheduleContext::default();
@@ -5366,11 +5372,15 @@ impl Service {
                 }
             }
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }
         while !waiters.is_empty() {
-            waiters = self.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT).await;
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
         }
         if let Err(err) = self