storcon: do az aware scheduling (#9083)

## Problem

Storage controller didn't previously consider AZ locality between
compute and pageservers
when scheduling nodes. Control plane has this feature, and, since we are
migrating tenants
away from it, we need feature parity to avoid perf degradations.

## Summary of changes

The change itself is fairly simple:
1. Thread az info into the scheduler
2. Add an extra member to the scheduling scores

Step (2) deserves some more discussion. Let's break it down by the shard
type being scheduled:

**Attached Shards**

We wish for attached shards of a tenant to end up in the preferred AZ of
the tenant since that
is where the compute is like to be. 

The AZ member for `NodeAttachmentSchedulingScore` has been placed
below the affinity score (so it's got the second biggest weight for
picking the node). The rationale for going
below the affinity score is to avoid having all shards of a single
tenant placed on the same node in 2 node
regions, since that would mean that one tenant can drive the general
workload of an entire pageserver.
I'm not 100% sure this is the right decision, so open to discussing
hoisting the AZ up to first place.

 **Secondary Shards**

We wish for secondary shards of a tenant to be scheduled in a different
AZ from the preferred one
for HA purposes.

The AZ member for `NodeSecondarySchedulingScore` has been placed first,
so nodes in different AZs
from the preferred one will always be considered first. On small
clusters, this can mean that all the secondaries
of a tenant are scheduled to the same pageserver, but secondaries don't
use up as many resources as the
attached location, so IMO the argument made for attached shards doesn't
hold.

Related: https://github.com/neondatabase/neon/issues/8848
This commit is contained in:
Vlad Lazar
2024-09-25 14:31:04 +01:00
committed by GitHub
parent 7dcfcccf7c
commit 2cf47b1477
8 changed files with 445 additions and 65 deletions

View File

@@ -2,8 +2,8 @@ use std::{str::FromStr, time::Duration};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
TenantLocateResponseShard,
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
NodeSchedulingPolicy, TenantLocateResponseShard,
},
shard::TenantShardId,
};
@@ -36,7 +36,7 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: String,
availability_zone_id: AvailabilityZone,
// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
@@ -64,8 +64,8 @@ impl Node {
}
#[allow(unused)]
pub(crate) fn get_availability_zone_id(&self) -> &str {
self.availability_zone_id.as_str()
pub(crate) fn get_availability_zone_id(&self) -> &AvailabilityZone {
&self.availability_zone_id
}
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
@@ -181,7 +181,7 @@ impl Node {
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: String,
availability_zone_id: AvailabilityZone,
) -> Self {
Self {
id,
@@ -204,7 +204,7 @@ impl Node {
listen_http_port: self.listen_http_port as i32,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
availability_zone_id: self.availability_zone_id.clone(),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
@@ -219,7 +219,7 @@ impl Node {
listen_http_port: np.listen_http_port as u16,
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
availability_zone_id: np.availability_zone_id,
availability_zone_id: AvailabilityZone(np.availability_zone_id),
cancel: CancellationToken::new(),
}
}

View File

@@ -9,6 +9,7 @@ use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
@@ -667,8 +668,8 @@ impl Persistence {
pub(crate) async fn set_tenant_shard_preferred_azs(
&self,
preferred_azs: Vec<(TenantShardId, String)>,
) -> DatabaseResult<Vec<(TenantShardId, String)>> {
preferred_azs: Vec<(TenantShardId, AvailabilityZone)>,
) -> DatabaseResult<Vec<(TenantShardId, AvailabilityZone)>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::SetPreferredAzs, move |conn| {
@@ -679,7 +680,7 @@ impl Persistence {
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set(preferred_az_id.eq(preferred_az))
.set(preferred_az_id.eq(preferred_az.0.clone()))
.execute(conn)?;
if updated == 1 {

View File

@@ -1,6 +1,6 @@
use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::models::PageserverUtilization;
use pageserver_api::{controller_api::AvailabilityZone, models::PageserverUtilization};
use serde::Serialize;
use std::{collections::HashMap, fmt::Debug};
use utils::{http::error::ApiError, id::NodeId};
@@ -32,6 +32,8 @@ pub(crate) struct SchedulerNode {
shard_count: usize,
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
attached_shard_count: usize,
/// Availability zone id in which the node resides
az: AvailabilityZone,
/// Whether this node is currently elegible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
@@ -42,6 +44,7 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
fn generate(
node_id: &NodeId,
node: &mut SchedulerNode,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self>;
fn is_overloaded(&self) -> bool;
@@ -62,6 +65,72 @@ impl ShardTag for SecondaryShardTag {
type Score = NodeSecondarySchedulingScore;
}
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
enum AzMatch {
Yes,
No,
Unknown,
}
impl AzMatch {
fn new(node_az: &AvailabilityZone, shard_preferred_az: Option<&AvailabilityZone>) -> Self {
match shard_preferred_az {
Some(preferred_az) if preferred_az == node_az => Self::Yes,
Some(_preferred_az) => Self::No,
None => Self::Unknown,
}
}
}
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
struct AttachmentAzMatch(AzMatch);
impl Ord for AttachmentAzMatch {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Lower scores indicate a more suitable node.
// Note that we prefer a node for which we don't have
// info to a node which we are certain doesn't match the
// preferred AZ of the shard.
let az_match_score = |az_match: &AzMatch| match az_match {
AzMatch::Yes => 0,
AzMatch::Unknown => 1,
AzMatch::No => 2,
};
az_match_score(&self.0).cmp(&az_match_score(&other.0))
}
}
impl PartialOrd for AttachmentAzMatch {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
struct SecondaryAzMatch(AzMatch);
impl Ord for SecondaryAzMatch {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Lower scores indicate a more suitable node.
// For secondary locations we wish to avoid the preferred AZ
// of the shard.
let az_match_score = |az_match: &AzMatch| match az_match {
AzMatch::No => 0,
AzMatch::Unknown => 1,
AzMatch::Yes => 2,
};
az_match_score(&self.0).cmp(&az_match_score(&other.0))
}
}
impl PartialOrd for SecondaryAzMatch {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Scheduling score of a given node for shard attachments.
/// Lower scores indicate more suitable nodes.
/// Ordering is given by member declaration order (top to bottom).
@@ -70,6 +139,10 @@ pub(crate) struct NodeAttachmentSchedulingScore {
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For equal affinity scores, nodes in the matching AZ
/// are considered first.
az_match: AttachmentAzMatch,
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
/// This normally tracks the number of attached shards belonging to the
/// tenant being scheduled that are already on this node.
@@ -87,6 +160,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
fn generate(
node_id: &NodeId,
node: &mut SchedulerNode,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self> {
let utilization = match &mut node.may_schedule {
@@ -102,6 +176,7 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
.get(node_id)
.copied()
.unwrap_or(AffinityScore::FREE),
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
utilization_score: utilization.cached_score(),
total_attached_shard_count: node.attached_shard_count,
@@ -123,6 +198,11 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
/// Ordering is given by member declaration order (top to bottom).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeSecondarySchedulingScore {
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For secondary locations we wish to avoid nodes in.
/// the preferred AZ of the shard, since that's where the attached location
/// should be scheduled and having the secondary in the same AZ is bad for HA.
az_match: SecondaryAzMatch,
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
@@ -139,6 +219,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
fn generate(
node_id: &NodeId,
node: &mut SchedulerNode,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self> {
let utilization = match &mut node.may_schedule {
@@ -149,6 +230,7 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
};
Some(Self {
az_match: SecondaryAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
affinity_score: context
.nodes
.get(node_id)
@@ -179,6 +261,7 @@ impl PartialEq for SchedulerNode {
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
&& self.az == other.az
}
}
@@ -293,6 +376,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
},
);
}
@@ -319,6 +403,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
},
);
}
@@ -497,6 +582,7 @@ impl Scheduler {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
az: node.get_availability_zone_id().clone(),
});
}
}
@@ -542,6 +628,7 @@ impl Scheduler {
fn compute_node_scores<Score>(
&mut self,
hard_exclude: &[NodeId],
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Vec<Score>
where
@@ -553,7 +640,7 @@ impl Scheduler {
if hard_exclude.contains(k) {
None
} else {
Score::generate(k, v, context)
Score::generate(k, v, preferred_az, context)
}
})
.collect()
@@ -571,13 +658,15 @@ impl Scheduler {
pub(crate) fn schedule_shard<Tag: ShardTag>(
&mut self,
hard_exclude: &[NodeId],
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}
let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
let mut scores =
self.compute_node_scores::<Tag::Score>(hard_exclude, preferred_az, context);
// Exclude nodes whose utilization is critically high, if there are alternatives available. This will
// cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
@@ -634,6 +723,12 @@ impl Scheduler {
Ok(node_id)
}
/// Selects any available node. This is suitable for performing background work (e.g. S3
/// deletions).
pub(crate) fn any_available_node(&mut self) -> Result<NodeId, ScheduleError> {
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
}
/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
@@ -650,13 +745,22 @@ impl Scheduler {
pub(crate) mod test_utils {
use crate::node::Node;
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use pageserver_api::{
controller_api::{AvailabilityZone, NodeAvailability},
models::utilization::test_utilization,
};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
///
/// Node IDs start at one.
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
///
/// The `azs` argument specifies the list of availability zones which will be assigned
/// to nodes in round-robin fashion. If empy, a default AZ is assigned.
pub(crate) fn make_test_nodes(n: u64, azs: &[AvailabilityZone]) -> HashMap<NodeId, Node> {
let mut az_iter = azs.iter().cycle();
(1..n + 1)
.map(|i| {
(NodeId(i), {
@@ -666,7 +770,10 @@ pub(crate) mod test_utils {
80 + i as u16,
format!("pghost-{i}"),
5432 + i as u16,
"test-az".to_string(),
az_iter
.next()
.cloned()
.unwrap_or(AvailabilityZone("test-az".to_string())),
);
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
assert!(node.is_available());
@@ -686,7 +793,7 @@ mod tests {
use crate::tenant_shard::IntentState;
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
let nodes = test_utils::make_test_nodes(2);
let nodes = test_utils::make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut t1_intent = IntentState::new();
@@ -694,9 +801,9 @@ mod tests {
let context = ScheduleContext::default();
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
t1_intent.set_attached(&mut scheduler, Some(scheduled));
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &None, &context)?;
t2_intent.set_attached(&mut scheduler, Some(scheduled));
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
@@ -705,8 +812,11 @@ mod tests {
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
let scheduled =
scheduler.schedule_shard::<AttachedShardTag>(&t1_intent.all_pageservers(), &context)?;
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(
&t1_intent.all_pageservers(),
&None,
&context,
)?;
t1_intent.push_secondary(&mut scheduler, scheduled);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
@@ -746,7 +856,7 @@ mod tests {
#[test]
/// Test the PageserverUtilization's contribution to scheduling algorithm
fn scheduler_utilization() {
let mut nodes = test_utils::make_test_nodes(3);
let mut nodes = test_utils::make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
// Need to keep these alive because they contribute to shard counts via RAII
@@ -761,7 +871,7 @@ mod tests {
context: &ScheduleContext,
) {
let scheduled = scheduler
.schedule_shard::<AttachedShardTag>(&[], context)
.schedule_shard::<AttachedShardTag>(&[], &None, context)
.unwrap();
let mut intent = IntentState::new();
intent.set_attached(scheduler, Some(scheduled));
@@ -870,4 +980,98 @@ mod tests {
intent.clear(&mut scheduler);
}
}
#[test]
/// A simple test that showcases AZ-aware scheduling and its interaction with
/// affinity scores.
fn az_scheduling() {
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let nodes = test_utils::make_test_nodes(3, &[az_a_tag.clone(), az_b_tag.clone()]);
let mut scheduler = Scheduler::new(nodes.values());
// Need to keep these alive because they contribute to shard counts via RAII
let mut scheduled_intents = Vec::new();
let mut context = ScheduleContext::default();
fn assert_scheduler_chooses<Tag: ShardTag>(
expect_node: NodeId,
preferred_az: Option<AvailabilityZone>,
scheduled_intents: &mut Vec<IntentState>,
scheduler: &mut Scheduler,
context: &mut ScheduleContext,
) {
let scheduled = scheduler
.schedule_shard::<Tag>(&[], &preferred_az, context)
.unwrap();
let mut intent = IntentState::new();
intent.set_attached(scheduler, Some(scheduled));
scheduled_intents.push(intent);
assert_eq!(scheduled, expect_node);
context.avoid(&[scheduled]);
}
assert_scheduler_chooses::<AttachedShardTag>(
NodeId(1),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Node 2 and 3 have affinity score equal to 0, but node 3
// is in "az-a" so we prefer that.
assert_scheduler_chooses::<AttachedShardTag>(
NodeId(3),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
assert_scheduler_chooses::<AttachedShardTag>(
NodeId(2),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Avoid nodes in "az-a" for the secondary location.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(2),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(1),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Node 3 has lower affinity score than 1, so prefer that.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(3),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
for mut intent in scheduled_intents {
intent.clear(&mut scheduler);
}
}
}

View File

@@ -26,7 +26,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
@@ -1265,6 +1265,8 @@ impl Service {
#[cfg(feature = "testing")]
{
use pageserver_api::controller_api::AvailabilityZone;
// Hack: insert scheduler state for all nodes referenced by shards, as compatibility
// tests only store the shards, not the nodes. The nodes will be loaded shortly
// after when pageservers start up and register.
@@ -1282,7 +1284,7 @@ impl Service {
123,
"".to_string(),
123,
"test_az".to_string(),
AvailabilityZone("test_az".to_string()),
);
scheduler.node_upsert(&node);
@@ -2099,7 +2101,7 @@ impl Service {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
.map(|n| n.get_availability_zone_id().clone())?;
Some((resp.shard_id, az_id))
})
@@ -2629,8 +2631,7 @@ impl Service {
let scheduler = &mut locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
let node_id = scheduler.any_available_node()?;
let node = locked
.nodes
.get(&node_id)
@@ -2816,8 +2817,7 @@ impl Service {
// Pick an arbitrary node to use for remote deletions (does not have to be where the tenant
// was attached, just has to be able to see the S3 content)
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
let node_id = scheduler.any_available_node()?;
let node = nodes
.get(&node_id)
.expect("Pageservers may not be deleted while lock is active");
@@ -4481,7 +4481,7 @@ impl Service {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().to_string())?;
.map(|n| n.get_availability_zone_id().clone())?;
Some((*tid, az_id))
})

View File

@@ -15,7 +15,7 @@ use crate::{
service::ReconcileResultRequest,
};
use pageserver_api::controller_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
@@ -146,7 +146,7 @@ pub(crate) struct TenantShard {
// We should attempt to schedule this shard in the provided AZ to
// decrease chances of cross-AZ compute.
preferred_az_id: Option<String>,
preferred_az_id: Option<AvailabilityZone>,
}
#[derive(Default, Clone, Debug, Serialize)]
@@ -540,8 +540,11 @@ impl TenantShard {
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id =
scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
&self.intent.secondary,
&self.preferred_az_id,
context,
)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
@@ -622,8 +625,11 @@ impl TenantShard {
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler
.schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
&used_pageservers,
&self.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
@@ -636,7 +642,11 @@ impl TenantShard {
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
&[],
&self.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
@@ -815,6 +825,7 @@ impl TenantShard {
// with lower utilization.
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
&self.intent.all_pageservers(),
&self.preferred_az_id,
schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
@@ -1313,7 +1324,7 @@ impl TenantShard {
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
preferred_az_id: tsp.preferred_az_id,
preferred_az_id: tsp.preferred_az_id.map(AvailabilityZone),
})
}
@@ -1329,15 +1340,15 @@ impl TenantShard {
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
preferred_az_id: self.preferred_az_id.clone(),
preferred_az_id: self.preferred_az_id.as_ref().map(|az| az.0.clone()),
}
}
pub(crate) fn preferred_az(&self) -> Option<&str> {
self.preferred_az_id.as_deref()
pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
self.preferred_az_id.as_ref()
}
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: String) {
pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
self.preferred_az_id = Some(preferred_az_id);
}
}
@@ -1350,6 +1361,7 @@ pub(crate) mod tests {
controller_api::NodeAvailability,
shard::{ShardCount, ShardNumber},
};
use rand::{rngs::StdRng, SeedableRng};
use utils::id::TenantId;
use crate::scheduler::test_utils::make_test_nodes;
@@ -1378,7 +1390,11 @@ pub(crate) mod tests {
)
}
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
fn make_test_tenant(
policy: PlacementPolicy,
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
let tenant_id = TenantId::generate();
(0..shard_count.count())
@@ -1390,7 +1406,7 @@ pub(crate) mod tests {
shard_number,
shard_count,
};
TenantShard::new(
let mut ts = TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
@@ -1399,7 +1415,13 @@ pub(crate) mod tests {
)
.unwrap(),
policy.clone(),
)
);
if let Some(az) = &preferred_az {
ts.set_preferred_az(az.clone());
}
ts
})
.collect()
}
@@ -1410,7 +1432,7 @@ pub(crate) mod tests {
fn tenant_ha_scheduling() -> anyhow::Result<()> {
// Start with three nodes. Our tenant will only use two. The third one is
// expected to remain unused.
let mut nodes = make_test_nodes(3);
let mut nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();
@@ -1462,7 +1484,7 @@ pub(crate) mod tests {
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
let nodes = make_test_nodes(3);
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
@@ -1512,7 +1534,7 @@ pub(crate) mod tests {
#[test]
fn scheduling_mode() -> anyhow::Result<()> {
let nodes = make_test_nodes(3);
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
@@ -1537,7 +1559,7 @@ pub(crate) mod tests {
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
let nodes = make_test_nodes(3);
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
@@ -1604,7 +1626,7 @@ pub(crate) mod tests {
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
let nodes = make_test_nodes(4);
let nodes = make_test_nodes(4, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
@@ -1703,14 +1725,14 @@ pub(crate) mod tests {
/// that it converges.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
let nodes = make_test_nodes(4);
let nodes = make_test_nodes(4, &[]);
// Only show the scheduler a couple of nodes
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let mut schedule_context = ScheduleContext::default();
for shard in &mut shards {
assert!(shard
@@ -1759,16 +1781,16 @@ pub(crate) mod tests {
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
use itertools::Itertools;
let nodes = make_test_nodes(2);
let nodes = make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
@@ -1793,4 +1815,147 @@ pub(crate) mod tests {
Ok(())
}
#[test]
fn random_az_shard_scheduling() -> anyhow::Result<()> {
use rand::seq::SliceRandom;
for seed in 0..50 {
eprintln!("Running test with seed {seed}");
let mut rng = StdRng::seed_from_u64(seed);
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let azs = [az_a_tag, az_b_tag];
let nodes = make_test_nodes(4, &azs);
let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
let mut scheduler = Scheduler::new([].iter());
for node in nodes.values() {
scheduler.node_upsert(node);
}
let mut shards = Vec::default();
let mut contexts = Vec::default();
let mut az_picker = azs.iter().cycle().cloned();
for i in 0..100 {
let az = az_picker.next().unwrap();
let shard_count = i % 4 + 1;
*shards_per_az.entry(az.clone()).or_default() += shard_count;
let tenant_shards = make_test_tenant(
PlacementPolicy::Attached(1),
ShardCount::new(shard_count.try_into().unwrap()),
Some(az),
);
let context = Rc::new(RefCell::new(ScheduleContext::default()));
contexts.push(context.clone());
let with_ctx = tenant_shards
.into_iter()
.map(|shard| (shard, context.clone()));
for shard_with_ctx in with_ctx {
shards.push(shard_with_ctx);
}
}
shards.shuffle(&mut rng);
#[derive(Default, Debug)]
struct NodeStats {
attachments: u32,
secondaries: u32,
}
let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
let mut attachments_in_wrong_az = 0;
let mut secondaries_in_wrong_az = 0;
for (shard, context) in &mut shards {
let context = &mut *context.borrow_mut();
shard.schedule(&mut scheduler, context).unwrap();
let attached_node = shard.intent.get_attached().unwrap();
let stats = node_stats.entry(attached_node).or_default();
stats.attachments += 1;
let secondary_node = *shard.intent.get_secondary().first().unwrap();
let stats = node_stats.entry(secondary_node).or_default();
stats.secondaries += 1;
let attached_node_az = nodes
.get(&attached_node)
.unwrap()
.get_availability_zone_id();
let secondary_node_az = nodes
.get(&secondary_node)
.unwrap()
.get_availability_zone_id();
let preferred_az = shard.preferred_az().unwrap();
if attached_node_az != preferred_az {
eprintln!(
"{} attachment was scheduled in AZ {} but preferred AZ {}",
shard.tenant_shard_id, attached_node_az, preferred_az
);
attachments_in_wrong_az += 1;
}
if secondary_node_az == preferred_az {
eprintln!(
"{} secondary was scheduled in AZ {} which matches preference",
shard.tenant_shard_id, attached_node_az
);
secondaries_in_wrong_az += 1;
}
}
let mut violations = Vec::default();
if attachments_in_wrong_az > 0 {
violations.push(format!(
"{} attachments scheduled to the incorrect AZ",
attachments_in_wrong_az
));
}
if secondaries_in_wrong_az > 0 {
violations.push(format!(
"{} secondaries scheduled to the incorrect AZ",
secondaries_in_wrong_az
));
}
eprintln!(
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
attachments_in_wrong_az, secondaries_in_wrong_az
);
for (node_id, stats) in &node_stats {
let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
let allowed_attachment_load =
(ideal_attachment_load - 1)..(ideal_attachment_load + 2);
if !allowed_attachment_load.contains(&stats.attachments) {
violations.push(format!(
"Found {} attachments on node {}, but expected {}",
stats.attachments, node_id, ideal_attachment_load
));
}
eprintln!(
"{}: attachments={} secondaries={} ideal_attachment_load={}",
node_id, stats.attachments, stats.secondaries, ideal_attachment_load
);
}
assert!(violations.is_empty(), "{violations:?}");
for (mut shard, _ctx) in shards {
shard.intent.clear(&mut scheduler);
}
}
Ok(())
}
}