mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
14 Commits
release-pr
...
jcsp/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50118e0347 | ||
|
|
f943fa5753 | ||
|
|
9172476147 | ||
|
|
e3b8a3ee57 | ||
|
|
5bb7472eab | ||
|
|
0d6966b9cb | ||
|
|
44689fd90b | ||
|
|
43f55136b6 | ||
|
|
9971913fa0 | ||
|
|
01643006c6 | ||
|
|
ea6f6a87a2 | ||
|
|
017fffe583 | ||
|
|
cf896ff144 | ||
|
|
bcd888126e |
74
Cargo.lock
generated
74
Cargo.lock
generated
@@ -1948,6 +1948,15 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_filter"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.10.2"
|
||||
@@ -1961,6 +1970,18 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"env_filter",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
@@ -3368,6 +3389,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.1"
|
||||
@@ -3663,6 +3694,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.11.1"
|
||||
@@ -4182,7 +4219,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4195,7 +4232,6 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"sha2",
|
||||
"stringprep",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4217,7 +4253,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4273,7 +4309,7 @@ dependencies = [
|
||||
"bindgen",
|
||||
"bytes",
|
||||
"crc32c",
|
||||
"env_logger",
|
||||
"env_logger 0.10.2",
|
||||
"log",
|
||||
"memoffset 0.9.0",
|
||||
"once_cell",
|
||||
@@ -4541,7 +4577,7 @@ dependencies = [
|
||||
"consumption_metrics",
|
||||
"dashmap",
|
||||
"ecdsa 0.16.9",
|
||||
"env_logger",
|
||||
"env_logger 0.10.2",
|
||||
"fallible-iterator",
|
||||
"flate2",
|
||||
"framed-websockets",
|
||||
@@ -6064,6 +6100,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"test-log",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
@@ -6350,6 +6387,28 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "test-log"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93"
|
||||
dependencies = [
|
||||
"env_logger 0.11.2",
|
||||
"test-log-macros",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "test-log-macros"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.69"
|
||||
@@ -6547,7 +6606,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -6901,6 +6960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -7209,7 +7269,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"camino-tempfile",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"env_logger 0.10.2",
|
||||
"log",
|
||||
"postgres",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -325,6 +325,16 @@ pub enum PlacementPolicy {
|
||||
Detached,
|
||||
}
|
||||
|
||||
impl PlacementPolicy {
|
||||
pub fn want_secondaries(&self) -> usize {
|
||||
match self {
|
||||
PlacementPolicy::Attached(secondary_count) => *secondary_count,
|
||||
PlacementPolicy::Secondary => 1,
|
||||
PlacementPolicy::Detached => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateResponse {}
|
||||
|
||||
|
||||
@@ -56,3 +56,6 @@ utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
[dev-dependencies]
|
||||
test-log = {version="0.2.16"}
|
||||
@@ -809,7 +809,21 @@ impl Reconciler {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(ReconcileError::Cancel);
|
||||
}
|
||||
self.location_config(&node, conf, None, false).await?;
|
||||
// We only try to configure secondary locations if the node is available. This does
|
||||
// not stop us succeeding with the reconcile, because our core goal is to make the
|
||||
// shard _available_ (the attached location), and configuring secondary locations
|
||||
// can be done lazily when the node becomes available (via background reconciliation).
|
||||
if node.is_available() {
|
||||
self.location_config(&node, conf, None, false).await?;
|
||||
} else {
|
||||
// If the node is unavailable, we skip and consider the reconciliation successful: this
|
||||
// is a common case where a pageserver is marked unavailable: we demote a location on
|
||||
// that unavailable pageserver to secondary.
|
||||
tracing::info!("Skipping configuring secondary location {node}, it is unavailable");
|
||||
self.observed
|
||||
.locations
|
||||
.insert(node.get_id(), ObservedStateLocation { conf: None });
|
||||
}
|
||||
}
|
||||
|
||||
// The condition below identifies a detach. We must have no attached intent and
|
||||
|
||||
@@ -47,6 +47,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self>;
|
||||
|
||||
/// Return a score that drops any components based on node utilization: this is useful
|
||||
/// for finding scores for scheduling optimisation, when we want to avoid rescheduling
|
||||
/// shards due to e.g. disk usage, to avoid flapping.
|
||||
fn for_optimization(&self) -> Self;
|
||||
|
||||
fn is_overloaded(&self) -> bool;
|
||||
fn node_id(&self) -> NodeId;
|
||||
}
|
||||
@@ -136,17 +142,13 @@ impl PartialOrd for SecondaryAzMatch {
|
||||
/// Ordering is given by member declaration order (top to bottom).
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub(crate) struct NodeAttachmentSchedulingScore {
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
/// Flag indicating whether this node matches the preferred AZ
|
||||
/// of the shard. For equal affinity scores, nodes in the matching AZ
|
||||
/// are considered first.
|
||||
az_match: AttachmentAzMatch,
|
||||
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
|
||||
/// This normally tracks the number of attached shards belonging to the
|
||||
/// tenant being scheduled that are already on this node.
|
||||
attached_shards_in_context: usize,
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
/// Utilisation score that combines shard count and disk utilisation
|
||||
utilization_score: u64,
|
||||
/// Total number of shards attached to this node. When nodes have identical utilisation, this
|
||||
@@ -177,13 +179,25 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE),
|
||||
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
|
||||
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
|
||||
utilization_score: utilization.cached_score(),
|
||||
total_attached_shard_count: node.attached_shard_count,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// For use in scheduling optimisation, where we only want to consider the aspects
|
||||
/// of the score that can only be resolved by moving things (such as inter-shard affinity
|
||||
/// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
|
||||
/// can fluctuate for other reasons)
|
||||
fn for_optimization(&self) -> Self {
|
||||
Self {
|
||||
utilization_score: 0,
|
||||
total_attached_shard_count: 0,
|
||||
node_id: NodeId(0),
|
||||
..*self
|
||||
}
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
@@ -242,6 +256,15 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
})
|
||||
}
|
||||
|
||||
fn for_optimization(&self) -> Self {
|
||||
Self {
|
||||
utilization_score: 0,
|
||||
total_attached_shard_count: 0,
|
||||
node_id: NodeId(0),
|
||||
..*self
|
||||
}
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
@@ -293,6 +316,10 @@ impl AffinityScore {
|
||||
pub(crate) fn inc(&mut self) {
|
||||
self.0 += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn dec(&mut self) {
|
||||
self.0 -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Add for AffinityScore {
|
||||
@@ -353,15 +380,44 @@ impl ScheduleContext {
|
||||
*entry += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
|
||||
self.nodes
|
||||
.get(&node_id)
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE)
|
||||
/// Imagine we migrated our attached location to the given node. Return a new context that
|
||||
/// reflects this.
|
||||
pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
|
||||
let mut new_context = self.clone();
|
||||
|
||||
if let Some(attached) = shard.intent.get_attached() {
|
||||
if let Some(count) = new_context.attached_nodes.get_mut(attached) {
|
||||
// It's unexpected that we get called in a context where the source of
|
||||
// the migration is not already in the context.
|
||||
debug_assert!(*count > 0);
|
||||
|
||||
if *count > 0 {
|
||||
*count -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(score) = new_context.nodes.get_mut(attached) {
|
||||
score.dec();
|
||||
}
|
||||
}
|
||||
|
||||
for secondary in shard.intent.get_secondary() {
|
||||
if let Some(score) = new_context.nodes.get_mut(secondary) {
|
||||
score.dec();
|
||||
}
|
||||
}
|
||||
|
||||
new_context
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
|
||||
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
|
||||
pub(crate) fn project_secondary_detach(&self, source: NodeId) -> Self {
|
||||
let mut new_context = self.clone();
|
||||
|
||||
if let Some(score) = new_context.nodes.get_mut(&source) {
|
||||
score.dec();
|
||||
}
|
||||
|
||||
new_context
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -636,6 +692,22 @@ impl Scheduler {
|
||||
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
|
||||
}
|
||||
|
||||
/// Calculate a single node's score, used in optimizer logic to compare specific
|
||||
/// nodes' scores.
|
||||
pub(crate) fn compute_node_score<Score>(
|
||||
&mut self,
|
||||
node_id: NodeId,
|
||||
preferred_az: &Option<AvailabilityZone>,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Score>
|
||||
where
|
||||
Score: NodeSchedulingScore,
|
||||
{
|
||||
self.nodes
|
||||
.get_mut(&node_id)
|
||||
.and_then(|node| Score::generate(&node_id, node, preferred_az, context))
|
||||
}
|
||||
|
||||
/// Compute a schedulling score for each node that the scheduler knows of
|
||||
/// minus a set of hard excluded nodes.
|
||||
fn compute_node_scores<Score>(
|
||||
@@ -727,7 +799,7 @@ impl Scheduler {
|
||||
tracing::info!(
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
|
||||
scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
|
||||
);
|
||||
);
|
||||
}
|
||||
|
||||
// Note that we do not update shard count here to reflect the scheduling: that
|
||||
@@ -742,6 +814,10 @@ impl Scheduler {
|
||||
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
|
||||
}
|
||||
|
||||
pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
|
||||
self.nodes.get(node_id).map(|n| n.az.clone())
|
||||
}
|
||||
|
||||
/// Unit test access to internal state
|
||||
#[cfg(test)]
|
||||
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
|
||||
@@ -799,7 +875,14 @@ pub(crate) mod test_utils {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
|
||||
use pageserver_api::{
|
||||
controller_api::NodeAvailability, models::utilization::test_utilization,
|
||||
shard::ShardIdentity,
|
||||
};
|
||||
use utils::{
|
||||
id::TenantId,
|
||||
shard::{ShardCount, ShardNumber, TenantShardId},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -1045,9 +1128,9 @@ mod tests {
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
|
||||
// Node 1 and 3 (az-a) have same affinity score, so prefer the lowest node id.
|
||||
assert_scheduler_chooses::<AttachedShardTag>(
|
||||
NodeId(2),
|
||||
NodeId(1),
|
||||
Some(az_a_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
@@ -1063,28 +1146,196 @@ mod tests {
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(1),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Avoid nodes in "az-b" for the secondary location.
|
||||
// Node 3 has lower affinity score than 1, so prefer that.
|
||||
assert_scheduler_chooses::<SecondaryShardTag>(
|
||||
NodeId(3),
|
||||
Some(az_b_tag.clone()),
|
||||
&mut scheduled_intents,
|
||||
&mut scheduler,
|
||||
&mut context,
|
||||
);
|
||||
|
||||
for mut intent in scheduled_intents {
|
||||
intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
use test_log::test;
|
||||
|
||||
#[test]
|
||||
/// Make sure that when we have an odd number of nodes and an even number of shards, we still
|
||||
/// get scheduling stability.
|
||||
fn odd_nodes_stability() {
|
||||
let az_a = AvailabilityZone("az-a".to_string());
|
||||
let az_b = AvailabilityZone("az-b".to_string());
|
||||
|
||||
let nodes = test_utils::make_test_nodes(
|
||||
10,
|
||||
&[
|
||||
az_a.clone(),
|
||||
az_a.clone(),
|
||||
az_a.clone(),
|
||||
az_a.clone(),
|
||||
az_a.clone(),
|
||||
az_b.clone(),
|
||||
az_b.clone(),
|
||||
az_b.clone(),
|
||||
az_b.clone(),
|
||||
az_b.clone(),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Need to keep these alive because they contribute to shard counts via RAII
|
||||
let mut scheduled_shards = Vec::new();
|
||||
|
||||
let mut context = ScheduleContext::default();
|
||||
|
||||
fn schedule_shard(
|
||||
tenant_shard_id: TenantShardId,
|
||||
expect_attached: NodeId,
|
||||
expect_secondary: NodeId,
|
||||
scheduled_shards: &mut Vec<TenantShard>,
|
||||
scheduler: &mut Scheduler,
|
||||
preferred_az: Option<AvailabilityZone>,
|
||||
context: &mut ScheduleContext,
|
||||
) {
|
||||
let shard_identity = ShardIdentity::new(
|
||||
tenant_shard_id.shard_number,
|
||||
tenant_shard_id.shard_count,
|
||||
pageserver_api::shard::ShardStripeSize(1),
|
||||
)
|
||||
.unwrap();
|
||||
let mut shard = TenantShard::new(
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
pageserver_api::controller_api::PlacementPolicy::Attached(1),
|
||||
preferred_az,
|
||||
);
|
||||
|
||||
shard.schedule(scheduler, context).unwrap();
|
||||
|
||||
assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
|
||||
assert_eq!(
|
||||
shard.intent.get_secondary().first().unwrap(),
|
||||
&expect_secondary
|
||||
);
|
||||
|
||||
scheduled_shards.push(shard);
|
||||
}
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(0),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(1),
|
||||
NodeId(6),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(1),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(2),
|
||||
NodeId(7),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(2),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(3),
|
||||
NodeId(8),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(3),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(4),
|
||||
NodeId(9),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(4),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(5),
|
||||
NodeId(10),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(5),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(1),
|
||||
NodeId(6),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(6),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(2),
|
||||
NodeId(7),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
schedule_shard(
|
||||
TenantShardId {
|
||||
tenant_id,
|
||||
shard_number: ShardNumber(7),
|
||||
shard_count: ShardCount(8),
|
||||
},
|
||||
NodeId(3),
|
||||
NodeId(8),
|
||||
&mut scheduled_shards,
|
||||
&mut scheduler,
|
||||
Some(az_a.clone()),
|
||||
&mut context,
|
||||
);
|
||||
|
||||
// Assert that the optimizer suggests nochanges, i.e. our initial scheduling was stable.
|
||||
for shard in &scheduled_shards {
|
||||
assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
|
||||
}
|
||||
|
||||
for mut shard in scheduled_shards {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
|
||||
ScheduleOptimization, ScheduleOptimizationAction,
|
||||
@@ -1576,6 +1576,7 @@ impl Service {
|
||||
attach_req.tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Attached(0),
|
||||
None,
|
||||
),
|
||||
);
|
||||
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
|
||||
@@ -2103,6 +2104,21 @@ impl Service {
|
||||
)
|
||||
};
|
||||
|
||||
let preferred_az_id: Option<AvailabilityZone> = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
|
||||
// Idempotency: take the existing value if the tenant already exists
|
||||
if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
|
||||
shard.preferred_az().cloned()
|
||||
} else {
|
||||
locked
|
||||
.scheduler
|
||||
.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
|
||||
.ok()
|
||||
.and_then(|n_id| locked.scheduler.get_node_az(&n_id))
|
||||
}
|
||||
};
|
||||
|
||||
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
|
||||
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
|
||||
// during the creation, rather than risking leaving orphan objects in S3.
|
||||
@@ -2122,7 +2138,7 @@ impl Service {
|
||||
splitting: SplitState::default(),
|
||||
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
|
||||
.unwrap(),
|
||||
preferred_az_id: None,
|
||||
preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -2158,6 +2174,7 @@ impl Service {
|
||||
&create_req.shard_parameters,
|
||||
create_req.config.clone(),
|
||||
placement_policy.clone(),
|
||||
preferred_az_id.as_ref(),
|
||||
&mut schedule_context,
|
||||
)
|
||||
.await;
|
||||
@@ -2171,44 +2188,6 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
let preferred_azs = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
response_shards
|
||||
.iter()
|
||||
.filter_map(|resp| {
|
||||
let az_id = locked
|
||||
.nodes
|
||||
.get(&resp.node_id)
|
||||
.map(|n| n.get_availability_zone_id().clone())?;
|
||||
|
||||
Some((resp.shard_id, az_id))
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
// Note that we persist the preferred AZ for the new shards separately.
|
||||
// In theory, we could "peek" the scheduler to determine where the shard will
|
||||
// land, but the subsequent "real" call into the scheduler might select a different
|
||||
// node. Hence, we do this awkward update to keep things consistent.
|
||||
let updated = self
|
||||
.persistence
|
||||
.set_tenant_shard_preferred_azs(preferred_azs)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to persist preferred az ids: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for (tid, az_id) in updated {
|
||||
if let Some(shard) = locked.tenants.get_mut(&tid) {
|
||||
shard.set_preferred_az(az_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we failed to schedule shards, then they are still created in the controller,
|
||||
// but we return an error to the requester to avoid a silent failure when someone
|
||||
// tries to e.g. create a tenant whose placement policy requires more nodes than
|
||||
@@ -2239,6 +2218,7 @@ impl Service {
|
||||
|
||||
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
|
||||
/// case of a new tenant and a pre-existing one.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn do_initial_shard_scheduling(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -2246,6 +2226,7 @@ impl Service {
|
||||
shard_params: &ShardParameters,
|
||||
config: TenantConfig,
|
||||
placement_policy: PlacementPolicy,
|
||||
preferred_az_id: Option<&AvailabilityZone>,
|
||||
schedule_context: &mut ScheduleContext,
|
||||
) -> InitialShardScheduleOutcome {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
@@ -2283,6 +2264,7 @@ impl Service {
|
||||
tenant_shard_id,
|
||||
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
|
||||
placement_policy,
|
||||
preferred_az_id.cloned(),
|
||||
));
|
||||
|
||||
state.generation = initial_generation;
|
||||
@@ -4149,16 +4131,14 @@ impl Service {
|
||||
},
|
||||
);
|
||||
|
||||
let mut child_state = TenantShard::new(child, child_shard, policy.clone());
|
||||
let mut child_state =
|
||||
TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
|
||||
child_state.intent = IntentState::single(scheduler, Some(pageserver));
|
||||
child_state.observed = ObservedState {
|
||||
locations: child_observed,
|
||||
};
|
||||
child_state.generation = Some(generation);
|
||||
child_state.config = config.clone();
|
||||
if let Some(preferred_az) = &preferred_az {
|
||||
child_state.set_preferred_az(preferred_az.clone());
|
||||
}
|
||||
|
||||
// The child's TenantShard::splitting is intentionally left at the default value of Idle,
|
||||
// as at this point in the split process we have succeeded and this part is infallible:
|
||||
@@ -5351,7 +5331,7 @@ impl Service {
|
||||
register_req.listen_http_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.availability_zone_id,
|
||||
register_req.availability_zone_id.clone(),
|
||||
);
|
||||
|
||||
// TODO: idempotency if the node already exists in the database
|
||||
@@ -5371,8 +5351,9 @@ impl Service {
|
||||
.set(locked.nodes.len() as i64);
|
||||
|
||||
tracing::info!(
|
||||
"Registered pageserver {}, now have {} pageservers",
|
||||
"Registered pageserver {} ({}), now have {} pageservers",
|
||||
register_req.node_id,
|
||||
register_req.availability_zone_id,
|
||||
locked.nodes.len()
|
||||
);
|
||||
Ok(())
|
||||
@@ -6109,11 +6090,17 @@ impl Service {
|
||||
// How many candidate optimizations we will generate, before evaluating them for readniess: setting
|
||||
// this higher than the execution limit gives us a chance to execute some work even if the first
|
||||
// few optimizations we find are not ready.
|
||||
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
|
||||
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 2;
|
||||
|
||||
let mut work = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// We are going to plan a bunch of optimisations before applying any of them, so the
|
||||
// utilisation stats on nodes will be effectively stale for the >1st optimisation we
|
||||
// generate. To avoid this causing unstable migrations/flapping, it's important that the
|
||||
// code in TenantShard for finding optimisations uses [`NodeAttachmentSchedulingScore::disregard_utilization`]
|
||||
// to ignore the utilisation component of the score.
|
||||
|
||||
for (_tenant_id, schedule_context, shards) in
|
||||
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
|
||||
@@ -6149,7 +6136,7 @@ impl Service {
|
||||
if let Some(optimization) =
|
||||
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
|
||||
// its primary location based on soft constraints, cut it over.
|
||||
shard.optimize_attachment(nodes, &schedule_context)
|
||||
shard.optimize_attachment(scheduler, &schedule_context)
|
||||
{
|
||||
work.push((shard.tenant_shard_id, optimization));
|
||||
break;
|
||||
@@ -6208,8 +6195,10 @@ impl Service {
|
||||
}
|
||||
}
|
||||
}
|
||||
ScheduleOptimizationAction::ReplaceSecondary(_) => {
|
||||
// No extra checks needed to replace a secondary: this does not interrupt client access
|
||||
ScheduleOptimizationAction::ReplaceSecondary(_)
|
||||
| ScheduleOptimizationAction::CreateSecondary(_)
|
||||
| ScheduleOptimizationAction::RemoveSecondary(_) => {
|
||||
// No extra checks needed to manage secondaries: this does not interrupt client access
|
||||
validated_work.push((tenant_shard_id, optimization))
|
||||
}
|
||||
};
|
||||
@@ -6281,26 +6270,35 @@ impl Service {
|
||||
/// we have this helper to move things along faster.
|
||||
#[cfg(feature = "testing")]
|
||||
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
|
||||
let (attached_node, secondary_node) = {
|
||||
let (attached_node, secondaries) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
|
||||
tracing::warn!(
|
||||
"Skipping kick of secondary download for {tenant_shard_id}: not found"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let (Some(attached), Some(secondary)) = (
|
||||
shard.intent.get_attached(),
|
||||
shard.intent.get_secondary().first(),
|
||||
) else {
|
||||
|
||||
let Some(attached) = shard.intent.get_attached() else {
|
||||
tracing::warn!(
|
||||
"Skipping kick of secondary download for {tenant_shard_id}: no attached"
|
||||
);
|
||||
return;
|
||||
};
|
||||
(
|
||||
locked.nodes.get(attached).unwrap().clone(),
|
||||
locked.nodes.get(secondary).unwrap().clone(),
|
||||
)
|
||||
|
||||
let secondaries = shard
|
||||
.intent
|
||||
.get_secondary()
|
||||
.iter()
|
||||
.map(|n| locked.nodes.get(n).unwrap().clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
(locked.nodes.get(attached).unwrap().clone(), secondaries)
|
||||
};
|
||||
|
||||
// Make remote API calls to upload + download heatmaps: we ignore errors because this is just
|
||||
// a 'kick' to let scheduling optimisation run more promptly.
|
||||
attached_node
|
||||
match attached_node
|
||||
.with_client_retries(
|
||||
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
|
||||
&self.config.jwt_token,
|
||||
@@ -6309,22 +6307,57 @@ impl Service {
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
Some(Err(e)) => {
|
||||
tracing::info!(
|
||||
"Failed to upload heatmap from {attached_node} for {tenant_shard_id}: {e}"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
tracing::info!(
|
||||
"Cancelled while uploading heatmap from {attached_node} for {tenant_shard_id}"
|
||||
);
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
tracing::info!(
|
||||
"Successfully uploaded heatmap from {attached_node} for {tenant_shard_id}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
secondary_node
|
||||
.with_client_retries(
|
||||
|client| async move {
|
||||
client
|
||||
.tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1)))
|
||||
.await
|
||||
},
|
||||
&self.config.jwt_token,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
for secondary_node in secondaries {
|
||||
match secondary_node
|
||||
.with_client_retries(
|
||||
|client| async move {
|
||||
client
|
||||
.tenant_secondary_download(
|
||||
tenant_shard_id,
|
||||
Some(Duration::from_secs(1)),
|
||||
)
|
||||
.await
|
||||
},
|
||||
&self.config.jwt_token,
|
||||
3,
|
||||
10,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(Err(e)) => {
|
||||
tracing::info!(
|
||||
"Failed to download heatmap from {secondary_node} for {tenant_shard_id}: {e}"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
tracing::info!("Cancelled while downloading heatmap from {secondary_node} for {tenant_shard_id}");
|
||||
}
|
||||
Some(Ok(progress)) => {
|
||||
tracing::info!("Successfully downloaded heatmap from {secondary_node} for {tenant_shard_id}: {progress:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Look for shards which are oversized and in need of splitting
|
||||
@@ -6760,9 +6793,15 @@ impl Service {
|
||||
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
|
||||
let mut tids_by_node = locked
|
||||
.tenants
|
||||
let node_az = nodes
|
||||
.get(&node_id)
|
||||
.expect("Node must exist")
|
||||
.get_availability_zone_id()
|
||||
.clone();
|
||||
|
||||
let mut tids_by_node = tenants
|
||||
.iter_mut()
|
||||
.filter_map(|(tid, tenant_shard)| {
|
||||
if !matches!(
|
||||
@@ -6775,6 +6814,25 @@ impl Service {
|
||||
return None;
|
||||
}
|
||||
|
||||
// AZ check: when filling nodes after a restart, our intent is to move _back_ the
|
||||
// shards which belong on this node, not to promote shards whose scheduling preference
|
||||
// would be on their currently attached node. So will avoid promoting shards whose
|
||||
// home AZ doesn't match the AZ of the node we're filling.
|
||||
match tenant_shard.preferred_az() {
|
||||
None => {
|
||||
// Shard doesn't have an AZ preference: it is elegible to be moved.
|
||||
}
|
||||
Some(az) if az == &node_az => {
|
||||
// This shard's home AZ is equal to the node we're filling: it is
|
||||
// elegible to be moved: fall through;
|
||||
}
|
||||
Some(_) => {
|
||||
// This shard's home AZ is somewhere other than the node we're filling:
|
||||
// do not include it in the fill plan.
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
if tenant_shard.intent.get_secondary().contains(&node_id) {
|
||||
if let Some(primary) = tenant_shard.intent.get_attached() {
|
||||
return Some((*primary, *tid));
|
||||
|
||||
@@ -11,16 +11,14 @@ use crate::{
|
||||
persistence::TenantShardPersistence,
|
||||
reconciler::{ReconcileUnits, ReconcilerConfig},
|
||||
scheduler::{
|
||||
AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
|
||||
SecondaryShardTag,
|
||||
AttachedShardTag, NodeAttachmentSchedulingScore, NodeSchedulingScore,
|
||||
NodeSecondarySchedulingScore, RefCountUpdate, ScheduleContext, SecondaryShardTag,
|
||||
},
|
||||
service::ReconcileResultRequest,
|
||||
};
|
||||
use futures::future::{self, Either};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
|
||||
use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -315,6 +313,7 @@ pub(crate) struct ObservedStateLocation {
|
||||
/// we know that we might have some state on this node.
|
||||
pub(crate) conf: Option<LocationConfig>,
|
||||
}
|
||||
|
||||
pub(crate) struct ReconcilerWaiter {
|
||||
// For observability purposes, remember the ID of the shard we're
|
||||
// waiting for.
|
||||
@@ -360,6 +359,10 @@ pub(crate) enum ScheduleOptimizationAction {
|
||||
ReplaceSecondary(ReplaceSecondary),
|
||||
// Migrate attachment to an existing secondary location
|
||||
MigrateAttachment(MigrateAttachment),
|
||||
// Create a secondary location, with the intent of later migrating to it
|
||||
CreateSecondary(NodeId),
|
||||
// Remove a secondary location that we previously created to facilitate a migration
|
||||
RemoveSecondary(NodeId),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
@@ -472,6 +475,7 @@ impl TenantShard {
|
||||
tenant_shard_id: TenantShardId,
|
||||
shard: ShardIdentity,
|
||||
policy: PlacementPolicy,
|
||||
preferred_az_id: Option<AvailabilityZone>,
|
||||
) -> Self {
|
||||
metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
@@ -495,7 +499,7 @@ impl TenantShard {
|
||||
last_error: Arc::default(),
|
||||
pending_compute_notification: false,
|
||||
scheduling_policy: ShardSchedulingPolicy::default(),
|
||||
preferred_az_id: None,
|
||||
preferred_az_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -626,24 +630,7 @@ impl TenantShard {
|
||||
use PlacementPolicy::*;
|
||||
match self.policy {
|
||||
Attached(secondary_count) => {
|
||||
let retain_secondaries = if self.intent.attached.is_none()
|
||||
&& scheduler.node_preferred(&self.intent.secondary).is_some()
|
||||
{
|
||||
// If we have no attached, and one of the secondaries is elegible to be promoted, retain
|
||||
// one more secondary than we usually would, as one of them will become attached futher down this function.
|
||||
secondary_count + 1
|
||||
} else {
|
||||
secondary_count
|
||||
};
|
||||
|
||||
while self.intent.secondary.len() > retain_secondaries {
|
||||
// We have no particular preference for one secondary location over another: just
|
||||
// arbitrarily drop from the end
|
||||
self.intent.pop_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
// Should have exactly one attached, and N secondaries
|
||||
// Should have exactly one attached, and at least N secondaries
|
||||
let (modified_attached, attached_node_id) =
|
||||
self.schedule_attached(scheduler, context)?;
|
||||
modified |= modified_attached;
|
||||
@@ -740,90 +727,258 @@ impl TenantShard {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
|
||||
fn is_better_location(
|
||||
&self,
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
current: NodeId,
|
||||
candidate: NodeId,
|
||||
) -> Option<bool> {
|
||||
let Some(candidate_score) = scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
|
||||
candidate,
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
) else {
|
||||
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
|
||||
return None;
|
||||
};
|
||||
|
||||
match scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
|
||||
current,
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
) {
|
||||
Some(current_score) => {
|
||||
// Ignore utilization components when comparing scores: we don't want to migrate
|
||||
// because of transient load variations, it risks making the system thrash, and
|
||||
// migrating for utilization requires a separate high level view of the system to
|
||||
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
|
||||
// moving things around in the order that we hit this function.
|
||||
let candidate_score = candidate_score.for_optimization();
|
||||
let current_score = current_score.for_optimization();
|
||||
|
||||
if candidate_score < current_score {
|
||||
tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})");
|
||||
Some(true)
|
||||
} else {
|
||||
// The candidate node is no better than our current location, so don't migrate
|
||||
tracing::debug!(
|
||||
"Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
|
||||
);
|
||||
Some(false)
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// The current node is unavailable for scheduling, so we can't make any sensible
|
||||
// decisions about optimisation. This should be a transient state -- if the node
|
||||
// is offline then it will get evacuated, if is blocked by a scheduling mode
|
||||
// then we will respect that mode by doing nothing.
|
||||
tracing::debug!("Current node {current} is unavailable for scheduling");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn find_better_location(
|
||||
&self,
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<NodeId> {
|
||||
// TODO: fast path: if the attached node is already in the preferred AZ, _and_ has no
|
||||
// other shards from the same tenant on it, then skip doing any scheduling calculations.
|
||||
|
||||
let attached = (*self.intent.get_attached())?;
|
||||
tracing::info!(
|
||||
"Initially: attached {attached} ({:?} vs {:?}), in context {schedule_context:?}",
|
||||
scheduler.get_node_az(&attached),
|
||||
self.preferred_az_id.as_ref()
|
||||
);
|
||||
|
||||
// Construct a schedule context that excludes locations belonging to
|
||||
// this shard: this simulates removing and re-scheduling the shard
|
||||
// let schedule_context = schedule_context.project_detach(self);
|
||||
|
||||
// Look for a lower-scoring location to attach to
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<AttachedShardTag>(
|
||||
&[], // Don't hard-exclude anything: we want to consider the possibility of migrating to somewhere we already have a secondary
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
tracing::debug!("No candidate node found");
|
||||
return None;
|
||||
};
|
||||
|
||||
if candidate_node == attached {
|
||||
// We're already at the best possible location, so don't migrate
|
||||
tracing::debug!("Candidate node {candidate_node} is already attached");
|
||||
return None;
|
||||
}
|
||||
|
||||
// Check if our candidate node is in the preferred AZ: it might not be, if the scheduler
|
||||
// is trying its best to handle an overloaded AZ.
|
||||
if self.preferred_az_id.is_some()
|
||||
&& scheduler.get_node_az(&candidate_node) != self.preferred_az_id
|
||||
{
|
||||
tracing::debug!(
|
||||
"Candidate node {candidate_node} is not in preferred AZ {:?}",
|
||||
self.preferred_az_id
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
self.is_better_location(scheduler, schedule_context, attached, candidate_node)
|
||||
.and_then(|better| if better { Some(candidate_node) } else { None })
|
||||
}
|
||||
|
||||
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
||||
/// its primary location based on soft constraints, switch that secondary location
|
||||
/// to be attached.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn optimize_attachment(
|
||||
&self,
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
let attached = (*self.intent.get_attached())?;
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
return None;
|
||||
}
|
||||
|
||||
let current_affinity_score = schedule_context.get_node_affinity(attached);
|
||||
let current_attachment_count = schedule_context.get_node_attachments(attached);
|
||||
let schedule_context = schedule_context.project_detach(self);
|
||||
|
||||
// Generate score for each node, dropping any un-schedulable nodes.
|
||||
let all_pageservers = self.intent.all_pageservers();
|
||||
let mut scores = all_pageservers
|
||||
.iter()
|
||||
.flat_map(|node_id| {
|
||||
let node = nodes.get(node_id);
|
||||
if node.is_none() {
|
||||
None
|
||||
} else if matches!(
|
||||
node.unwrap().get_scheduling(),
|
||||
NodeSchedulingPolicy::Filling
|
||||
) {
|
||||
// If the node is currently filling, don't count it as a candidate to avoid,
|
||||
// racing with the background fill.
|
||||
None
|
||||
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
|
||||
None
|
||||
} else {
|
||||
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
||||
let attachment_count = schedule_context.get_node_attachments(*node_id);
|
||||
Some((*node_id, affinity_score, attachment_count))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort precedence:
|
||||
// 1st - prefer nodes with the lowest total affinity score
|
||||
// 2nd - prefer nodes with the lowest number of attachments in this context
|
||||
// 3rd - if all else is equal, sort by node ID for determinism in tests.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
||||
|
||||
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
|
||||
scores.first()
|
||||
{
|
||||
if attached != *preferred_node {
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called (e.g. there's no point migrating from
|
||||
// a location with score 1 to a score zero, because on next location the situation
|
||||
// would be the same, but in reverse).
|
||||
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|
||||
|| current_attachment_count > *preferred_attachment_count + 1
|
||||
{
|
||||
tracing::info!(
|
||||
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: *preferred_node,
|
||||
}),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Node {} is already preferred (score {:?})",
|
||||
preferred_node,
|
||||
preferred_affinity_score
|
||||
);
|
||||
// If we already have a secondary that is higher-scoring than out current location,
|
||||
// then simply migrate to it.
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if let Some(true) =
|
||||
self.is_better_location(scheduler, &schedule_context, attached, *secondary)
|
||||
{
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: *secondary,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Fall-through: we didn't find an optimization
|
||||
None
|
||||
// Given that none of our current secondaries is a better location than our current
|
||||
// attached location (checked above), we may trim any secondaries that are not needed
|
||||
// for the placement policy.
|
||||
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
||||
// This code path cleans up extra secondaries after migrating, and/or
|
||||
// trims extra secondaries after a PlacementPolicy::Attached(N) was
|
||||
// modified to decrease N.
|
||||
|
||||
let mut secondary_scores = self
|
||||
.intent
|
||||
.get_secondary()
|
||||
.iter()
|
||||
.map(|node_id| {
|
||||
(
|
||||
*node_id,
|
||||
scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
|
||||
*node_id,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if secondary_scores.iter().any(|score| score.1.is_none()) {
|
||||
// Don't have full list of scores, so can't make a good decision about which to drop unless
|
||||
// there is an obvious one in the wrong AZ
|
||||
for secondary in self.intent.get_secondary() {
|
||||
if scheduler.get_node_az(secondary) == self.preferred_az_id {
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through: we didn't identify one to remove. This ought to be rare.
|
||||
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
} else {
|
||||
secondary_scores.sort_by_key(|score| score.1.unwrap());
|
||||
let victim = secondary_scores.last().unwrap().0;
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(victim),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let replacement = self.find_better_location(scheduler, &schedule_context);
|
||||
|
||||
// We have found a candidate and confirmed that its score is preferable
|
||||
// to our current location. See if we have a secondary location in the preferred location already: if not,
|
||||
// then create one.
|
||||
if let Some(replacement) = replacement {
|
||||
if !self.intent.get_secondary().contains(&replacement) {
|
||||
tracing::info!(
|
||||
"Identified optimization({}): create secondary {replacement}",
|
||||
self.tenant_shard_id
|
||||
);
|
||||
Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::CreateSecondary(replacement),
|
||||
})
|
||||
} else {
|
||||
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
|
||||
// will check the warmth of the destination before deciding whether to really execute this.
|
||||
tracing::info!(
|
||||
"Identified optimization({}): migrate attachment {attached}->{replacement}",
|
||||
self.tenant_shard_id
|
||||
);
|
||||
Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: replacement,
|
||||
}),
|
||||
})
|
||||
}
|
||||
// } else if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
||||
// // We aren't in the process of migrating anywhere, and we're attached in our preferred AZ. If there are
|
||||
// // any other secondary locations in our preferred AZ, we presume they were created to facilitate a migration
|
||||
// // of the attached location, and remove them.
|
||||
// for secondary in self.intent.get_secondary() {
|
||||
// if scheduler.get_node_az(secondary) == self.preferred_az_id {
|
||||
// tracing::info!(
|
||||
// "Identified optimization({}): remove secondary {secondary}",
|
||||
// self.tenant_shard_id
|
||||
// );
|
||||
// return Some(ScheduleOptimization {
|
||||
// sequence: self.sequence,
|
||||
// action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
|
||||
// // Fall through: maybe we had excess secondaries in other AZs? Trim them in an arbitrary order
|
||||
// // (lowest Node ID first).
|
||||
// let mut secondary_node_ids = self.intent.get_secondary().clone();
|
||||
// secondary_node_ids.sort();
|
||||
// let victim = secondary_node_ids
|
||||
// .first()
|
||||
// .expect("Within block for > check on secondary count");
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: self.sequence,
|
||||
// action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
||||
// })
|
||||
} else {
|
||||
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
|
||||
// to clean up: no action required.
|
||||
None
|
||||
}
|
||||
|
||||
// TODO: if we find that our current location is optimal, _and_ we also have a secondary
|
||||
// in the preferred AZ, then clean up that secondary: it was only created to act as a
|
||||
// migration destination.
|
||||
// ...maybe do this in optimize_secondary()?
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
@@ -832,18 +987,18 @@ impl TenantShard {
|
||||
scheduler: &mut Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
||||
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
|
||||
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
|
||||
// and we are called again, we will proceed.
|
||||
tracing::debug!("Too many secondaries: skipping");
|
||||
return None;
|
||||
}
|
||||
|
||||
for secondary in self.intent.get_secondary() {
|
||||
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
|
||||
// We're already on a node unaffected any affinity constraints,
|
||||
// so we won't change it.
|
||||
continue;
|
||||
};
|
||||
let schedule_context = schedule_context.project_secondary_detach(*secondary);
|
||||
// TODO: fast path to avoid full scheduling calculation if we're in the right AZ and not
|
||||
// sharing with any other shards in the same tenant
|
||||
|
||||
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
|
||||
// This implicitly limits the choice to nodes that are available, and prefers nodes
|
||||
@@ -851,33 +1006,63 @@ impl TenantShard {
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&self.intent.all_pageservers(),
|
||||
&self.preferred_az_id,
|
||||
schedule_context,
|
||||
&schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
continue;
|
||||
};
|
||||
|
||||
let candidate_affinity_score = schedule_context
|
||||
.nodes
|
||||
.get(&candidate_node)
|
||||
.unwrap_or(&AffinityScore::FREE);
|
||||
let Some(candidate_score) = scheduler
|
||||
.compute_node_score::<NodeSecondarySchedulingScore>(
|
||||
candidate_node,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
)
|
||||
else {
|
||||
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
|
||||
// This is unexpected, because schedule() yielded this node
|
||||
debug_assert!(false);
|
||||
continue;
|
||||
};
|
||||
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called.
|
||||
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
|
||||
// If some other node is available and has a lower score than this node, then
|
||||
// that other node is a good place to migrate to.
|
||||
tracing::info!(
|
||||
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
||||
old_node_id: *secondary,
|
||||
new_node_id: candidate_node,
|
||||
}),
|
||||
});
|
||||
match scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
|
||||
*secondary,
|
||||
&self.preferred_az_id,
|
||||
&schedule_context,
|
||||
) {
|
||||
Some(current_score) => {
|
||||
// Disregard utilization: we don't want to thrash around based on disk utilization
|
||||
let current_score = current_score.for_optimization();
|
||||
let candidate_score = candidate_score.for_optimization();
|
||||
|
||||
if candidate_score < current_score {
|
||||
tracing::info!(
|
||||
"Identified optimization({}, home AZ {:?}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
|
||||
self.tenant_shard_id,
|
||||
self.preferred_az_id,
|
||||
self.intent.get_secondary(),
|
||||
candidate_score,
|
||||
current_score
|
||||
);
|
||||
return Some(ScheduleOptimization {
|
||||
sequence: self.sequence,
|
||||
action: ScheduleOptimizationAction::ReplaceSecondary(
|
||||
ReplaceSecondary {
|
||||
old_node_id: *secondary,
|
||||
new_node_id: candidate_node,
|
||||
},
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// The current node is unavailable for scheduling, so we can't make any sensible
|
||||
// decisions about optimisation. This should be a transient state -- if the node
|
||||
// is offline then it will get evacuated, if is blocked by a scheduling mode
|
||||
// then we will respect that mode by doing nothing.
|
||||
tracing::debug!("Current node {secondary} is unavailable for scheduling");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -916,6 +1101,12 @@ impl TenantShard {
|
||||
self.intent.remove_secondary(scheduler, old_node_id);
|
||||
self.intent.push_secondary(scheduler, new_node_id);
|
||||
}
|
||||
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
|
||||
self.intent.push_secondary(scheduler, new_node_id);
|
||||
}
|
||||
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
|
||||
self.intent.remove_secondary(scheduler, old_secondary);
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
@@ -1571,6 +1762,7 @@ pub(crate) mod tests {
|
||||
)
|
||||
.unwrap(),
|
||||
policy,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1606,6 +1798,7 @@ pub(crate) mod tests {
|
||||
)
|
||||
.unwrap(),
|
||||
policy.clone(),
|
||||
None,
|
||||
);
|
||||
|
||||
if let Some(az) = &preferred_az {
|
||||
@@ -1749,65 +1942,92 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn optimize_attachment() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3, &[]);
|
||||
/// Simple case: moving attachment to somewhere better where we already have a secondary
|
||||
fn optimize_attachment_simple() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(
|
||||
3,
|
||||
&[
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
|
||||
// Initially: both nodes attached on shard 1, and both have secondary locations
|
||||
// on different nodes.
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
schedule_context
|
||||
}
|
||||
|
||||
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
|
||||
|
||||
// Either shard should recognize that it has the option to switch to a secondary location where there
|
||||
// would be no other shards from the same tenant, and request to do so.
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(1),
|
||||
new_attached_node_id: NodeId(2)
|
||||
old_attached_node_id: NodeId(2),
|
||||
new_attached_node_id: NodeId(1)
|
||||
})
|
||||
})
|
||||
);
|
||||
|
||||
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
|
||||
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
|
||||
// of [`Service::optimize_all`] to avoid trying
|
||||
// to do optimizations for multiple shards in the same tenant at the same time. Generating
|
||||
// both optimizations is just done for test purposes
|
||||
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_b,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_b.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(1),
|
||||
new_attached_node_id: NodeId(3)
|
||||
})
|
||||
})
|
||||
);
|
||||
|
||||
// Applying these optimizations should result in the end state proposed
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
|
||||
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
|
||||
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
|
||||
// // Either shard should recognize that it has the option to switch to a secondary location where there
|
||||
// // would be no other shards from the same tenant, and request to do so.
|
||||
// assert_eq!(
|
||||
// optimization_a_prepare,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
||||
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
// assert_eq!(
|
||||
// optimization_a_migrate,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
// old_attached_node_id: NodeId(1),
|
||||
// new_attached_node_id: NodeId(2)
|
||||
// })
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
||||
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
// assert_eq!(
|
||||
// optimization_a_cleanup,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
||||
|
||||
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
@@ -1815,6 +2035,168 @@ pub(crate) mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
|
||||
/// already, creating one as needed.
|
||||
fn optimize_attachment_multistep() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(
|
||||
3,
|
||||
&[
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
],
|
||||
);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Two shards of a tenant that wants to be in AZ A
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
||||
|
||||
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
|
||||
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
schedule_context
|
||||
}
|
||||
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_prepare,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
||||
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_migrate,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(2),
|
||||
new_attached_node_id: NodeId(1)
|
||||
})
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
||||
|
||||
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_cleanup,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shard_a.sequence,
|
||||
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
||||
})
|
||||
);
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
||||
|
||||
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Check that multi-step migration works when moving to somewhere that is only better by
|
||||
/// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
|
||||
/// counting toward the affinity score such that it prevents the rest of the migration from happening.
|
||||
fn optimize_attachment_marginal() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(2, &[]);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
// Multi-sharded tenant, we will craft a situation where affinity
|
||||
// scores differ only slightly
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
|
||||
// 1 attached on node 1
|
||||
shards[0]
|
||||
.intent
|
||||
.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
// 2 attached and one secondary on node 2
|
||||
shards[1]
|
||||
.intent
|
||||
.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shards[2]
|
||||
.intent
|
||||
.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shards[1].intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
|
||||
fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in shards {
|
||||
schedule_context.avoid(&shard.intent.all_pageservers());
|
||||
if let Some(attached) = shard.intent.get_attached() {
|
||||
schedule_context.push_attached(*attached);
|
||||
}
|
||||
}
|
||||
schedule_context
|
||||
}
|
||||
|
||||
let schedule_context = make_schedule_context(&shards);
|
||||
let optimization_a_prepare =
|
||||
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_prepare,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shards[2].sequence,
|
||||
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
|
||||
})
|
||||
);
|
||||
shards[2].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
||||
|
||||
let schedule_context = make_schedule_context(&shards);
|
||||
let optimization_a_migrate =
|
||||
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_a_migrate,
|
||||
Some(ScheduleOptimization {
|
||||
sequence: shards[2].sequence,
|
||||
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(2),
|
||||
new_attached_node_id: NodeId(1)
|
||||
})
|
||||
})
|
||||
);
|
||||
shards[2].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
||||
|
||||
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
||||
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
||||
// assert_eq!(
|
||||
// optimization_a_cleanup,
|
||||
// Some(ScheduleOptimization {
|
||||
// sequence: shard_a.sequence,
|
||||
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
|
||||
// })
|
||||
// );
|
||||
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
||||
|
||||
for mut shard in shards {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn optimize_secondary() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
@@ -1865,7 +2247,6 @@ pub(crate) mod tests {
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
fn optimize_til_idle(
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
shards: &mut [TenantShard],
|
||||
) -> Vec<ScheduleOptimization> {
|
||||
@@ -1883,7 +2264,12 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
let optimization = shard.optimize_attachment(nodes, &schedule_context);
|
||||
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
|
||||
tracing::info!(
|
||||
"optimize_attachment({})={:?}",
|
||||
shard.tenant_shard_id,
|
||||
optimization
|
||||
);
|
||||
if let Some(optimization) = optimization {
|
||||
optimizations.push(optimization.clone());
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
@@ -1892,6 +2278,11 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
|
||||
tracing::info!(
|
||||
"optimize_secondary({})={:?}",
|
||||
shard.tenant_shard_id,
|
||||
optimization
|
||||
);
|
||||
if let Some(optimization) = optimization {
|
||||
optimizations.push(optimization.clone());
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
@@ -1912,18 +2303,40 @@ pub(crate) mod tests {
|
||||
optimizations
|
||||
}
|
||||
|
||||
use test_log::test;
|
||||
|
||||
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
||||
/// that it converges.
|
||||
#[test]
|
||||
fn optimize_add_nodes() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4, &[]);
|
||||
let nodes = make_test_nodes(
|
||||
9,
|
||||
&[
|
||||
// Initial 6 nodes
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
// Three we will add later
|
||||
AvailabilityZone("az-a".to_string()),
|
||||
AvailabilityZone("az-b".to_string()),
|
||||
AvailabilityZone("az-c".to_string()),
|
||||
],
|
||||
);
|
||||
|
||||
// Only show the scheduler a couple of nodes
|
||||
// Only show the scheduler two nodes in each AZ to start with
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
for i in 1..=6 {
|
||||
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
|
||||
}
|
||||
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
||||
let mut shards = make_test_tenant(
|
||||
PlacementPolicy::Attached(1),
|
||||
ShardCount::new(4),
|
||||
Some(AvailabilityZone("az-a".to_string())),
|
||||
);
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in &mut shards {
|
||||
assert!(shard
|
||||
@@ -1931,30 +2344,50 @@ pub(crate) mod tests {
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
// We should see equal number of locations on the two nodes.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
|
||||
// Initial: attached locations land in the tenant's home AZ.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
||||
|
||||
// Add another two nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
|
||||
// Initial: secondary locations in a remote AZ
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
// Add another three nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
|
||||
optimize_til_idle(&mut scheduler, &mut shards);
|
||||
|
||||
// We expect one attached location was moved to the new node in the tenant's home AZ
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
|
||||
// The original node has one less attached shard
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
||||
|
||||
// One of the original nodes still has two attachments, since there are an odd number of nodes
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
|
||||
// None of our secondaries moved, since we already had enough nodes for those to be
|
||||
// scheduled perfectly
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
@@ -1994,10 +2427,10 @@ pub(crate) mod tests {
|
||||
shard.schedule(&mut scheduler, context).unwrap();
|
||||
}
|
||||
|
||||
let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
|
||||
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
|
||||
assert_eq!(applied_to_a, vec![]);
|
||||
|
||||
let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
|
||||
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
|
||||
assert_eq!(applied_to_b, vec![]);
|
||||
|
||||
for shard in a.iter_mut().chain(b.iter_mut()) {
|
||||
|
||||
@@ -86,7 +86,7 @@ def test_storage_controller_many_tenants(
|
||||
|
||||
AZS = ["alpha", "bravo", "charlie"]
|
||||
neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update(
|
||||
{"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"}
|
||||
{"availability_zone": f"az-{AZS[(ps_cfg['id'] - 1) % len(AZS)]}"}
|
||||
)
|
||||
|
||||
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
|
||||
@@ -114,8 +114,8 @@ def test_storage_controller_many_tenants(
|
||||
ps.allowed_errors.append(".*request was dropped before completing.*")
|
||||
|
||||
# Total tenants
|
||||
small_tenant_count = 7800
|
||||
large_tenant_count = 200
|
||||
small_tenant_count = 780
|
||||
large_tenant_count = 20
|
||||
tenant_count = small_tenant_count + large_tenant_count
|
||||
large_tenant_shard_count = 8
|
||||
total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
|
||||
@@ -141,7 +141,7 @@ def test_storage_controller_many_tenants(
|
||||
# We will create timelines in only a subset of tenants, because creating timelines
|
||||
# does many megabytes of IO, and we want to densely simulate huge tenant counts on
|
||||
# a single test node.
|
||||
tenant_timelines_count = 100
|
||||
tenant_timelines_count = 10
|
||||
|
||||
# These lists are maintained for use with rng.choice
|
||||
tenants_with_timelines = list(rng.sample(list(tenants.keys()), tenant_timelines_count))
|
||||
@@ -380,7 +380,7 @@ def test_storage_controller_many_tenants(
|
||||
shard_counts = get_consistent_node_shard_counts(env, total_shards)
|
||||
log.info(f"Shard counts before rolling restart: {shard_counts}")
|
||||
|
||||
assert_consistent_balanced_attachments(env, total_shards)
|
||||
# assert_consistent_balanced_attachments(env, total_shards)
|
||||
|
||||
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
|
||||
# and the storage controller drain and fill API
|
||||
@@ -445,7 +445,7 @@ def test_storage_controller_many_tenants(
|
||||
shard_counts = get_consistent_node_shard_counts(env, total_shards)
|
||||
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
|
||||
|
||||
assert_consistent_balanced_attachments(env, total_shards)
|
||||
# assert_consistent_balanced_attachments(env, total_shards)
|
||||
|
||||
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
@@ -516,14 +516,18 @@ def test_sharding_split_smoke(
|
||||
shard_count = 2
|
||||
# Shard count we split into
|
||||
split_shard_count = 4
|
||||
# We will have 2 shards per pageserver once done (including secondaries)
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
# In preferred AZ & other AZ we will end up with one shard per pageserver
|
||||
neon_env_builder.num_pageservers = split_shard_count * 2
|
||||
|
||||
# Two AZs
|
||||
def assign_az(ps_cfg):
|
||||
az = f"az-{(ps_cfg['id'] - 1) % 2}"
|
||||
ps_cfg["availability_zone"] = az
|
||||
|
||||
# We will run more pageservers than tests usually do, so give them tiny page caches
|
||||
# in case we're on a test node under memory pressure.
|
||||
ps_cfg["page_cache_size"] = 128
|
||||
|
||||
neon_env_builder.pageserver_config_override = assign_az
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
@@ -659,8 +663,8 @@ def test_sharding_split_smoke(
|
||||
# - shard_count reconciles for the original setup of the tenant
|
||||
# - shard_count reconciles for detaching the original secondary locations during split
|
||||
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
|
||||
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
|
||||
# - split_shard_count/2 reconciles to migrate shards to their temporary secondaries
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2)
|
||||
|
||||
reconcile_ok = env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
|
||||
@@ -725,10 +729,14 @@ def test_sharding_split_smoke(
|
||||
# dominated by shard count.
|
||||
log.info(f"total: {total}")
|
||||
assert total == {
|
||||
1: 2,
|
||||
2: 2,
|
||||
3: 2,
|
||||
4: 2,
|
||||
1: 1,
|
||||
2: 1,
|
||||
3: 1,
|
||||
4: 1,
|
||||
5: 1,
|
||||
6: 1,
|
||||
7: 1,
|
||||
8: 1,
|
||||
}
|
||||
|
||||
# The controller is not required to lay out the attached locations in any particular way, but
|
||||
|
||||
@@ -3011,11 +3011,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
|
||||
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
|
||||
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
|
||||
def assign_az(ps_cfg):
|
||||
az = f"az-{ps_cfg['id']}"
|
||||
az = f"az-{ps_cfg['id'] % 2}"
|
||||
log.info("Assigned AZ {az}")
|
||||
ps_cfg["availability_zone"] = az
|
||||
|
||||
neon_env_builder.pageserver_config_override = assign_az
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.num_pageservers = 4
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
@@ -3030,8 +3031,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
assert shards[0]["preferred_az_id"] == expected_az
|
||||
|
||||
# When all other schedule scoring parameters are equal, tenants should round-robin on AZs
|
||||
assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-1"
|
||||
assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-0"
|
||||
assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-1"
|
||||
|
||||
# Try modifying preferred AZ
|
||||
updated = env.storage_controller.set_preferred_azs(
|
||||
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
|
||||
{TenantShardId(tid, 0, 0): "az-0" for tid in tids}
|
||||
)
|
||||
|
||||
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
|
||||
@@ -3039,29 +3046,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
|
||||
for tid in tids:
|
||||
shards = env.storage_controller.tenant_describe(tid)["shards"]
|
||||
assert len(shards) == 1
|
||||
assert shards[0]["preferred_az_id"] == "foo"
|
||||
assert shards[0]["preferred_az_id"] == "az-0"
|
||||
|
||||
# Generate a layer to avoid shard split handling on ps from tripping
|
||||
# up on debug assert.
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_timeline("bar", tids[0], timeline_id)
|
||||
|
||||
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
# Having modified preferred AZ, we should get moved there
|
||||
env.storage_controller.reconcile_until_idle(max_interval=0.1)
|
||||
for tid in tids:
|
||||
shard = env.storage_controller.tenant_describe(tid)["shards"][0]
|
||||
attached_to = shard["node_attached"]
|
||||
attached_in_az = env.get_pageserver(attached_to).az_id
|
||||
assert shard["preferred_az_id"] == attached_in_az == "az-0"
|
||||
|
||||
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
|
||||
env.storage_controller.reconcile_until_idle(max_interval=0.1)
|
||||
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
|
||||
assert len(shards) == 2
|
||||
for shard in shards:
|
||||
attached_to = shard["node_attached"]
|
||||
expected_az = env.get_pageserver(attached_to).az_id
|
||||
|
||||
# The scheduling optimization logic is not yet AZ-aware, so doesn't succeed
|
||||
# in putting the tenant shards in the preferred AZ.
|
||||
# To be fixed in https://github.com/neondatabase/neon/pull/9916
|
||||
# assert shard["preferred_az_id"] == expected_az
|
||||
attached_in_az = env.get_pageserver(attached_to).az_id
|
||||
assert shard["preferred_az_id"] == attached_in_az == "az-0"
|
||||
|
||||
|
||||
@run_only_on_default_postgres("Postgres version makes no difference here")
|
||||
|
||||
Reference in New Issue
Block a user