Compare commits

...

14 Commits

Author | SHA1 | Message | Date
John Spray | 50118e0347 | tests: update shard split smoke | 2024-12-06 17:58:18 +00:00
John Spray | f943fa5753 | DNM: cargo: add test-log | 2024-12-05 17:35:19 +00:00
John Spray | 9172476147 | storcon: update migration kick for multiple secondaries | 2024-12-05 17:35:16 +00:00
John Spray | e3b8a3ee57 | mk3 optimization | 2024-12-05 17:33:01 +00:00
John Spray | 5bb7472eab | storcon: allow reconciles to succeed for down secondaries | 2024-12-05 17:33:01 +00:00
John Spray | 0d6966b9cb | Take 2: Optimization, many_tenants now passes | 2024-12-05 17:33:01 +00:00
John Spray | 44689fd90b | DNM: small scale test | 2024-12-05 17:33:01 +00:00
John Spray | 43f55136b6 | DNM: reduce max optimisations to simplify testing | 2024-12-05 17:33:01 +00:00
John Spray | 9971913fa0 | storcon: add test for scheduling stability | 2024-12-05 17:33:01 +00:00
John Spray | 01643006c6 | optimisation change | 2024-12-05 17:32:50 +00:00
John Spray | ea6f6a87a2 | tests: update test_shard_preferred_azs (It can now assert that shards get moved to their preferred AZs) | 2024-12-05 17:31:13 +00:00
John Spray | 017fffe583 | storcon: change Attached policy to mean _at least_ N secondaries | 2024-12-05 17:31:13 +00:00
John Spray | cf896ff144 | storcon: prioritize AZ over affinity in scheduler | 2024-12-05 17:31:13 +00:00
John Spray | bcd888126e | storcon: AZ awareness when filling nodes | 2024-12-05 17:31:13 +00:00
10 changed files with 1185 additions and 346 deletions

74
Cargo.lock generated
View File

@@ -1948,6 +1948,15 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "env_filter"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab"
dependencies = [
"log",
]
[[package]]
name = "env_logger"
version = "0.10.2"
@@ -1961,6 +1970,18 @@ dependencies = [
"termcolor",
]
[[package]]
name = "env_logger"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"log",
]
[[package]]
name = "equator"
version = "0.2.2"
@@ -3368,6 +3389,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -3663,6 +3694,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -4182,7 +4219,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4195,7 +4232,6 @@ dependencies = [
"rand 0.8.5",
"sha2",
"stringprep",
"tokio",
]
[[package]]
@@ -4217,7 +4253,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4273,7 +4309,7 @@ dependencies = [
"bindgen",
"bytes",
"crc32c",
"env_logger",
"env_logger 0.10.2",
"log",
"memoffset 0.9.0",
"once_cell",
@@ -4541,7 +4577,7 @@ dependencies = [
"consumption_metrics",
"dashmap",
"ecdsa 0.16.9",
"env_logger",
"env_logger 0.10.2",
"fallible-iterator",
"flate2",
"framed-websockets",
@@ -6064,6 +6100,7 @@ dependencies = [
"serde_json",
"strum",
"strum_macros",
"test-log",
"thiserror",
"tokio",
"tokio-util",
@@ -6350,6 +6387,28 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "test-log"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93"
dependencies = [
"env_logger 0.11.2",
"test-log-macros",
"tracing-subscriber",
]
[[package]]
name = "test-log-macros"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "thiserror"
version = "1.0.69"
@@ -6547,7 +6606,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#511f998c00148ab7c847bd7e6cfd3a906d0e7473"
dependencies = [
"async-trait",
"byteorder",
@@ -6901,6 +6960,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -7209,7 +7269,7 @@ dependencies = [
"anyhow",
"camino-tempfile",
"clap",
"env_logger",
"env_logger 0.10.2",
"log",
"postgres",
"postgres_ffi",

View File

@@ -325,6 +325,16 @@ pub enum PlacementPolicy {
Detached,
}
impl PlacementPolicy {
pub fn want_secondaries(&self) -> usize {
match self {
PlacementPolicy::Attached(secondary_count) => *secondary_count,
PlacementPolicy::Secondary => 1,
PlacementPolicy::Detached => 0,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
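For illustration, here is a minimal standalone sketch of the semantics of the new want_secondaries() helper. The enum is a simplified stand-in for the real PlacementPolicy in pageserver_api::controller_api; this is not part of the diff.

// Simplified stand-in for pageserver_api's PlacementPolicy, illustration only.
enum PlacementPolicy {
    // Attached somewhere, plus at least this many warm secondary locations.
    Attached(usize),
    // Not attached anywhere; keep a single secondary location.
    Secondary,
    // No locations at all.
    Detached,
}

impl PlacementPolicy {
    fn want_secondaries(&self) -> usize {
        match self {
            PlacementPolicy::Attached(secondary_count) => *secondary_count,
            PlacementPolicy::Secondary => 1,
            PlacementPolicy::Detached => 0,
        }
    }
}

fn main() {
    // Attached(1) is the usual HA setup: one attached location plus one secondary.
    assert_eq!(PlacementPolicy::Attached(1).want_secondaries(), 1);
    assert_eq!(PlacementPolicy::Secondary.want_secondaries(), 1);
    assert_eq!(PlacementPolicy::Detached.want_secondaries(), 0);
}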

View File

@@ -56,3 +56,6 @@ utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]
test-log = {version="0.2.16"}
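As a hedged aside, this is roughly how the new dev-dependency gets used: test-log's #[test] attribute shadows the standard one and initializes logging before each test, which is what the scheduler tests later in this diff do via `use test_log::test;`. The module below is an illustrative sketch (it assumes the crate also depends on the log crate), not part of this diff.

#[cfg(test)]
mod tests {
    // Shadow the built-in #[test] attribute so each test sets up log/tracing output.
    use test_log::test;

    #[test]
    fn logging_is_initialized() {
        // Visible when running `cargo test -- --nocapture` with RUST_LOG=info.
        log::info!("scheduler test starting");
        assert_eq!(2 + 2, 4);
    }
}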

View File

@@ -809,7 +809,21 @@ impl Reconciler {
if self.cancel.is_cancelled() {
return Err(ReconcileError::Cancel);
}
self.location_config(&node, conf, None, false).await?;
// We only try to configure secondary locations if the node is available. This does
// not stop us succeeding with the reconcile, because our core goal is to make the
// shard _available_ (the attached location), and configuring secondary locations
// can be done lazily when the node becomes available (via background reconciliation).
if node.is_available() {
self.location_config(&node, conf, None, false).await?;
} else {
// If the node is unavailable, we skip it and consider the reconciliation successful: this
// is a common case when a pageserver is marked unavailable, since we demote locations on
// the unavailable pageserver to secondary.
tracing::info!("Skipping configuring secondary location {node}, it is unavailable");
self.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: None });
}
}
// The condition below identifies a detach. We must have no attached intent and
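A minimal sketch of the "skip unavailable secondaries" pattern introduced above, using hypothetical stand-in types; the real Reconciler makes an async location_config() call to the pageserver and records ObservedStateLocation { conf: None } for the skipped node.

use std::collections::HashMap;

struct Node {
    id: u64,
    available: bool,
}

impl Node {
    fn is_available(&self) -> bool {
        self.available
    }
}

#[derive(Default)]
struct ObservedState {
    // node id -> None means "we don't know what is on this node"; Some is a known config.
    locations: HashMap<u64, Option<String>>,
}

fn configure_secondary(node: &Node, observed: &mut ObservedState) {
    if node.is_available() {
        // In the real code this is the async location_config() call.
        observed.locations.insert(node.id, Some("secondary".to_string()));
    } else {
        // Do not fail the reconcile: record that the location state is unknown and let
        // background reconciliation configure it once the node comes back.
        println!("Skipping secondary location on node {}, it is unavailable", node.id);
        observed.locations.insert(node.id, None);
    }
}

fn main() {
    let mut observed = ObservedState::default();
    configure_secondary(&Node { id: 1, available: true }, &mut observed);
    configure_secondary(&Node { id: 2, available: false }, &mut observed);
    assert_eq!(observed.locations[&1], Some("secondary".to_string()));
    assert_eq!(observed.locations[&2], None);
}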

View File

@@ -47,6 +47,12 @@ pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Self>;
/// Return a score that drops any components based on node utilization: this is useful
/// when scoring for scheduling optimisation, where rescheduling shards because of e.g.
/// transient disk usage changes would cause flapping.
fn for_optimization(&self) -> Self;
fn is_overloaded(&self) -> bool;
fn node_id(&self) -> NodeId;
}
@@ -136,17 +142,13 @@ impl PartialOrd for SecondaryAzMatch {
/// Ordering is given by member declaration order (top to bottom).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub(crate) struct NodeAttachmentSchedulingScore {
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Flag indicating whether this node matches the preferred AZ
/// of the shard. For equal affinity scores, nodes in the matching AZ
/// are considered first.
az_match: AttachmentAzMatch,
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
/// This normally tracks the number of attached shards belonging to the
/// tenant being scheduled that are already on this node.
attached_shards_in_context: usize,
/// The number of shards belonging to the tenant currently being
/// scheduled that are attached to this node.
affinity_score: AffinityScore,
/// Utilisation score that combines shard count and disk utilisation
utilization_score: u64,
/// Total number of shards attached to this node. When nodes have identical utilisation, this
@@ -177,13 +179,25 @@ impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
.copied()
.unwrap_or(AffinityScore::FREE),
az_match: AttachmentAzMatch(AzMatch::new(&node.az, preferred_az.as_ref())),
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
utilization_score: utilization.cached_score(),
total_attached_shard_count: node.attached_shard_count,
node_id: *node_id,
})
}
/// For use in scheduling optimisation, where we only want to consider the aspects
/// of the score that can only be resolved by moving things (such as inter-shard affinity
/// and AZ affinity), and ignore aspects that reflect the total utilization of a node (which
/// can fluctuate for other reasons)
fn for_optimization(&self) -> Self {
Self {
utilization_score: 0,
total_attached_shard_count: 0,
node_id: NodeId(0),
..*self
}
}
fn is_overloaded(&self) -> bool {
PageserverUtilization::is_overloaded(self.utilization_score)
}
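A hedged sketch of why for_optimization() blanks the utilization-derived fields: the scores use derived Ord, which compares fields in declaration order, so zeroing the trailing fields makes comparisons depend only on AZ match and affinity. The field names below are simplified stand-ins, not the real score struct.

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
struct Score {
    az_mismatch: u64,  // 0 if the node is in the shard's preferred AZ
    affinity: u64,     // shards of the same tenant already on this node (lower is better)
    utilization: u64,  // disk/shard-count derived; fluctuates over time
    node_id: u64,      // tie-breaker
}

impl Score {
    // Drop the components that reflect transient node utilization.
    fn for_optimization(&self) -> Self {
        Score { utilization: 0, node_id: 0, ..*self }
    }
}

fn main() {
    let current = Score { az_mismatch: 0, affinity: 1, utilization: 10, node_id: 1 };
    let candidate = Score { az_mismatch: 0, affinity: 1, utilization: 5, node_id: 2 };
    // Raw scores would favour migrating purely on utilization...
    assert!(candidate < current);
    // ...but for optimization purposes the locations are equivalent, so no move is made.
    assert_eq!(candidate.for_optimization(), current.for_optimization());
}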
@@ -242,6 +256,15 @@ impl NodeSchedulingScore for NodeSecondarySchedulingScore {
})
}
fn for_optimization(&self) -> Self {
Self {
utilization_score: 0,
total_attached_shard_count: 0,
node_id: NodeId(0),
..*self
}
}
fn is_overloaded(&self) -> bool {
PageserverUtilization::is_overloaded(self.utilization_score)
}
@@ -293,6 +316,10 @@ impl AffinityScore {
pub(crate) fn inc(&mut self) {
self.0 += 1;
}
pub(crate) fn dec(&mut self) {
self.0 -= 1;
}
}
impl std::ops::Add for AffinityScore {
@@ -353,15 +380,44 @@ impl ScheduleContext {
*entry += 1;
}
pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
self.nodes
.get(&node_id)
.copied()
.unwrap_or(AffinityScore::FREE)
/// Imagine we detached this shard's locations. Return a new context that reflects this,
/// for use when scoring alternative locations for the shard.
pub(crate) fn project_detach(&self, shard: &TenantShard) -> Self {
let mut new_context = self.clone();
if let Some(attached) = shard.intent.get_attached() {
if let Some(count) = new_context.attached_nodes.get_mut(attached) {
// It's unexpected that we get called in a context where the source of
// the migration is not already in the context.
debug_assert!(*count > 0);
if *count > 0 {
*count -= 1;
}
}
if let Some(score) = new_context.nodes.get_mut(attached) {
score.dec();
}
}
for secondary in shard.intent.get_secondary() {
if let Some(score) = new_context.nodes.get_mut(secondary) {
score.dec();
}
}
new_context
}
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
pub(crate) fn project_secondary_detach(&self, source: NodeId) -> Self {
let mut new_context = self.clone();
if let Some(score) = new_context.nodes.get_mut(&source) {
score.dec();
}
new_context
}
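A brief sketch of what the projection helpers above do to a context before scoring a potential move; the types are simplified stand-ins (only the attached-node bookkeeping is shown), not the real ScheduleContext.

use std::collections::HashMap;

#[derive(Clone, Debug, Default)]
struct Context {
    affinity: HashMap<u64, u64>,
    attached: HashMap<u64, u64>,
}

impl Context {
    // Project what the context would look like if this shard's attached location
    // (on `attached_node`) were removed, mirroring project_detach() above.
    fn project_detach(&self, attached_node: u64) -> Context {
        let mut new_context = self.clone();
        if let Some(count) = new_context.attached.get_mut(&attached_node) {
            *count = count.saturating_sub(1);
        }
        if let Some(score) = new_context.affinity.get_mut(&attached_node) {
            *score = score.saturating_sub(1);
        }
        new_context
    }
}

fn main() {
    let mut ctx = Context::default();
    ctx.affinity.insert(1, 2);
    ctx.attached.insert(1, 1);
    // When asking "would node 2 be better than node 1?", first pretend the shard is not
    // on node 1 at all, so its current location does not repel the comparison.
    let projected = ctx.project_detach(1);
    assert_eq!(projected.affinity[&1], 1);
    assert_eq!(projected.attached[&1], 0);
}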
#[cfg(test)]
@@ -636,6 +692,22 @@ impl Scheduler {
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
/// Calculate a single node's score, used in optimizer logic to compare specific
/// nodes' scores.
pub(crate) fn compute_node_score<Score>(
&mut self,
node_id: NodeId,
preferred_az: &Option<AvailabilityZone>,
context: &ScheduleContext,
) -> Option<Score>
where
Score: NodeSchedulingScore,
{
self.nodes
.get_mut(&node_id)
.and_then(|node| Score::generate(&node_id, node, preferred_az, context))
}
/// Compute a scheduling score for each node that the scheduler knows of
/// minus a set of hard excluded nodes.
fn compute_node_scores<Score>(
@@ -727,7 +799,7 @@ impl Scheduler {
tracing::info!(
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
);
);
}
// Note that we do not update shard count here to reflect the scheduling: that
@@ -742,6 +814,10 @@ impl Scheduler {
self.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
}
pub(crate) fn get_node_az(&self, node_id: &NodeId) -> Option<AvailabilityZone> {
self.nodes.get(node_id).map(|n| n.az.clone())
}
/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
@@ -799,7 +875,14 @@ pub(crate) mod test_utils {
#[cfg(test)]
mod tests {
use pageserver_api::{controller_api::NodeAvailability, models::utilization::test_utilization};
use pageserver_api::{
controller_api::NodeAvailability, models::utilization::test_utilization,
shard::ShardIdentity,
};
use utils::{
id::TenantId,
shard::{ShardCount, ShardNumber, TenantShardId},
};
use super::*;
@@ -1045,9 +1128,9 @@ mod tests {
&mut context,
);
// Node 2 is not in "az-a", but it has the lowest affinity so we prefer that.
// Nodes 1 and 3 (az-a) have the same affinity score, so prefer the lowest node id.
assert_scheduler_chooses::<AttachedShardTag>(
NodeId(2),
NodeId(1),
Some(az_a_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
@@ -1063,28 +1146,196 @@ mod tests {
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Nodes 1 and 3 are identically loaded, so prefer the lowest node id.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(1),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
// Avoid nodes in "az-b" for the secondary location.
// Node 3 has lower affinity score than 1, so prefer that.
assert_scheduler_chooses::<SecondaryShardTag>(
NodeId(3),
Some(az_b_tag.clone()),
&mut scheduled_intents,
&mut scheduler,
&mut context,
);
for mut intent in scheduled_intents {
intent.clear(&mut scheduler);
}
}
use test_log::test;
#[test]
/// Make sure that when we have an odd number of nodes and an even number of shards, we still
/// get scheduling stability.
fn odd_nodes_stability() {
let az_a = AvailabilityZone("az-a".to_string());
let az_b = AvailabilityZone("az-b".to_string());
let nodes = test_utils::make_test_nodes(
10,
&[
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_a.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
az_b.clone(),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Need to keep these alive because they contribute to shard counts via RAII
let mut scheduled_shards = Vec::new();
let mut context = ScheduleContext::default();
fn schedule_shard(
tenant_shard_id: TenantShardId,
expect_attached: NodeId,
expect_secondary: NodeId,
scheduled_shards: &mut Vec<TenantShard>,
scheduler: &mut Scheduler,
preferred_az: Option<AvailabilityZone>,
context: &mut ScheduleContext,
) {
let shard_identity = ShardIdentity::new(
tenant_shard_id.shard_number,
tenant_shard_id.shard_count,
pageserver_api::shard::ShardStripeSize(1),
)
.unwrap();
let mut shard = TenantShard::new(
tenant_shard_id,
shard_identity,
pageserver_api::controller_api::PlacementPolicy::Attached(1),
preferred_az,
);
shard.schedule(scheduler, context).unwrap();
assert_eq!(shard.intent.get_attached().unwrap(), expect_attached);
assert_eq!(
shard.intent.get_secondary().first().unwrap(),
&expect_secondary
);
scheduled_shards.push(shard);
}
let tenant_id = TenantId::generate();
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(8),
},
NodeId(1),
NodeId(6),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(1),
shard_count: ShardCount(8),
},
NodeId(2),
NodeId(7),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(2),
shard_count: ShardCount(8),
},
NodeId(3),
NodeId(8),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(3),
shard_count: ShardCount(8),
},
NodeId(4),
NodeId(9),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(4),
shard_count: ShardCount(8),
},
NodeId(5),
NodeId(10),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(5),
shard_count: ShardCount(8),
},
NodeId(1),
NodeId(6),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(6),
shard_count: ShardCount(8),
},
NodeId(2),
NodeId(7),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
schedule_shard(
TenantShardId {
tenant_id,
shard_number: ShardNumber(7),
shard_count: ShardCount(8),
},
NodeId(3),
NodeId(8),
&mut scheduled_shards,
&mut scheduler,
Some(az_a.clone()),
&mut context,
);
// Assert that the optimizer suggests no changes, i.e. our initial scheduling was stable.
for shard in &scheduled_shards {
assert_eq!(shard.optimize_attachment(&mut scheduler, &context), None);
}
for mut shard in scheduled_shards {
shard.intent.clear(&mut scheduler);
}
}
}
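For readers of the test above, a hedged sketch of the placement it asserts: with five nodes in each AZ and eight shards preferring az-a, attachments round-robin over nodes 1-5 and the matching secondaries over nodes 6-10. This is illustrative arithmetic only, not the scheduler's algorithm.

fn main() {
    let az_a_nodes = [1u64, 2, 3, 4, 5]; // preferred AZ: attachments land here
    let az_b_nodes = [6u64, 7, 8, 9, 10]; // other AZ: secondaries land here
    for shard_number in 0..8usize {
        let attached = az_a_nodes[shard_number % az_a_nodes.len()];
        let secondary = az_b_nodes[shard_number % az_b_nodes.len()];
        // e.g. shard 0 -> (1, 6), shard 5 wraps around -> (1, 6), shard 7 -> (3, 8)
        println!("shard {shard_number}: attached on node {attached}, secondary on node {secondary}");
    }
}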

View File

@@ -29,7 +29,7 @@ use crate::{
ShardGenerationState, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
@@ -1576,6 +1576,7 @@ impl Service {
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
None,
),
);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
@@ -2103,6 +2104,21 @@ impl Service {
)
};
let preferred_az_id: Option<AvailabilityZone> = {
let mut locked = self.inner.write().unwrap();
// Idempotency: take the existing value if the tenant already exists
if let Some(shard) = locked.tenants.get(create_ids.first().unwrap()) {
shard.preferred_az().cloned()
} else {
locked
.scheduler
.schedule_shard::<AttachedShardTag>(&[], &None, &ScheduleContext::default())
.ok()
.and_then(|n_id| locked.scheduler.get_node_az(&n_id))
}
};
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
// during the creation, rather than risking leaving orphan objects in S3.
@@ -2122,7 +2138,7 @@ impl Service {
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: None,
preferred_az_id: preferred_az_id.as_ref().map(|az| az.to_string()),
})
.collect();
@@ -2158,6 +2174,7 @@ impl Service {
&create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
preferred_az_id.as_ref(),
&mut schedule_context,
)
.await;
@@ -2171,44 +2188,6 @@ impl Service {
}
}
let preferred_azs = {
let locked = self.inner.read().unwrap();
response_shards
.iter()
.filter_map(|resp| {
let az_id = locked
.nodes
.get(&resp.node_id)
.map(|n| n.get_availability_zone_id().clone())?;
Some((resp.shard_id, az_id))
})
.collect::<Vec<_>>()
};
// Note that we persist the preferred AZ for the new shards separately.
// In theory, we could "peek" the scheduler to determine where the shard will
// land, but the subsequent "real" call into the scheduler might select a different
// node. Hence, we do this awkward update to keep things consistent.
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
})?;
{
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
// If we failed to schedule shards, then they are still created in the controller,
// but we return an error to the requester to avoid a silent failure when someone
// tries to e.g. create a tenant whose placement policy requires more nodes than
@@ -2239,6 +2218,7 @@ impl Service {
/// Helper for tenant creation that does the scheduling for an individual shard. Covers both the
/// case of a new tenant and a pre-existing one.
#[allow(clippy::too_many_arguments)]
async fn do_initial_shard_scheduling(
&self,
tenant_shard_id: TenantShardId,
@@ -2246,6 +2226,7 @@ impl Service {
shard_params: &ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
preferred_az_id: Option<&AvailabilityZone>,
schedule_context: &mut ScheduleContext,
) -> InitialShardScheduleOutcome {
let mut locked = self.inner.write().unwrap();
@@ -2283,6 +2264,7 @@ impl Service {
tenant_shard_id,
ShardIdentity::from_params(tenant_shard_id.shard_number, shard_params),
placement_policy,
preferred_az_id.cloned(),
));
state.generation = initial_generation;
@@ -4149,16 +4131,14 @@ impl Service {
},
);
let mut child_state = TenantShard::new(child, child_shard, policy.clone());
let mut child_state =
TenantShard::new(child, child_shard, policy.clone(), preferred_az.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = Some(generation);
child_state.config = config.clone();
if let Some(preferred_az) = &preferred_az {
child_state.set_preferred_az(preferred_az.clone());
}
// The child's TenantShard::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
@@ -5351,7 +5331,7 @@ impl Service {
register_req.listen_http_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.availability_zone_id,
register_req.availability_zone_id.clone(),
);
// TODO: idempotency if the node already exists in the database
@@ -5371,8 +5351,9 @@ impl Service {
.set(locked.nodes.len() as i64);
tracing::info!(
"Registered pageserver {}, now have {} pageservers",
"Registered pageserver {} ({}), now have {} pageservers",
register_req.node_id,
register_req.availability_zone_id,
locked.nodes.len()
);
Ok(())
@@ -6109,11 +6090,17 @@ impl Service {
// How many candidate optimizations we will generate before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 2;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let (_nodes, tenants, scheduler) = locked.parts_mut();
// We are going to plan a bunch of optimisations before applying any of them, so the
// utilisation stats on nodes will be effectively stale for the >1st optimisation we
// generate. To avoid this causing unstable migrations/flapping, it's important that the
// code in TenantShard for finding optimisations uses [`NodeSchedulingScore::for_optimization`]
// to ignore the utilisation component of the score.
for (_tenant_id, schedule_context, shards) in
TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
@@ -6149,7 +6136,7 @@ impl Service {
if let Some(optimization) =
// If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
shard.optimize_attachment(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
@@ -6208,8 +6195,10 @@ impl Service {
}
}
}
ScheduleOptimizationAction::ReplaceSecondary(_) => {
// No extra checks needed to replace a secondary: this does not interrupt client access
ScheduleOptimizationAction::ReplaceSecondary(_)
| ScheduleOptimizationAction::CreateSecondary(_)
| ScheduleOptimizationAction::RemoveSecondary(_) => {
// No extra checks needed to manage secondaries: this does not interrupt client access
validated_work.push((tenant_shard_id, optimization))
}
};
@@ -6281,26 +6270,35 @@ impl Service {
/// we have this helper to move things along faster.
#[cfg(feature = "testing")]
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
let (attached_node, secondary_node) = {
let (attached_node, secondaries) = {
let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
tracing::warn!(
"Skipping kick of secondary download for {tenant_shard_id}: not found"
);
return;
};
let (Some(attached), Some(secondary)) = (
shard.intent.get_attached(),
shard.intent.get_secondary().first(),
) else {
let Some(attached) = shard.intent.get_attached() else {
tracing::warn!(
"Skipping kick of secondary download for {tenant_shard_id}: no attached"
);
return;
};
(
locked.nodes.get(attached).unwrap().clone(),
locked.nodes.get(secondary).unwrap().clone(),
)
let secondaries = shard
.intent
.get_secondary()
.iter()
.map(|n| locked.nodes.get(n).unwrap().clone())
.collect::<Vec<_>>();
(locked.nodes.get(attached).unwrap().clone(), secondaries)
};
// Make remote API calls to upload + download heatmaps: we ignore errors because this is just
// a 'kick' to let scheduling optimisation run more promptly.
attached_node
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.config.jwt_token,
@@ -6309,22 +6307,57 @@ impl Service {
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
.await
{
Some(Err(e)) => {
tracing::info!(
"Failed to upload heatmap from {attached_node} for {tenant_shard_id}: {e}"
);
}
None => {
tracing::info!(
"Cancelled while uploading heatmap from {attached_node} for {tenant_shard_id}"
);
}
Some(Ok(_)) => {
tracing::info!(
"Successfully uploaded heatmap from {attached_node} for {tenant_shard_id}"
);
}
}
secondary_node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(tenant_shard_id, Some(Duration::from_secs(1)))
.await
},
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
for secondary_node in secondaries {
match secondary_node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(
tenant_shard_id,
Some(Duration::from_secs(1)),
)
.await
},
&self.config.jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
Some(Err(e)) => {
tracing::info!(
"Failed to download heatmap from {secondary_node} for {tenant_shard_id}: {e}"
);
}
None => {
tracing::info!("Cancelled while downloading heatmap from {secondary_node} for {tenant_shard_id}");
}
Some(Ok(progress)) => {
tracing::info!("Successfully downloaded heatmap from {secondary_node} for {tenant_shard_id}: {progress:?}");
}
}
}
}
/// Look for shards which are oversized and in need of splitting
@@ -6760,9 +6793,15 @@ impl Service {
fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
let mut locked = self.inner.write().unwrap();
let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);
let (nodes, tenants, _scheduler) = locked.parts_mut();
let mut tids_by_node = locked
.tenants
let node_az = nodes
.get(&node_id)
.expect("Node must exist")
.get_availability_zone_id()
.clone();
let mut tids_by_node = tenants
.iter_mut()
.filter_map(|(tid, tenant_shard)| {
if !matches!(
@@ -6775,6 +6814,25 @@ impl Service {
return None;
}
// AZ check: when filling nodes after a restart, our intent is to move _back_ the
// shards which belong on this node, not to promote shards whose scheduling preference
// would be on their currently attached node. So we avoid promoting shards whose
// home AZ doesn't match the AZ of the node we're filling.
match tenant_shard.preferred_az() {
None => {
// Shard doesn't have an AZ preference: it is eligible to be moved.
}
Some(az) if az == &node_az => {
// This shard's home AZ matches the AZ of the node we're filling: it is
// eligible to be moved: fall through.
}
Some(_) => {
// This shard's home AZ is not the AZ of the node we're filling:
// do not include it in the fill plan.
return None;
}
}
if tenant_shard.intent.get_secondary().contains(&node_id) {
if let Some(primary) = tenant_shard.intent.get_attached() {
return Some((*primary, *tid));
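A minimal sketch of the AZ eligibility rule described in the comment above, extracted as a pure function over a simplified type; the real check lives inline in fill_node_plan.

#[derive(Clone, PartialEq, Eq, Debug)]
struct AvailabilityZone(String);

// Only shards with no AZ preference, or whose preferred AZ matches the node being
// filled, are candidates for the fill plan.
fn eligible_for_fill(node_az: &AvailabilityZone, preferred_az: Option<&AvailabilityZone>) -> bool {
    match preferred_az {
        None => true,                      // no preference: free to move back
        Some(az) if az == node_az => true, // home AZ matches the node being filled
        Some(_) => false,                  // belongs in a different AZ: leave it alone
    }
}

fn main() {
    let az_a = AvailabilityZone("az-a".to_string());
    let az_b = AvailabilityZone("az-b".to_string());
    assert!(eligible_for_fill(&az_a, None));
    assert!(eligible_for_fill(&az_a, Some(&az_a)));
    assert!(!eligible_for_fill(&az_a, Some(&az_b)));
}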

View File

@@ -11,16 +11,14 @@ use crate::{
persistence::TenantShardPersistence,
reconciler::{ReconcileUnits, ReconcilerConfig},
scheduler::{
AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
SecondaryShardTag,
AttachedShardTag, NodeAttachmentSchedulingScore, NodeSchedulingScore,
NodeSecondarySchedulingScore, RefCountUpdate, ScheduleContext, SecondaryShardTag,
},
service::ReconcileResultRequest,
};
use futures::future::{self, Either};
use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -315,6 +313,7 @@ pub(crate) struct ObservedStateLocation {
/// we know that we might have some state on this node.
pub(crate) conf: Option<LocationConfig>,
}
pub(crate) struct ReconcilerWaiter {
// For observability purposes, remember the ID of the shard we're
// waiting for.
@@ -360,6 +359,10 @@ pub(crate) enum ScheduleOptimizationAction {
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
// Create a secondary location, with the intent of later migrating to it
CreateSecondary(NodeId),
// Remove a secondary location that we previously created to facilitate a migration
RemoveSecondary(NodeId),
}
#[derive(Eq, PartialEq, Debug, Clone)]
@@ -472,6 +475,7 @@ impl TenantShard {
tenant_shard_id: TenantShardId,
shard: ShardIdentity,
policy: PlacementPolicy,
preferred_az_id: Option<AvailabilityZone>,
) -> Self {
metrics::METRICS_REGISTRY
.metrics_group
@@ -495,7 +499,7 @@ impl TenantShard {
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_az_id: None,
preferred_az_id,
}
}
@@ -626,24 +630,7 @@ impl TenantShard {
use PlacementPolicy::*;
match self.policy {
Attached(secondary_count) => {
let retain_secondaries = if self.intent.attached.is_none()
&& scheduler.node_preferred(&self.intent.secondary).is_some()
{
// If we have no attached, and one of the secondaries is elegible to be promoted, retain
// one more secondary than we usually would, as one of them will become attached futher down this function.
secondary_count + 1
} else {
secondary_count
};
while self.intent.secondary.len() > retain_secondaries {
// We have no particular preference for one secondary location over another: just
// arbitrarily drop from the end
self.intent.pop_secondary(scheduler);
modified = true;
}
// Should have exactly one attached, and N secondaries
// Should have exactly one attached, and at least N secondaries
let (modified_attached, attached_node_id) =
self.schedule_attached(scheduler, context)?;
modified |= modified_attached;
@@ -740,90 +727,258 @@ impl TenantShard {
Ok(())
}
/// Returns None if the current location's score is unavailable, i.e. we cannot draw a conclusion
fn is_better_location(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
current: NodeId,
candidate: NodeId,
) -> Option<bool> {
let Some(candidate_score) = scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
candidate,
&self.preferred_az_id,
schedule_context,
) else {
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
return None;
};
match scheduler.compute_node_score::<NodeAttachmentSchedulingScore>(
current,
&self.preferred_az_id,
schedule_context,
) {
Some(current_score) => {
// Ignore utilization components when comparing scores: we don't want to migrate
// because of transient load variations, since that risks making the system thrash, and
// migrating for utilization requires a separate high level view of the system to
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
// moving things around in the order that we hit this function.
let candidate_score = candidate_score.for_optimization();
let current_score = current_score.for_optimization();
if candidate_score < current_score {
tracing::info!("Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})");
Some(true)
} else {
// The candidate node is no better than our current location, so don't migrate
tracing::debug!(
"Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
);
Some(false)
}
}
None => {
// The current node is unavailable for scheduling, so we can't make any sensible
// decisions about optimisation. This should be a transient state -- if the node
// is offline then it will get evacuated, and if it is blocked by a scheduling mode
// then we will respect that mode by doing nothing.
tracing::debug!("Current node {current} is unavailable for scheduling");
None
}
}
}
fn find_better_location(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<NodeId> {
// TODO: fast path: if the attached node is already in the preferred AZ, _and_ has no
// other shards from the same tenant on it, then skip doing any scheduling calculations.
let attached = (*self.intent.get_attached())?;
tracing::info!(
"Initially: attached {attached} ({:?} vs {:?}), in context {schedule_context:?}",
scheduler.get_node_az(&attached),
self.preferred_az_id.as_ref()
);
// Construct a schedule context that excludes locations belonging to
// this shard: this simulates removing and re-scheduling the shard
// let schedule_context = schedule_context.project_detach(self);
// Look for a lower-scoring location to attach to
let Ok(candidate_node) = scheduler.schedule_shard::<AttachedShardTag>(
&[], // Don't hard-exclude anything: we want to consider the possibility of migrating to somewhere we already have a secondary
&self.preferred_az_id,
schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
tracing::debug!("No candidate node found");
return None;
};
if candidate_node == attached {
// We're already at the best possible location, so don't migrate
tracing::debug!("Candidate node {candidate_node} is already attached");
return None;
}
// Check if our candidate node is in the preferred AZ: it might not be, if the scheduler
// is trying its best to handle an overloaded AZ.
if self.preferred_az_id.is_some()
&& scheduler.get_node_az(&candidate_node) != self.preferred_az_id
{
tracing::debug!(
"Candidate node {candidate_node} is not in preferred AZ {:?}",
self.preferred_az_id
);
return None;
}
self.is_better_location(scheduler, schedule_context, attached, candidate_node)
.and_then(|better| if better { Some(candidate_node) } else { None })
}
/// Optimize attachments: if a shard has a secondary location that is preferable to
/// its primary location based on soft constraints, switch that secondary location
/// to be attached.
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_attachment(
&self,
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
let attached = (*self.intent.get_attached())?;
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
return None;
}
let current_affinity_score = schedule_context.get_node_affinity(attached);
let current_attachment_count = schedule_context.get_node_attachments(attached);
let schedule_context = schedule_context.project_detach(self);
// Generate score for each node, dropping any un-schedulable nodes.
let all_pageservers = self.intent.all_pageservers();
let mut scores = all_pageservers
.iter()
.flat_map(|node_id| {
let node = nodes.get(node_id);
if node.is_none() {
None
} else if matches!(
node.unwrap().get_scheduling(),
NodeSchedulingPolicy::Filling
) {
// If the node is currently filling, don't count it as a candidate to avoid,
// racing with the background fill.
None
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
None
} else {
let affinity_score = schedule_context.get_node_affinity(*node_id);
let attachment_count = schedule_context.get_node_attachments(*node_id);
Some((*node_id, affinity_score, attachment_count))
}
})
.collect::<Vec<_>>();
// Sort precedence:
// 1st - prefer nodes with the lowest total affinity score
// 2nd - prefer nodes with the lowest number of attachments in this context
// 3rd - if all else is equal, sort by node ID for determinism in tests.
scores.sort_by_key(|i| (i.1, i.2, i.0));
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
scores.first()
{
if attached != *preferred_node {
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called (e.g. there's no point migrating from
// a location with score 1 to a score zero, because on next location the situation
// would be the same, but in reverse).
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|| current_attachment_count > *preferred_attachment_count + 1
{
tracing::info!(
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *preferred_node,
}),
});
}
} else {
tracing::debug!(
"Node {} is already preferred (score {:?})",
preferred_node,
preferred_affinity_score
);
// If we already have a secondary that is higher-scoring than our current location,
// then simply migrate to it.
for secondary in self.intent.get_secondary() {
if let Some(true) =
self.is_better_location(scheduler, &schedule_context, attached, *secondary)
{
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *secondary,
}),
});
}
}
// Fall-through: we didn't find an optimization
None
// Given that none of our current secondaries is a better location than our current
// attached location (checked above), we may trim any secondaries that are not needed
// for the placement policy.
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// This code path cleans up extra secondaries after migrating, and/or
// trims extra secondaries after a PlacementPolicy::Attached(N) was
// modified to decrease N.
let mut secondary_scores = self
.intent
.get_secondary()
.iter()
.map(|node_id| {
(
*node_id,
scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
*node_id,
&self.preferred_az_id,
&schedule_context,
),
)
})
.collect::<Vec<_>>();
if secondary_scores.iter().any(|score| score.1.is_none()) {
// Don't have full list of scores, so can't make a good decision about which to drop unless
// there is an obvious one in the wrong AZ
for secondary in self.intent.get_secondary() {
if scheduler.get_node_az(secondary) == self.preferred_az_id {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
});
}
}
// Fall through: we didn't identify one to remove. This ought to be rare.
tracing::warn!("Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
self.intent.get_secondary()
);
} else {
secondary_scores.sort_by_key(|score| score.1.unwrap());
let victim = secondary_scores.last().unwrap().0;
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(victim),
});
}
}
let replacement = self.find_better_location(scheduler, &schedule_context);
// If we found a candidate whose score is preferable to our current location, check
// whether we already have a secondary at that node: if not, create one.
if let Some(replacement) = replacement {
if !self.intent.get_secondary().contains(&replacement) {
tracing::info!(
"Identified optimization({}): create secondary {replacement}",
self.tenant_shard_id
);
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::CreateSecondary(replacement),
})
} else {
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
// will check the warmth of the destination before deciding whether to really execute this.
tracing::info!(
"Identified optimization({}): migrate attachment {attached}->{replacement}",
self.tenant_shard_id
);
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: replacement,
}),
})
}
// } else if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// // We aren't in the process of migrating anywhere, and we're attached in our preferred AZ. If there are
// // any other secondary locations in our preferred AZ, we presume they were created to facilitate a migration
// // of the attached location, and remove them.
// for secondary in self.intent.get_secondary() {
// if scheduler.get_node_az(secondary) == self.preferred_az_id {
// tracing::info!(
// "Identified optimization({}): remove secondary {secondary}",
// self.tenant_shard_id
// );
// return Some(ScheduleOptimization {
// sequence: self.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(*secondary),
// });
// }
// }
// // Fall through: maybe we had excess secondaries in other AZs? Trim them in an arbitrary order
// // (lowest Node ID first).
// let mut secondary_node_ids = self.intent.get_secondary().clone();
// secondary_node_ids.sort();
// let victim = secondary_node_ids
// .first()
// .expect("Within block for > check on secondary count");
// Some(ScheduleOptimization {
// sequence: self.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(*victim),
// })
} else {
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
// to clean up: no action required.
None
}
// TODO: if we find that our current location is optimal, _and_ we also have a secondary
// in the preferred AZ, then clean up that secondary: it was only created to act as a
// migration destination.
// ...maybe do this in optimize_secondary()?
}
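Hedged sketch of the multi-step migration that successive background passes of this function drive for a shard attached outside its preferred AZ. The node IDs mirror the optimize_attachment_multistep test further down; the enum is a simplified stand-in for ScheduleOptimizationAction.

#[derive(Debug)]
enum Action {
    CreateSecondary(u64),
    MigrateAttachment { old: u64, new: u64 },
    RemoveSecondary(u64),
}

fn main() {
    let expected_passes = [
        // Pass 1: prepare a secondary in the preferred AZ (node 1).
        Action::CreateSecondary(1),
        // Pass 2: once that secondary is warm, cut the attachment over to it.
        Action::MigrateAttachment { old: 2, new: 1 },
        // Pass 3: drop the surplus secondary left behind in the old AZ.
        Action::RemoveSecondary(3),
    ];
    for (pass, action) in expected_passes.iter().enumerate() {
        println!("pass {}: {:?}", pass + 1, action);
    }
}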
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
@@ -832,18 +987,18 @@ impl TenantShard {
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
// and we are called again, we will proceed.
tracing::debug!("Too many secondaries: skipping");
return None;
}
for secondary in self.intent.get_secondary() {
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
// We're already on a node unaffected any affinity constraints,
// so we won't change it.
continue;
};
let schedule_context = schedule_context.project_secondary_detach(*secondary);
// TODO: fast path to avoid full scheduling calculation if we're in the right AZ and not
// sharing with any other shards in the same tenant
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
// This implicitly limits the choice to nodes that are available, and prefers nodes
@@ -851,33 +1006,63 @@ impl TenantShard {
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
&self.intent.all_pageservers(),
&self.preferred_az_id,
schedule_context,
&schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
continue;
};
let candidate_affinity_score = schedule_context
.nodes
.get(&candidate_node)
.unwrap_or(&AffinityScore::FREE);
let Some(candidate_score) = scheduler
.compute_node_score::<NodeSecondarySchedulingScore>(
candidate_node,
&self.preferred_az_id,
&schedule_context,
)
else {
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
// This is unexpected, because schedule() yielded this node
debug_assert!(false);
continue;
};
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called.
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
// If some other node is available and has a lower score than this node, then
// that other node is a good place to migrate to.
tracing::info!(
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
}),
});
match scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
*secondary,
&self.preferred_az_id,
&schedule_context,
) {
Some(current_score) => {
// Disregard utilization: we don't want to thrash around based on disk utilization
let current_score = current_score.for_optimization();
let candidate_score = candidate_score.for_optimization();
if candidate_score < current_score {
tracing::info!(
"Identified optimization({}, home AZ {:?}): replace secondary {secondary}->{candidate_node} (current secondaries {:?}) Candidate {:?} < current {:?} ",
self.tenant_shard_id,
self.preferred_az_id,
self.intent.get_secondary(),
candidate_score,
current_score
);
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(
ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
},
),
});
}
}
None => {
// The current node is unavailable for scheduling, so we can't make any sensible
// decisions about optimisation. This should be a transient state -- if the node
// is offline then it will get evacuated, and if it is blocked by a scheduling mode
// then we will respect that mode by doing nothing.
tracing::debug!("Current node {secondary} is unavailable for scheduling");
continue;
}
}
}
@@ -916,6 +1101,12 @@ impl TenantShard {
self.intent.remove_secondary(scheduler, old_node_id);
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
self.intent.remove_secondary(scheduler, old_secondary);
}
}
true
@@ -1571,6 +1762,7 @@ pub(crate) mod tests {
)
.unwrap(),
policy,
None,
)
}
@@ -1606,6 +1798,7 @@ pub(crate) mod tests {
)
.unwrap(),
policy.clone(),
None,
);
if let Some(az) = &preferred_az {
@@ -1749,65 +1942,92 @@ pub(crate) mod tests {
}
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
let nodes = make_test_nodes(3, &[]);
/// Simple case: moving attachment to somewhere better where we already have a secondary
fn optimize_attachment_simple() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Initially: both nodes attached on shard 1, and both have secondary locations
// on different nodes.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
schedule_context
}
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
// Either shard should recognize that it has the option to switch to a secondary location where there
// would be no other shards from the same tenant, and request to do so.
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
// of [`Service::optimize_all`] to avoid trying
// to do optimizations for multiple shards in the same tenant at the same time. Generating
// both optimizations is just done for test purposes
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
assert_eq!(
optimization_b,
Some(ScheduleOptimization {
sequence: shard_b.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(3)
})
})
);
// Applying these optimizations should result in the end state proposed
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
// // Either shard should recognize that it has the option to switch to a secondary location where there
// // would be no other shards from the same tenant, and request to do so.
// assert_eq!(
// optimization_a_prepare,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_migrate,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
// old_attached_node_id: NodeId(1),
// new_attached_node_id: NodeId(2)
// })
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_cleanup,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
@@ -1815,6 +2035,168 @@ pub(crate) mod tests {
Ok(())
}
#[test]
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
/// already, creating one as needed.
fn optimize_attachment_multistep() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Two shards of a tenant that wants to be in AZ A
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
schedule_context
}
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_cleanup,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
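// Illustration only, not part of this change: the three steps asserted above
// (CreateSecondary -> MigrateAttachment -> RemoveSecondary) can be driven generically by
// looping until optimize_attachment() has nothing left to propose. This hypothetical helper
// assumes only the optimize_attachment()/apply_optimization() signatures already used in the
// tests above.
#[allow(dead_code)]
fn drive_attachment_optimization(
    scheduler: &mut Scheduler,
    shard: &mut TenantShard,
    make_context: impl Fn(&TenantShard) -> ScheduleContext,
) -> Vec<ScheduleOptimization> {
    let mut applied = Vec::new();
    loop {
        // Rebuild the context from the shard's current intent before every step.
        let context = make_context(shard);
        let Some(optimization) = shard.optimize_attachment(scheduler, &context) else {
            break;
        };
        applied.push(optimization.clone());
        shard.apply_optimization(scheduler, optimization);
    }
    applied
}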
#[test]
/// Check that multi-step migration works when moving to somewhere that is only better by
/// 1 AffinityScore: this guards against a bug where the intermediate secondary itself counts
/// toward the affinity score and thereby blocks the rest of the migration.
fn optimize_attachment_marginal() -> anyhow::Result<()> {
let nodes = make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new(nodes.values());
// Multi-sharded tenant: we craft a situation where affinity scores differ
// only slightly between the two nodes
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
// 1 attached on node 1
shards[0]
.intent
.set_attached(&mut scheduler, Some(NodeId(1)));
// 2 attached and one secondary on node 2
shards[1]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[2]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[1].intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
for shard in shards {
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
}
schedule_context
}
let schedule_context = make_schedule_context(&shards);
let optimization_a_prepare =
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shards[2].sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shards[2].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shards);
let optimization_a_migrate =
shards[2].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shards[2].sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shards[2].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_cleanup,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
for mut shard in shards {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
let nodes = make_test_nodes(4, &[]);
@@ -1865,7 +2247,6 @@ pub(crate) mod tests {
// called repeatedly in the background.
// Returns the applied optimizations
fn optimize_til_idle(
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
shards: &mut [TenantShard],
) -> Vec<ScheduleOptimization> {
@@ -1883,7 +2264,12 @@ pub(crate) mod tests {
}
for shard in shards.iter_mut() {
let optimization = shard.optimize_attachment(nodes, &schedule_context);
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
tracing::info!(
"optimize_attachment({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
@@ -1892,6 +2278,11 @@ pub(crate) mod tests {
}
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
tracing::info!(
"optimize_secondary({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
@@ -1912,18 +2303,40 @@ pub(crate) mod tests {
optimizations
}
use test_log::test;
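// Note added for clarity: importing `test_log::test` makes the `#[test]` attributes in this
// module resolve to test-log's wrapper macro, which sets up logging (and, when test-log's
// tracing support is enabled, a tracing subscriber) before each test body runs, so the
// `tracing::info!` calls added to optimize_til_idle() are visible e.g. with RUST_LOG set.
// The DNM "cargo: add test-log" commit adds the corresponding dependency.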
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
/// that it converges.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
let nodes = make_test_nodes(4, &[]);
let nodes = make_test_nodes(
9,
&[
// Initial 6 nodes
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
AvailabilityZone("az-c".to_string()),
// Three we will add later
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
// Only show the scheduler a couple of nodes
// Only show the scheduler two nodes in each AZ to start with
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
for i in 1..=6 {
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
}
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let mut shards = make_test_tenant(
PlacementPolicy::Attached(1),
ShardCount::new(4),
Some(AvailabilityZone("az-a".to_string())),
);
let mut schedule_context = ScheduleContext::default();
for shard in &mut shards {
assert!(shard
@@ -1931,30 +2344,50 @@ pub(crate) mod tests {
.is_ok());
}
// We should see equal number of locations on the two nodes.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
// Initial: attached locations land in the tenant's home AZ.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
// Add another two nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
// Initial: secondary locations in a remote AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
// Add another three nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
optimize_til_idle(&mut scheduler, &mut shards);
// We expect one attached location to have moved to the new node in the tenant's home AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
// The original node has one fewer attached shard
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
// One of the original nodes still has two attachments: four attachments cannot be spread evenly across the three nodes now in the home AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
// None of our secondaries moved, since we already had enough nodes for those to be
// scheduled perfectly
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
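// Summary of the assertions above: the four attachments now sit on the three az-a nodes as
// 1 (node 1) + 2 (node 2) + 1 (node 7), while the four secondaries remain one per node on
// nodes 3-6 in the remote AZs.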
for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
@@ -1994,10 +2427,10 @@ pub(crate) mod tests {
shard.schedule(&mut scheduler, context).unwrap();
}
let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
assert_eq!(applied_to_a, vec![]);
let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
assert_eq!(applied_to_b, vec![]);
for shard in a.iter_mut().chain(b.iter_mut()) {

View File

@@ -86,7 +86,7 @@ def test_storage_controller_many_tenants(
AZS = ["alpha", "bravo", "charlie"]
neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update(
{"availability_zone": f"az-{AZS[ps_cfg['id'] % len(AZS)]}"}
{"availability_zone": f"az-{AZS[(ps_cfg['id'] - 1) % len(AZS)]}"}
)
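    # Illustration (not part of this change): pageserver ids are 1-based, so the `id - 1`
    # adjustment makes id 1 map to az-alpha rather than az-bravo:
    #   id 1 -> az-alpha, 2 -> az-bravo, 3 -> az-charlie, 4 -> az-alpha, ...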
# A small sleep on each call into the notify hook, to simulate the latency of doing a database write
@@ -114,8 +114,8 @@ def test_storage_controller_many_tenants(
ps.allowed_errors.append(".*request was dropped before completing.*")
# Total tenants
small_tenant_count = 7800
large_tenant_count = 200
small_tenant_count = 780
large_tenant_count = 20
tenant_count = small_tenant_count + large_tenant_count
large_tenant_shard_count = 8
total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
@@ -141,7 +141,7 @@ def test_storage_controller_many_tenants(
# We will create timelines in only a subset of tenants, because creating timelines
# does many megabytes of IO, and we want to densely simulate huge tenant counts on
# a single test node.
tenant_timelines_count = 100
tenant_timelines_count = 10
# These lists are maintained for use with rng.choice
tenants_with_timelines = list(rng.sample(list(tenants.keys()), tenant_timelines_count))
@@ -380,7 +380,7 @@ def test_storage_controller_many_tenants(
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts before rolling restart: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# assert_consistent_balanced_attachments(env, total_shards)
# Restart pageservers gracefully: this exercises the /re-attach pageserver API
# and the storage controller drain and fill API
@@ -445,7 +445,7 @@ def test_storage_controller_many_tenants(
shard_counts = get_consistent_node_shard_counts(env, total_shards)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_consistent_balanced_attachments(env, total_shards)
# assert_consistent_balanced_attachments(env, total_shards)
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
env.storage_controller.consistency_check()

View File

@@ -516,14 +516,18 @@ def test_sharding_split_smoke(
shard_count = 2
# Shard count we split into
split_shard_count = 4
# We will have 2 shards per pageserver once done (including secondaries)
neon_env_builder.num_pageservers = split_shard_count
# In both the preferred AZ and the other AZ we will end up with one shard per pageserver
neon_env_builder.num_pageservers = split_shard_count * 2
# Two AZs
def assign_az(ps_cfg):
az = f"az-{(ps_cfg['id'] - 1) % 2}"
ps_cfg["availability_zone"] = az
# We will run more pageservers than tests usually do, so give them tiny page caches
# in case we're on a test node under memory pressure.
ps_cfg["page_cache_size"] = 128
neon_env_builder.pageserver_config_override = assign_az
# 1MiB stripes: enable getting some meaningful data distribution without
@@ -659,8 +663,8 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
# - 3 * (split_shard_count/2) reconciles to migrate half the child shards to their temporary secondaries: each migration is a multi-step sequence (create secondary, cut over, remove the old secondary)
expect_reconciles = shard_count * 2 + split_shard_count + 3 * (split_shard_count / 2)
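    # Illustrative arithmetic (not part of this change): with shard_count=2 and
    # split_shard_count=4 this expands to 2*2 + 4 + 3*2 = 14 expected successful reconciles.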
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
@@ -725,10 +729,14 @@ def test_sharding_split_smoke(
# dominated by shard count.
log.info(f"total: {total}")
assert total == {
1: 2,
2: 2,
3: 2,
4: 2,
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
}
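    # i.e. split_shard_count (4) child shards x (1 attached + 1 secondary) = 8 locations,
    # one per pageserver (note added for clarity).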
# The controller is not required to lay out the attached locations in any particular way, but

View File

@@ -3011,11 +3011,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id']}"
az = f"az-{ps_cfg['id'] % 2}"
log.info("Assigned AZ {az}")
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 4
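    # Note added for clarity: with `id % 2` and ids 1..4, pageservers 1 and 3 land in az-1
    # and pageservers 2 and 4 in az-0, i.e. two pageservers per AZ.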
env = neon_env_builder.init_configs()
env.start()
@@ -3030,8 +3031,14 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
assert shards[0]["preferred_az_id"] == expected_az
# When all other schedule scoring parameters are equal, tenants should round-robin on AZs
assert env.storage_controller.tenant_describe(tids[0])["shards"][0]["preferred_az_id"] == "az-1"
assert env.storage_controller.tenant_describe(tids[1])["shards"][0]["preferred_az_id"] == "az-0"
assert env.storage_controller.tenant_describe(tids[2])["shards"][0]["preferred_az_id"] == "az-1"
# Try modifying preferred AZ
updated = env.storage_controller.set_preferred_azs(
{TenantShardId(tid, 0, 0): "foo" for tid in tids}
{TenantShardId(tid, 0, 0): "az-0" for tid in tids}
)
assert set(updated) == set([TenantShardId(tid, 0, 0) for tid in tids])
@@ -3039,29 +3046,24 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
for tid in tids:
shards = env.storage_controller.tenant_describe(tid)["shards"]
assert len(shards) == 1
assert shards[0]["preferred_az_id"] == "foo"
assert shards[0]["preferred_az_id"] == "az-0"
# Generate a layer so that the pageserver's shard split handling doesn't trip
# a debug assert.
timeline_id = TimelineId.generate()
env.create_timeline("bar", tids[0], timeline_id)
workload = Workload(env, tids[0], timeline_id, branch_name="bar")
workload.init()
workload.write_rows(256)
workload.validate()
# Having modified the preferred AZ, the shards should be moved there
env.storage_controller.reconcile_until_idle(max_interval=0.1)
for tid in tids:
shard = env.storage_controller.tenant_describe(tid)["shards"][0]
attached_to = shard["node_attached"]
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
env.storage_controller.tenant_shard_split(tids[0], shard_count=2)
env.storage_controller.reconcile_until_idle(max_interval=0.1)
shards = env.storage_controller.tenant_describe(tids[0])["shards"]
assert len(shards) == 2
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
# The scheduling optimization logic is not yet AZ-aware, so doesn't succeed
# in putting the tenant shards in the preferred AZ.
# To be fixed in https://github.com/neondatabase/neon/pull/9916
# assert shard["preferred_az_id"] == expected_az
attached_in_az = env.get_pageserver(attached_to).az_id
assert shard["preferred_az_id"] == attached_in_az == "az-0"
@run_only_on_default_postgres("Postgres version makes no difference here")