mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
optimisation change
This commit is contained in:
@@ -47,6 +47,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self>;
|
||||
|
||||
/// Return a score that drops any components based on node utilization: this is useful
|
||||
/// for finding scores for scheduling optimisation, when we want to avoid rescheduling
|
||||
/// shards due to e.g. disk usage, to avoid flapping.
|
||||
fn disregard_utilization(&self) -> Self;
|
||||
|
||||
fn is_overloaded(&self) -> bool;
|
||||
fn node_id(&self) -> NodeId;
|
||||
}
|
||||
@@ -156,6 +162,28 @@ pub(crate) struct NodeAttachmentSchedulingScore {
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
impl NodeAttachmentSchedulingScore {
|
||||
/// For speculative scheduling: generate the score for this node as if one more tenant
|
||||
/// shard was attached to it.
|
||||
pub(crate) fn project_attachment(&self) -> Self {
|
||||
Self {
|
||||
total_attached_shard_count: self.total_attached_shard_count + 1,
|
||||
..*self
|
||||
}
|
||||
}
|
||||
|
||||
/// The literal equality and comparison operators include the node ID to provide a deterministic
|
||||
/// ordering. However, when doing scheduling optimisation we of course don't want to regard
|
||||
/// a difference in node ID as significant, so that code uses this method to exclude that case.
|
||||
pub(crate) fn different(&self, other: &Self) -> bool {
|
||||
self.az_match != other.az_match
|
||||
|| self.affinity_score != other.affinity_score
|
||||
|| self.attached_shards_in_context != other.attached_shards_in_context
|
||||
|| self.utilization_score != other.utilization_score
|
||||
|| self.total_attached_shard_count != other.total_attached_shard_count
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
@@ -184,6 +212,18 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
})
|
||||
}
|
||||
|
||||
/// For use in scheduling optimisation, where we only want to consider the aspects
|
||||
/// of the score that can only be resolved by moving things (such as inter-shard affinity
|
||||
/// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
|
||||
/// can fluctuate for other reasons)
|
||||
fn disregard_utilization(&self) -> Self {
|
||||
Self {
|
||||
utilization_score: 0,
|
||||
total_attached_shard_count: 0,
|
||||
..*self
|
||||
}
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
@@ -215,6 +255,17 @@ pub(crate) struct NodeSecondarySchedulingScore {
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
impl NodeSecondarySchedulingScore {
|
||||
/// The literal equality and comparison operators include the node ID to provide a deterministic
|
||||
/// ordering. However, when doing scheduling optimisation we of course don't want to regard
|
||||
/// a difference in node ID as significant, so that code uses this method to exclude that case.
|
||||
pub(crate) fn different(&self, other: &Self) -> bool {
|
||||
self.az_match != other.az_match
|
||||
|| self.affinity_score != other.affinity_score
|
||||
|| self.utilization_score != other.utilization_score
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
@@ -242,6 +293,13 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
})
|
||||
}
|
||||
|
||||
fn disregard_utilization(&self) -> Self {
|
||||
Self {
|
||||
utilization_score: 0,
|
||||
..*self
|
||||
}
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
@@ -293,6 +351,10 @@ impl AffinityScore {
|
||||
pub(crate) fn inc(&mut self) {
|
||||
self.0 += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn dec(&mut self) {
|
||||
self.0 -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Add for AffinityScore {
|
||||
@@ -353,15 +415,36 @@ impl ScheduleContext {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
|
||||
self.nodes
|
||||
.get(&node_id)
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE)
|
||||
/// Imagine we migrated our attached location to the given node. Return a new context that
|
||||
/// reflects this.
|
||||
pub(crate) fn project_detach(&self, source: NodeId) -> Self {
|
||||
let mut new_context = self.clone();
|
||||
|
||||
if let Some(count) = new_context.attached_nodes.get_mut(&source) {
|
||||
// It's unexpected that we get called in a context where the source of
|
||||
// the migration is not already in the context.
|
||||
debug_assert!(*count > 0);
|
||||
|
||||
if *count > 0 {
|
||||
*count -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(score) = new_context.nodes.get_mut(&source) {
|
||||
score.dec();
|
||||
}
|
||||
|
||||
new_context
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
|
||||
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
|
||||
pub(crate) fn project_secondary_detach(&self, source: NodeId) -> Self {
|
||||
let mut new_context = self.clone();
|
||||
|
||||
if let Some(score) = new_context.nodes.get_mut(&source) {
|
||||
score.dec();
|
||||
}
|
||||
|
||||
new_context
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -636,6 +719,22 @@ impl Scheduler {
|
||||
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
|
||||
}
|
||||
|
||||
/// Calculate a single node's score, used in optimizer logic to compare specific
|
||||
/// nodes' scores.
|
||||
pub(crate) fn compute_node_score<Score>(
|
||||
&mut self,
|
||||
node_id: NodeId,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Score>
|
||||
where
|
||||
Score: NodeSchedulingScore,
|
||||
{
|
||||
self.nodes
|
||||
.get_mut(&node_id)
|
||||
.and_then(|node| Score::generate(&node_id, node, preferred_az, context))
|
||||
}
|
||||
|
||||
/// Compute a schedulling score for each node that the scheduler knows of
|
||||
/// minus a set of hard excluded nodes.
|
||||
fn compute_node_scores<Score>(
|
||||
@@ -742,6 +841,10 @@ impl Scheduler {
|
||||
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
|
||||
self.nodes.get(node_id).map(|n| n.az.clone())
|
||||
}
|
||||
|
||||
/// Unit test access to internal state
|
||||
#[cfg(test)]
|
||||
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
|
||||
|
||||
@@ -6113,7 +6113,13 @@ impl Service {
|
||||
|
||||
let mut work = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// We are going to plan a bunch of optimisations before applying any of them, so the
|
||||
// utilisation stats on nodes will be effectively stale for the >1st optimisation we
|
||||
// generate. To avoid this causing unstable migrations/flapping, it's important that the
|
||||
// code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::disregard_utilization`]
|
||||
// to ignore the utilisation component of the score.
|
||||
|
||||
for (_tenant_id, schedule_context, shards) in
|
||||
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
|
||||
@@ -6149,7 +6155,7 @@ impl Service {
|
||||
if let Some(optimization) =
|
||||
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
|
||||
// its primary location based on soft constraints, cut it over.
|
||||
shard.optimize_attachment(nodes, &schedule_context)
|
||||
shard.optimize_attachment(scheduler, &schedule_context)
|
||||
{
|
||||
work.push((shard.tenant_shard_id, optimization));
|
||||
break;
|
||||
@@ -6208,8 +6214,10 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
ScheduleOptimizationAction::ReplaceSecondary(_) => {
|
||||
// No extra checks needed to replace a secondary: this does not interrupt client access
|
||||
ScheduleOptimizationAction::ReplaceSecondary(_)
|
||||
| ScheduleOptimizationAction::CreateSecondary(_)
|
||||
| ScheduleOptimizationAction::RemoveSecondary(_) => {
|
||||
// No extra checks needed to manage secondaries: this does not interrupt client access
|
||||
validated_work.push((tenant_shard_id, optimization))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -11,16 +11,14 @@ use crate::{
|
||||
persistence::TenantShardPersistence,
|
||||
reconciler::{ReconcileUnits, ReconcilerConfig},
|
||||
scheduler::{
|
||||
AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
|
||||
SecondaryShardTag,
|
||||
AttachedShardTag, NodeAttachmentSchedulingScore, NodeSchedulingScore,
|
||||
NodeSecondarySchedulingScore, RefCountUpdate, ScheduleContext, SecondaryShardTag,
|
||||
},
|
||||
service::ReconcileResultRequest,
|
||||
};
|
||||
use futures::future::{self, Either};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
|
||||
use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -360,6 +358,10 @@ pub(crate) enum ScheduleOptimizationAction {
|
||||
ReplaceSecondary(ReplaceSecondary),
|
||||
// Migrate attachment to an existing secondary location
|
||||
MigrateAttachment(MigrateAttachment),
|
||||
// Create a secondary location, with the intent of later migrating to it
|
||||
CreateSecondary(NodeId),
|
||||
// Remove a secondary location that we previously created to facilitate a migration
|
||||
RemoveSecondary(NodeId),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
@@ -723,90 +725,175 @@ impl TenantShard {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_better_location(
|
||||
&self,
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<NodeId> {
|
||||
// TODO: fast path: if the attached node is already in the preferred AZ, _and_ has no
|
||||
// other shards from the same tenant on it, then skip doing any scheduling calculations.
|
||||
|
||||
let attached = (*self.intent.get_attached())?;
|
||||
let schedule_context = schedule_context.project_detach(attached);
|
||||
|
||||
// Look for a lower-scoring location to attach to
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<AttachedShardTag>(
|
||||
&[], // Don't hard-exclude anything: we want to consider the possibility of migrating to somewhere we already have a secondary
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
tracing::debug!("No candidate node found");
|
||||
return None;
|
||||
};
|
||||
|
||||
if candidate_node == attached {
|
||||
// We're already at the best possible location, so don't migrate
|
||||
tracing::debug!("Candidate node {candidate_node} is already attached");
|
||||
return None;
|
||||
}
|
||||
|
||||
// Check if our candidate node is in the preferred AZ: it might not be, if the scheduler
|
||||
// is trying its best to handle an overloaded AZ.
|
||||
if self.preferred_az_id.is_some()
|
||||
&& scheduler.get_node_az(&candidate_node) != self.preferred_az_id
|
||||
{
|
||||
tracing::debug!(
|
||||
"Candidate node {candidate_node} is not in preferred AZ {:?}",
|
||||
self.preferred_az_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
// Consider whether the candidate is any better than our current location once the
|
||||
// context is updated to reflect the migration.
|
||||
let Some(candidate_score) = scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
|
||||
candidate_node,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
) else {
|
||||
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
|
||||
// This is unexpected, because schedule() yielded this node
|
||||
debug_assert!(false);
|
||||
return None;
|
||||
};
|
||||
|
||||
match scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
|
||||
attached,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
) {
|
||||
Some(current_score) => {
|
||||
// Ignore utilization components when comparing scores: we don't want to migrate
|
||||
// because of transient load variations, it risks making the system thrash, and
|
||||
// migrating for utilization requires a separate high level view of the system to
|
||||
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
|
||||
// moving things around in the order that we hit this function.
|
||||
let candidate_score = candidate_score.disregard_utilization();
|
||||
let candidate_score = candidate_score.project_attachment();
|
||||
|
||||
let current_score = current_score.disregard_utilization();
|
||||
|
||||
if candidate_score < current_score && current_score.different(&candidate_score) {
|
||||
tracing::info!("Found a lower scoring location! {candidate_score:?} is better than {current_score:?}");
|
||||
} else {
|
||||
// The candidate node is no better than our current location, so don't migrate
|
||||
tracing::debug!("Candidate node {candidate_node} is no better than our current location {attached}");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// The current node is unavailable for scheduling, so we can't make any sensible
|
||||
// decisions about optimisation. This should be a transient state -- if the node
|
||||
// is offline then it will get evacuated, if is blocked by a scheduling mode
|
||||
// then we will respect that mode by doing nothing.
|
||||
tracing::debug!("Current node {attached} is unavailable for scheduling");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
Some(candidate_node)
|
||||
}
|
||||
|
||||
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
||||
/// its primary location based on soft constraints, switch that secondary location
|
||||
/// to be attached.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn optimize_attachment(
|
||||
&self,
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
let attached = (*self.intent.get_attached())?;
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
return None;
|
||||
}
|
||||
|
||||
let current_affinity_score = schedule_context.get_node_affinity(attached);
|
||||
let current_attachment_count = schedule_context.get_node_attachments(attached);
|
||||
let replacement = self.find_better_location(scheduler, schedule_context);
|
||||
|
||||
// Generate score for each node, dropping any un-schedulable nodes.
|
||||
let all_pageservers = self.intent.all_pageservers();
|
||||
let mut scores = all_pageservers
|
||||
.iter()
|
||||
.flat_map(|node_id| {
|
||||
let node = nodes.get(node_id);
|
||||
if node.is_none() {
|
||||
None
|
||||
} else if matches!(
|
||||
node.unwrap().get_scheduling(),
|
||||
NodeSchedulingPolicy::Filling
|
||||
) {
|
||||
// If the node is currently filling, don't count it as a candidate to avoid,
|
||||
// racing with the background fill.
|
||||
None
|
||||
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
|
||||
None
|
||||
} else {
|
||||
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
||||
let attachment_count = schedule_context.get_node_attachments(*node_id);
|
||||
Some((*node_id, affinity_score, attachment_count))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort precedence:
|
||||
// 1st - prefer nodes with the lowest total affinity score
|
||||
// 2nd - prefer nodes with the lowest number of attachments in this context
|
||||
// 3rd - if all else is equal, sort by node ID for determinism in tests.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
||||
|
||||
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
|
||||
scores.first()
|
||||
{
|
||||
if attached != *preferred_node {
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called (e.g. there's no point migrating from
|
||||
// a location with score 1 to a score zero, because on next location the situation
|
||||
// would be the same, but in reverse).
|
||||
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|
||||
|| current_attachment_count > *preferred_attachment_count + 1
|
||||
{
|
||||
// We have found a candidate and confirmed that its score is preferable
|
||||
// to our current location. See if we have a secondary location in the preferred location already: if not,
|
||||
// then create one.
|
||||
if let Some(replacement) = replacement {
|
||||
if !self.intent.get_secondary().contains(&replacement) {
|
||||
tracing::info!(
|
||||
"Identified optimization({}): create secondary {replacement}",
|
||||
self.tenant_shard_id
|
||||
);
|
||||
Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::CreateSecondary(replacement),
|
||||
})
|
||||
} else {
|
||||
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
|
||||
// will check the warmth of the destination before deciding whether to really execute this.
|
||||
tracing::info!(
|
||||
"Identified optimization({}): migrate attachment {attached}->{replacement}",
|
||||
self.tenant_shard_id
|
||||
);
|
||||
Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: replacement,
|
||||
}),
|
||||
})
|
||||
}
|
||||
} else if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
||||
// We aren't in the process of migrating anywhere, and we're attached in our preferred AZ. If there are
|
||||
// any other secondary locations in our preferred AZ, we presume they were created to facilitate a migration
|
||||
// of the attached location, and remove them.
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if scheduler.get_node_az(secondary) == self.preferred_az_id {
|
||||
tracing::info!(
|
||||
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
"Identified optimization({}): remove secondary {secondary}",
|
||||
self.tenant_shard_id
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: *preferred_node,
|
||||
}),
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Node {} is already preferred (score {:?})",
|
||||
preferred_node,
|
||||
preferred_affinity_score
|
||||
);
|
||||
}
|
||||
|
||||
// Fall through: maybe we had excess secondaries in other AZs? Trim them in an arbitrary order
|
||||
// (lowest Node ID first).
|
||||
let mut secondary_node_ids = self.intent.get_secondary().clone();
|
||||
secondary_node_ids.sort();
|
||||
let victim = secondary_node_ids
|
||||
.first()
|
||||
.expect("Within block for > check on secondary count");
|
||||
Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
})
|
||||
} else {
|
||||
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
|
||||
// to clean up: no action required.
|
||||
None
|
||||
}
|
||||
|
||||
// Fall-through: we didn't find an optimization
|
||||
None
|
||||
// TODO: if we find that our current location is optimal, _and_ we also have a secondary
|
||||
// in the preferred AZ, then clean up that secondary: it was only created to act as a
|
||||
// migration destination.
|
||||
// ...maybe do this in optimize_secondary()?
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
@@ -815,18 +902,18 @@ impl TenantShard {
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
||||
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
|
||||
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
|
||||
// and we are called again, we will proceed.
|
||||
tracing::debug!("Too many secondaries: skipping");
|
||||
return None;
|
||||
}
|
||||
|
||||
for secondary in self.intent.get_secondary() {
|
||||
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
|
||||
// We're already on a node unaffected any affinity constraints,
|
||||
// so we won't change it.
|
||||
continue;
|
||||
};
|
||||
let schedule_context = schedule_context.project_secondary_detach(*secondary);
|
||||
// TODO: fast path to avoid full scheduling calculation if we're in the right AZ and not
|
||||
// sharing with any other shards in the same tenant
|
||||
|
||||
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
|
||||
// This implicitly limits the choice to nodes that are available, and prefers nodes
|
||||
@@ -834,33 +921,63 @@ impl TenantShard {
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&self.intent.all_pageservers(),
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
&schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
continue;
|
||||
};
|
||||
|
||||
let candidate_affinity_score = schedule_context
|
||||
.nodes
|
||||
.get(&candidate_node)
|
||||
.unwrap_or(&AffinityScore::FREE);
|
||||
let Some(candidate_score) = scheduler
|
||||
.compute_node_score::<NodeSecondarySchedulingScore>(
|
||||
candidate_node,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
)
|
||||
else {
|
||||
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
|
||||
// This is unexpected, because schedule() yielded this node
|
||||
debug_assert!(false);
|
||||
continue;
|
||||
};
|
||||
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called.
|
||||
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
|
||||
// If some other node is available and has a lower score than this node, then
|
||||
// that other node is a good place to migrate to.
|
||||
tracing::info!(
|
||||
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
||||
old_node_id: *secondary,
|
||||
new_node_id: candidate_node,
|
||||
}),
|
||||
});
|
||||
match scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
|
||||
*secondary,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
) {
|
||||
Some(current_score) => {
|
||||
// Disregard utilization: we don't want to thrash around based on disk utilization
|
||||
let current_score = current_score.disregard_utilization();
|
||||
let candidate_score = candidate_score.disregard_utilization();
|
||||
|
||||
if candidate_score < current_score && current_score.different(&candidate_score)
|
||||
{
|
||||
tracing::info!(
|
||||
"Identified optimization({}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
|
||||
self.tenant_shard_id,
|
||||
self.intent.get_secondary(),
|
||||
candidate_score,
|
||||
current_score
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::ReplaceSecondary(
|
||||
ReplaceSecondary {
|
||||
old_node_id: *secondary,
|
||||
new_node_id: candidate_node,
|
||||
},
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// The current node is unavailable for scheduling, so we can't make any sensible
|
||||
// decisions about optimisation. This should be a transient state -- if the node
|
||||
// is offline then it will get evacuated, if is blocked by a scheduling mode
|
||||
// then we will respect that mode by doing nothing.
|
||||
tracing::debug!("Current node {secondary} is unavailable for scheduling");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -899,6 +1016,12 @@ impl TenantShard {
|
||||
self.intent.remove_secondary(scheduler, old_node_id);
|
||||
self.intent.push_secondary(scheduler, new_node_id);
|
||||
}
|
||||
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
|
||||
self.intent.push_secondary(scheduler, new_node_id);
|
||||
}
|
||||
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
|
||||
self.intent.remove_secondary(scheduler, old_secondary);
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
@@ -1732,7 +1855,8 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn optimize_attachment() -> anyhow::Result<()> {
|
||||
/// Simple case: moving attachment to somewhere better where we already have a secondary
|
||||
fn optimize_attachment_simple() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
@@ -1746,16 +1870,17 @@ pub(crate) mod tests {
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
schedule_context
|
||||
}
|
||||
|
||||
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
|
||||
|
||||
// Either shard should recognize that it has the option to switch to a secondary location where there
|
||||
// would be no other shards from the same tenant, and request to do so.
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a,
|
||||
Some(ScheduleOptimization {
|
||||
@@ -1766,31 +1891,128 @@ pub(crate) mod tests {
|
||||
})
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
||||
|
||||
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
|
||||
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
|
||||
// of [`Service::optimize_all`] to avoid trying
|
||||
// to do optimizations for multiple shards in the same tenant at the same time. Generating
|
||||
// both optimizations is just done for test purposes
|
||||
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
|
||||
// // Either shard should recognize that it has the option to switch to a secondary location where there
|
||||
// // would be no other shards from the same tenant, and request to do so.
|
||||
// assert_eq!(
|
||||
// optimization_a_prepare,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
||||
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
// assert_eq!(
|
||||
// optimization_a_migrate,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
// old_attached_node_id: NodeId(1),
|
||||
// new_attached_node_id: NodeId(2)
|
||||
// })
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
||||
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
// assert_eq!(
|
||||
// optimization_a_cleanup,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
||||
|
||||
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
|
||||
/// already, creating one as needed.
|
||||
fn optimize_attachment_multistep() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(
|
||||
3,
|
||||
&[
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Two shards of a tenant that wants to be in AZ A
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
|
||||
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
|
||||
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
schedule_context
|
||||
}
|
||||
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_b,
|
||||
optimization_a_prepare,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_b.sequence,
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
||||
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_migrate,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(1),
|
||||
new_attached_node_id: NodeId(3)
|
||||
old_attached_node_id: NodeId(2),
|
||||
new_attached_node_id: NodeId(1)
|
||||
})
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
||||
|
||||
// Applying these optimizations should result in the end state proposed
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
|
||||
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
|
||||
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_cleanup,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
||||
|
||||
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
@@ -1848,7 +2070,6 @@ pub(crate) mod tests {
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
fn optimize_til_idle(
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
shards: &mut [TenantShard],
|
||||
) -> Vec<ScheduleOptimization> {
|
||||
@@ -1866,7 +2087,7 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
let optimization = shard.optimize_attachment(nodes, &schedule_context);
|
||||
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
|
||||
if let Some(optimization) = optimization {
|
||||
optimizations.push(optimization.clone());
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
@@ -1895,18 +2116,40 @@ pub(crate) mod tests {
|
||||
optimizations
|
||||
}
|
||||
|
||||
use test_log::test;
|
||||
|
||||
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
||||
/// that it converges.
|
||||
#[test]
|
||||
fn optimize_add_nodes() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
let nodes = make_test_nodes(
|
||||
9,
|
||||
&[
|
||||
// Initial 6 nodes
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
// Three we will add later
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
],
|
||||
);
|
||||
|
||||
// Only show the scheduler a couple of nodes
|
||||
// Only show the scheduler two nodes in each AZ to start with
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
for i in 1..=6 {
|
||||
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
|
||||
}
|
||||
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let mut shards = make_test_tenant(
|
||||
PlacementPolicy::Attached(1),
|
||||
ShardCount::new(4),
|
||||
Some(AvailabilityZone("az-a".to_string())),
|
||||
);
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in &mut shards {
|
||||
assert!(shard
|
||||
@@ -1914,30 +2157,50 @@ pub(crate) mod tests {
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
// We should see equal number of locations on the two nodes.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
|
||||
// Initial: attached locations land in the tenant's home AZ.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
||||
|
||||
// Add another two nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
|
||||
// Initial: secondary locations in a remote AZ
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
// Add another three nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
|
||||
optimize_til_idle(&mut scheduler, &mut shards);
|
||||
|
||||
// We expect one attached location was moved to the new node in the tenant's home AZ
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
|
||||
// The original node has one less attached shard
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
||||
|
||||
// One of the original nodes still has two attachments, since there are an odd number of nodes
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
|
||||
// None of our secondaries moved, since we already had enough nodes for those to be
|
||||
// scheduled perfectly
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
@@ -1977,10 +2240,10 @@ pub(crate) mod tests {
|
||||
shard.schedule(&mut scheduler, context).unwrap();
|
||||
}
|
||||
|
||||
let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
|
||||
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
|
||||
assert_eq!(applied_to_a, vec![]);
|
||||
|
||||
let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
|
||||
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
|
||||
assert_eq!(applied_to_b, vec![]);
|
||||
|
||||
for shard in a.iter_mut().chain(b.iter_mut()) {
|
||||
|
||||
Reference in New Issue
Block a user