controller: use PageserverUtilization for scheduling (#8711)

## Problem

Previously, the controller only used the shard counts for scheduling.
This works well when hosting only many-sharded tenants, but works much
less well when hosting single-sharded tenants that have a greater
deviation in size-per-shard.

Closes: https://github.com/neondatabase/neon/issues/7798

## Summary of changes

- Instead of UtilizationScore, carry the full PageserverUtilization
through into the Scheduler.
- Use the PageserverUtilization::score() instead of shard count when
ordering nodes in scheduling.

Q: Why did test_sharding_split_smoke need updating in this PR?
A: There's an interesting side effect during shard splits: because we do
not decrement the shard count in the utilization when we de-schedule the
shards from before the split, the controller will now prefer to pick
_different_ nodes for shards compared with which ones held secondaries
before the split. We could use our knowledge of splitting to fix up the
utilizations more actively in this situation, but I'm leaning toward
leaving the code simpler, as in practical systems the impact of one
shard on the utilization of a node should be fairly low (single digit
%).
This commit is contained in:
John Spray
2024-08-23 18:32:56 +01:00
committed by GitHub
parent c1cb7a0fa0
commit b65a95f12e
11 changed files with 340 additions and 101 deletions

View File

@@ -6,10 +6,7 @@ use std::{
};
use tokio_util::sync::CancellationToken;
use pageserver_api::{
controller_api::{NodeAvailability, UtilizationScore},
models::PageserverUtilization,
};
use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
use thiserror::Error;
use utils::id::NodeId;
@@ -147,7 +144,8 @@ impl HeartbeaterTask {
// goes through to the pageserver even when the node is marked offline.
// This doesn't impact the availability observed by [`crate::service::Service`].
let mut node_clone = node.clone();
node_clone.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
node_clone
.set_availability(NodeAvailability::Active(PageserverUtilization::full()));
async move {
let response = node_clone
@@ -179,7 +177,7 @@ impl HeartbeaterTask {
node.get_availability()
{
PageserverState::WarmingUp {
started_at: last_seen_at,
started_at: *last_seen_at,
}
} else {
PageserverState::Offline

View File

@@ -92,15 +92,15 @@ impl Node {
}
}
pub(crate) fn get_availability(&self) -> NodeAvailability {
self.availability
pub(crate) fn get_availability(&self) -> &NodeAvailability {
&self.availability
}
pub(crate) fn set_availability(&mut self, availability: NodeAvailability) {
use AvailabilityTransition::*;
use NodeAvailability::WarmingUp;
match self.get_availability_transition(availability) {
match self.get_availability_transition(&availability) {
ToActive => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
@@ -115,8 +115,8 @@ impl Node {
Unchanged | ToWarmingUpFromOffline => {}
}
if let (WarmingUp(crnt), WarmingUp(proposed)) = (self.availability, availability) {
self.availability = WarmingUp(std::cmp::max(crnt, proposed));
if let (WarmingUp(crnt), WarmingUp(proposed)) = (&self.availability, &availability) {
self.availability = WarmingUp(std::cmp::max(*crnt, *proposed));
} else {
self.availability = availability;
}
@@ -126,12 +126,12 @@ impl Node {
/// into a description of the transition.
pub(crate) fn get_availability_transition(
&self,
availability: NodeAvailability,
availability: &NodeAvailability,
) -> AvailabilityTransition {
use AvailabilityTransition::*;
use NodeAvailability::*;
match (self.availability, availability) {
match (&self.availability, availability) {
(Offline, Active(_)) => ToActive,
(Active(_), Offline) => ToOffline,
(Active(_), WarmingUp(_)) => ToWarmingUpFromActive,
@@ -153,15 +153,15 @@ impl Node {
/// Is this node elegible to have work scheduled onto it?
pub(crate) fn may_schedule(&self) -> MaySchedule {
let score = match self.availability {
NodeAvailability::Active(score) => score,
let utilization = match &self.availability {
NodeAvailability::Active(u) => u.clone(),
NodeAvailability::Offline | NodeAvailability::WarmingUp(_) => return MaySchedule::No,
};
match self.scheduling {
NodeSchedulingPolicy::Active => MaySchedule::Yes(score),
NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
NodeSchedulingPolicy::Draining => MaySchedule::No,
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
NodeSchedulingPolicy::Pause => MaySchedule::No,
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
}
@@ -285,7 +285,7 @@ impl Node {
pub(crate) fn describe(&self) -> NodeDescribeResponse {
NodeDescribeResponse {
id: self.id,
availability: self.availability.into(),
availability: self.availability.clone().into(),
scheduling: self.scheduling,
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,

View File

@@ -1,6 +1,6 @@
use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::controller_api::UtilizationScore;
use pageserver_api::models::PageserverUtilization;
use serde::Serialize;
use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};
@@ -20,9 +20,9 @@ impl From<ScheduleError> for ApiError {
}
}
#[derive(Serialize, Eq, PartialEq)]
#[derive(Serialize)]
pub enum MaySchedule {
Yes(UtilizationScore),
Yes(PageserverUtilization),
No,
}
@@ -282,6 +282,28 @@ impl Scheduler {
node.shard_count -= 1;
}
}
// Maybe update PageserverUtilization
match update {
RefCountUpdate::AddSecondary | RefCountUpdate::Attach => {
// Referencing the node: if this takes our shard_count above the utilzation structure's
// shard count, then artifically bump it: this ensures that the scheduler immediately
// recognizes that this node has more work on it, without waiting for the next heartbeat
// to update the utilization.
if let MaySchedule::Yes(utilization) = &mut node.may_schedule {
utilization.adjust_shard_count_max(node.shard_count as u32);
}
}
RefCountUpdate::PromoteSecondary
| RefCountUpdate::Detach
| RefCountUpdate::RemoveSecondary
| RefCountUpdate::DemoteAttached => {
// De-referencing the node: leave the utilization's shard_count at a stale higher
// value until some future heartbeat after we have physically removed this shard
// from the node: this prevents the scheduler over-optimistically trying to schedule
// more work onto the node before earlier detaches are done.
}
}
}
// Check if the number of shards attached to a given node is lagging below
@@ -326,7 +348,18 @@ impl Scheduler {
use std::collections::hash_map::Entry::*;
match self.nodes.entry(node.get_id()) {
Occupied(mut entry) => {
entry.get_mut().may_schedule = node.may_schedule();
// Updates to MaySchedule are how we receive updated PageserverUtilization: adjust these values
// to account for any shards scheduled on the controller but not yet visible to the pageserver.
let mut may_schedule = node.may_schedule();
match &mut may_schedule {
MaySchedule::Yes(utilization) => {
utilization.adjust_shard_count_max(entry.get().shard_count as u32);
}
MaySchedule::No => { // Nothing to tweak
}
}
entry.get_mut().may_schedule = may_schedule;
}
Vacant(entry) => {
entry.insert(SchedulerNode {
@@ -363,7 +396,7 @@ impl Scheduler {
let may_schedule = self
.nodes
.get(node_id)
.map(|n| n.may_schedule != MaySchedule::No)
.map(|n| !matches!(n.may_schedule, MaySchedule::No))
.unwrap_or(false);
(*node_id, may_schedule)
})
@@ -383,7 +416,7 @@ impl Scheduler {
/// the same tenant on the same node. This is a soft constraint: the context will never
/// cause us to fail to schedule a shard.
pub(crate) fn schedule_shard(
&self,
&mut self,
hard_exclude: &[NodeId],
context: &ScheduleContext,
) -> Result<NodeId, ScheduleError> {
@@ -391,31 +424,41 @@ impl Scheduler {
return Err(ScheduleError::NoPageservers);
}
let mut scores: Vec<(NodeId, AffinityScore, usize, usize)> = self
let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
.nodes
.iter()
.filter_map(|(k, v)| {
if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
None
} else {
Some((
*k,
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
v.shard_count,
v.attached_shard_count,
))
}
.iter_mut()
.filter_map(|(k, v)| match &mut v.may_schedule {
MaySchedule::No => None,
MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
MaySchedule::Yes(utilization) => Some((
*k,
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
utilization.cached_score(),
v.attached_shard_count,
)),
})
.collect();
// Exclude nodes whose utilization is critically high, if there are alternatives available. This will
// cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
// we may place shards in the same tenant together on the same pageserver if all other pageservers are
// overloaded.
let non_overloaded_scores = scores
.iter()
.filter(|i| !PageserverUtilization::is_overloaded(i.2))
.copied()
.collect::<Vec<_>>();
if !non_overloaded_scores.is_empty() {
scores = non_overloaded_scores;
}
// Sort by, in order of precedence:
// 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
// 2nd: Attached shard count. Within nodes with the same affinity, we always pick the node with
// the least number of attached shards.
// 3rd: Total shard count. Within nodes with the same affinity and attached shard count, use nodes
// with the lower total shard count.
// 2nd: Utilization score (this combines shard count and disk utilization)
// 3rd: Attached shard count. When nodes have identical utilization (e.g. when populating some
// empty nodes), this acts as an anti-affinity between attached shards.
// 4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
scores.sort_by_key(|i| (i.1, i.3, i.2, i.0));
scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));
if scores.is_empty() {
// After applying constraints, no pageservers were left.
@@ -429,7 +472,7 @@ impl Scheduler {
for (node_id, node) in &self.nodes {
tracing::info!(
"Node {node_id}: may_schedule={} shards={}",
node.may_schedule != MaySchedule::No,
!matches!(node.may_schedule, MaySchedule::No),
node.shard_count
);
}
@@ -469,7 +512,7 @@ impl Scheduler {
pub(crate) mod test_utils {
use crate::node::Node;
use pageserver_api::controller_api::{NodeAvailability, UtilizationScore};
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
@@ -486,7 +529,7 @@ pub(crate) mod test_utils {
format!("pghost-{i}"),
5432 + i as u16,
);
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
assert!(node.is_available());
node
})
@@ -497,6 +540,8 @@ pub(crate) mod test_utils {
#[cfg(test)]
mod tests {
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use super::*;
use crate::tenant_shard::IntentState;
@@ -557,4 +602,130 @@ mod tests {
Ok(())
}
#[test]
/// Test the PageserverUtilization's contribution to scheduling algorithm
fn scheduler_utilization() {
let mut nodes = test_utils::make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
// Need to keep these alive because they contribute to shard counts via RAII
let mut scheduled_intents = Vec::new();
let empty_context = ScheduleContext::default();
fn assert_scheduler_chooses(
expect_node: NodeId,
scheduled_intents: &mut Vec<IntentState>,
scheduler: &mut Scheduler,
context: &ScheduleContext,
) {
let scheduled = scheduler.schedule_shard(&[], context).unwrap();
let mut intent = IntentState::new();
intent.set_attached(scheduler, Some(scheduled));
scheduled_intents.push(intent);
assert_eq!(scheduled, expect_node);
}
// Independent schedule calls onto empty nodes should round-robin, because each node's
// utilization's shard count is updated inline. The order is determinsitic because when all other factors are
// equal, we order by node ID.
assert_scheduler_chooses(
NodeId(1),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
assert_scheduler_chooses(
NodeId(2),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
assert_scheduler_chooses(
NodeId(3),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
// Manually setting utilization higher should cause schedule calls to round-robin the other nodes
// which have equal utilization.
nodes
.get_mut(&NodeId(1))
.unwrap()
.set_availability(NodeAvailability::Active(test_utilization::simple(
10,
1024 * 1024 * 1024,
)));
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
assert_scheduler_chooses(
NodeId(2),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
assert_scheduler_chooses(
NodeId(3),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
assert_scheduler_chooses(
NodeId(2),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
assert_scheduler_chooses(
NodeId(3),
&mut scheduled_intents,
&mut scheduler,
&empty_context,
);
// The scheduler should prefer nodes with lower affinity score,
// even if they have higher utilization (as long as they aren't utilized at >100%)
let mut context_prefer_node1 = ScheduleContext::default();
context_prefer_node1.avoid(&[NodeId(2), NodeId(3)]);
assert_scheduler_chooses(
NodeId(1),
&mut scheduled_intents,
&mut scheduler,
&context_prefer_node1,
);
assert_scheduler_chooses(
NodeId(1),
&mut scheduled_intents,
&mut scheduler,
&context_prefer_node1,
);
// If a node is over-utilized, it will not be used even if affinity scores prefer it
nodes
.get_mut(&NodeId(1))
.unwrap()
.set_availability(NodeAvailability::Active(test_utilization::simple(
20000,
1024 * 1024 * 1024,
)));
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
assert_scheduler_chooses(
NodeId(2),
&mut scheduled_intents,
&mut scheduler,
&context_prefer_node1,
);
assert_scheduler_chooses(
NodeId(3),
&mut scheduled_intents,
&mut scheduler,
&context_prefer_node1,
);
for mut intent in scheduled_intents {
intent.clear(&mut scheduler);
}
}
}

View File

@@ -44,7 +44,7 @@ use pageserver_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
@@ -542,7 +542,7 @@ impl Service {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
let mut nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -556,10 +556,8 @@ impl Service {
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
let mut new_nodes = (**nodes).clone();
for (node_id, node) in new_nodes.iter_mut() {
if let Some(utilization) = nodes_online.get(node_id) {
node.set_availability(NodeAvailability::Active(UtilizationScore(
utilization.utilization_score,
)));
if let Some(utilization) = nodes_online.remove(node_id) {
node.set_availability(NodeAvailability::Active(utilization));
scheduler.node_upsert(node);
}
}
@@ -925,9 +923,9 @@ impl Service {
if let Ok(deltas) = res {
for (node_id, state) in deltas.0 {
let new_availability = match state {
PageserverState::Available { utilization, .. } => NodeAvailability::Active(
UtilizationScore(utilization.utilization_score),
),
PageserverState::Available { utilization, .. } => {
NodeAvailability::Active(utilization)
}
PageserverState::WarmingUp { started_at } => {
NodeAvailability::WarmingUp(started_at)
}
@@ -936,14 +934,17 @@ impl Service {
// while the heartbeat round was on-going. Hence, filter out
// offline transitions for WarmingUp nodes that are still within
// their grace period.
if let Ok(NodeAvailability::WarmingUp(started_at)) =
self.get_node(node_id).await.map(|n| n.get_availability())
if let Ok(NodeAvailability::WarmingUp(started_at)) = self
.get_node(node_id)
.await
.as_ref()
.map(|n| n.get_availability())
{
let now = Instant::now();
if now - started_at >= self.config.max_warming_up_interval {
if now - *started_at >= self.config.max_warming_up_interval {
NodeAvailability::Offline
} else {
NodeAvailability::WarmingUp(started_at)
NodeAvailability::WarmingUp(*started_at)
}
} else {
NodeAvailability::Offline
@@ -1625,7 +1626,7 @@ impl Service {
// This Node is a mutable local copy: we will set it active so that we can use its
// API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
// later.
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
node.set_availability(NodeAvailability::Active(PageserverUtilization::full()));
let configs = match node
.with_client_retries(
@@ -2473,7 +2474,7 @@ impl Service {
.await;
let node = {
let locked = self.inner.read().unwrap();
let mut locked = self.inner.write().unwrap();
// Just a sanity check to prevent misuse: the API expects that the tenant is fully
// detached everywhere, and nothing writes to S3 storage. Here, we verify that,
// but only at the start of the process, so it's really just to prevent operator
@@ -2500,7 +2501,7 @@ impl Service {
return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
}
}
let scheduler = &locked.scheduler;
let scheduler = &mut locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
@@ -4761,7 +4762,7 @@ impl Service {
//
// The transition we calculate here remains valid later in the function because we hold the op lock on the node:
// nothing else can mutate its availability while we run.
let availability_transition = if let Some(input_availability) = availability {
let availability_transition = if let Some(input_availability) = availability.as_ref() {
let (activate_node, availability_transition) = {
let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(&node_id) else {
@@ -4797,8 +4798,8 @@ impl Service {
));
};
if let Some(availability) = &availability {
node.set_availability(*availability);
if let Some(availability) = availability.as_ref() {
node.set_availability(availability.clone());
}
if let Some(scheduling) = scheduling {

View File

@@ -779,7 +779,7 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_secondary(
&self,
scheduler: &Scheduler,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
if self.intent.secondary.is_empty() {
@@ -1595,7 +1595,7 @@ pub(crate) mod tests {
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
// Since there is a node with no locations available, the node with two locations for the
// same tenant should generate an optimization to move one away