storcon: add node fill algorithm

Vlad Lazar
2024-06-11 15:00:36 +01:00
parent fcbac527b0
commit 58340f9dbf
2 changed files with 192 additions and 0 deletions


@@ -1,4 +1,5 @@
use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::controller_api::UtilizationScore;
use serde::Serialize;
use std::collections::HashMap;
@@ -283,6 +284,44 @@ impl Scheduler {
        }
    }

    // Check whether the number of shards attached to a given node lags behind
    // the cluster average. If so, the node should be filled.
    pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
        let Some(node) = self.nodes.get(&node_id) else {
            debug_assert!(false);
            tracing::error!("Scheduler missing node {node_id}");
            return 0;
        };

        assert!(!self.nodes.is_empty());

        let expected_attached_shards_per_node = self.expected_attached_shard_count();

        for (node_id, node) in self.nodes.iter() {
            tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
        }

        if node.attached_shard_count < expected_attached_shards_per_node {
            expected_attached_shards_per_node - node.attached_shard_count
        } else {
            0
        }
    }
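
    // The expected number of attached shards per node across the cluster;
    // note that the integer division rounds the expectation down.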
    pub(crate) fn expected_attached_shard_count(&self) -> usize {
        let total_attached_shards: usize =
            self.nodes.values().map(|n| n.attached_shard_count).sum();

        assert!(!self.nodes.is_empty());
        total_attached_shards / self.nodes.len()
    }
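
    // Nodes sorted by their attached shard count, most loaded first.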
    pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
        self.nodes
            .iter()
            .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
            .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
            .collect()
    }

    pub(crate) fn node_upsert(&mut self, node: &Node) {
        use std::collections::hash_map::Entry::*;
        match self.nodes.entry(node.get_id()) {
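
To make the fill arithmetic concrete, here is a minimal standalone sketch; the node IDs and attached-shard counts below are hypothetical, not taken from this commit:

use std::collections::HashMap;

fn main() {
    // Hypothetical attached-shard counts keyed by node id.
    let attached: HashMap<u64, usize> = HashMap::from([(1, 6), (2, 4), (3, 2)]);

    // Mirrors expected_attached_shard_count: integer division rounds down.
    let total: usize = attached.values().sum(); // 12
    let expected = total / attached.len(); // 12 / 3 = 4

    // Mirrors compute_fill_requirement for node 3: it sits two attached
    // shards below the cluster average, so filling it should promote two
    // of its secondaries.
    let fill_requirement = expected.saturating_sub(attached[&3]);
    assert_eq!(fill_requirement, 2);
}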


@@ -5092,4 +5092,157 @@ impl Service {
        Ok(())
    }

    pub(crate) async fn fill_node(
        &self,
        node_id: NodeId,
        cancel: CancellationToken,
    ) -> Result<(), OperationError> {
        // TODO(vlad): Currently this operates on the assumption that all
        // secondaries are warm. This is not always true (e.g. we just migrated the
        // tenant). Take that into consideration by checking the secondary status.

        tracing::info!(%node_id, "Starting fill background operation");

        // Create a fill plan (pick secondaries to promote) that meets the following
        // requirements:
        // 1. The node should be filled until it reaches the expected cluster average of
        //    attached shards. If there are not enough secondaries on the node, the plan
        //    stops early.
        // 2. Select tenant shards to promote such that the number of attached shards is
        //    balanced throughout the cluster. We achieve this by picking tenant shards
        //    from each node, starting from the ones with the largest number of attached
        //    shards, until the node reaches the expected cluster average.
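        //
        // Hypothetical illustration (counts are made up, not from this commit):
        // with attached counts A=6, B=4, C=2, the expected average is 4, so
        // filling C has fill_requirement = 2. The plan takes up to
        // (6 - 4) = 2 shards from A and none from B, stopping once it holds
        // two moves.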
        let mut tids_to_promote = {
            let mut locked = self.inner.write().unwrap();
            let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);

            // Group the candidate shards by the node they are currently attached
            // to: a shard qualifies if it has a secondary on the node being filled.
            let mut tids_by_node = locked
                .tenants
                .iter_mut()
                .filter_map(|(tid, tenant_shard)| {
                    if tenant_shard.intent.get_secondary().contains(&node_id) {
                        if let Some(primary) = tenant_shard.intent.get_attached() {
                            return Some((*primary, *tid));
                        }
                    }

                    None
                })
                .into_group_map();

            let expected_attached = locked.scheduler.expected_attached_shard_count();
            let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();

            let mut plan = Vec::new();
            for (node_id, attached) in nodes_by_load {
                if plan.len() >= fill_requirement
                    || tids_by_node.is_empty()
                    || attached <= expected_attached
                {
                    break;
                }

                // Donor nodes are visited most loaded first, so `attached` is
                // strictly above the average here and the subtraction is safe.
                let can_take = attached - expected_attached;
                let mut remove_node = false;
                for _ in 0..can_take {
                    match tids_by_node.get_mut(&node_id) {
                        Some(tids) => match tids.pop() {
                            Some(tid) => {
                                plan.push(tid);
                            }
                            None => {
                                remove_node = true;
                                break;
                            }
                        },
                        None => {
                            break;
                        }
                    }
                }

                if remove_node {
                    tids_by_node.remove(&node_id);
                }
            }

            plan
        };
        let mut waiters = Vec::new();
        let mut schedule_context = ScheduleContext::default();

        // Execute the plan we've composed above. Before applying each move from the
        // plan, we validate that it has not gone stale in the meantime.
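        // Promotions are issued in batches of at most MAX_RECONCILES_PER_OPERATION
        // in-flight reconciles, waiting on the current batch before starting the next.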
        while !tids_to_promote.is_empty() {
            if cancel.is_cancelled() {
                return Err(OperationError::Cancelled);
            }

            {
                let mut locked = self.inner.write().unwrap();
                let (nodes, tenants, scheduler) = locked.parts_mut();

                let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
                    format!("node {node_id} was removed").into(),
                ))?;

                let current_policy = node.get_scheduling();
                if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
                    // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
                    // about it
                    return Err(OperationError::NodeStateChanged(
                        format!("node {node_id} changed state to {current_policy:?}").into(),
                    ));
                }

                while waiters.len() < MAX_RECONCILES_PER_OPERATION {
                    if let Some(tid) = tids_to_promote.pop() {
                        if let Some(tenant_shard) = tenants.get_mut(&tid) {
                            // If the node being filled is not a secondary anymore,
                            // skip the promotion.
                            if !tenant_shard.intent.get_secondary().contains(&node_id) {
                                continue;
                            }

                            tenant_shard.intent.promote_attached(scheduler, node_id);
                            match tenant_shard.schedule(scheduler, &mut schedule_context) {
                                Err(e) => {
                                    tracing::warn!(%tid, "Scheduling error when filling pageserver {}: {e}", node_id);
                                }
                                Ok(()) => {
                                    if let Some(waiter) =
                                        self.maybe_reconcile_shard(tenant_shard, nodes)
                                    {
                                        waiters.push(waiter);
                                    }
                                }
                            }
                        }
                    } else {
                        break;
                    }
                }
            }

            waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
        }
        while !waiters.is_empty() {
            waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
        }

        if let Err(err) = self
            .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
            .await
        {
            // This isn't a huge issue since the filling process starts upon request.
            // However, it will prevent the next drain from starting. The only case in
            // which this can fail is database unavailability. Such a case will require
            // manual intervention.
            tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
        }

        tracing::info!(%node_id, "Completed fill background operation");

        Ok(())
    }
}
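
For illustration only, a hedged sketch of how a fill like this might be launched as a cancellable background task; the spawning pattern and error handling here are assumptions for this sketch, not part of the commit:

use std::sync::Arc;
use tokio_util::sync::CancellationToken;

// Hypothetical driver: run the fill in the background and hand back the
// token so the operation can be aborted later via `cancel.cancel()`.
fn spawn_fill(service: Arc<Service>, node_id: NodeId) -> CancellationToken {
    let cancel = CancellationToken::new();
    let task_cancel = cancel.clone();
    tokio::spawn(async move {
        if let Err(err) = service.fill_node(node_id, task_cancel).await {
            tracing::error!(%node_id, "Fill background operation failed: {err}");
        }
    });
    cancel
}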