neon/storage_controller/src/tenant_shard.rs
Aleksandr Sarantsev 143500dc4f storcon: Improve stably_attached readability (#12249)
## Problem

The `stably_attached` function is hard to read due to deeply nested
conditionals.

## Summary of Changes

- Refactored `stably_attached` to use early returns and the `?` operator
for improved readability
2025-06-17 10:10:10 +00:00
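For reference, the refactored shape is a chain of early returns via the `?` operator; the sketch below is condensed from `TenantShard::stably_attached` as it appears in the listing that follows:

```rust
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
    // We have an intent to attach to some node
    let attach_intent = self.intent.attached?;
    // We have an observed location for that node
    let location = self.observed.locations.get(&attach_intent)?;
    // The observed state is not None, i.e. not in flux
    let location_config = location.conf.as_ref()?;
    // Intent and observed state agree that this node holds an attached location
    match location_config.mode {
        LocationConfigMode::AttachedMulti
        | LocationConfigMode::AttachedSingle
        | LocationConfigMode::AttachedStale => Some(attach_intent),
        _ => None,
    }
}
```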

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use futures::future::{self, Either};
use itertools::Itertools;
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, instrument};
use utils::generation::Generation;
use utils::id::NodeId;
use utils::seqwait::{SeqWait, SeqWaitError};
use utils::shard::ShardCount;
use utils::sync::gate::GateGuard;
use crate::compute_hook::ComputeHook;
use crate::metrics::{
self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
};
use crate::node::Node;
use crate::persistence::split_state::SplitState;
use crate::persistence::{Persistence, TenantShardPersistence};
use crate::reconciler::{
ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
attached_location_conf, secondary_location_conf,
};
use crate::scheduler::{
AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
};
use crate::service::ReconcileResultRequest;
use crate::timeline_import::TimelineImportState;
use crate::{Sequence, service};
/// Serialization helper
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
T: std::fmt::Display,
{
serializer.collect_str(
&v.lock()
.unwrap()
.as_ref()
.map(|e| format!("{e}"))
.unwrap_or("".to_string()),
)
}
/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
// Runtime only: sequence used to coordinate updates to this object while
// background reconcilers may be running. A reconciler runs to a particular
// sequence.
pub(crate) sequence: Sequence,
// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching.
//
// None represents an incompletely onboarded tenant via the [`Service::location_config`]
// API, where this tenant may only run in PlacementPolicy::Secondary.
pub(crate) generation: Option<Generation>,
// High level description of how the tenant should be set up. Provided
// externally.
pub(crate) policy: PlacementPolicy,
// Low level description of exactly which pageservers should fulfil
// which role. Generated by `Self::schedule`.
pub(crate) intent: IntentState,
// Low level description of how the tenant is configured on pageservers:
// if this does not match `Self::intent` then the tenant needs reconciliation
// with `Self::reconcile`.
pub(crate) observed: ObservedState,
// Tenant configuration, passed through opaquely to the pageserver. Identical
// for all shards in a tenant.
pub(crate) config: TenantConfig,
/// If a reconcile task is currently in flight, it may be joined here (it is
/// only safe to join if either the result has been received or the reconciler's
/// cancellation token has been fired)
#[serde(skip)]
pub(crate) reconciler: Option<ReconcilerHandle>,
/// If a tenant is being split, then all shards with that TenantId will have a
/// SplitState set, this acts as a guard against other operations such as background
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// Flag indicating whether the tenant has an in-progress timeline import.
/// Used to disallow shard splits while an import is in progress.
pub(crate) importing: TimelineImportState,
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
#[serde(skip)]
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// Indicates sequence number for which we have encountered an error reconciling. If
/// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
/// and callers should stop waiting for `waiter` and propagate the error.
#[serde(skip)]
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
/// The most recent error from a reconcile on this tenant. This is a nested Arc
/// because:
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
/// many waiters for one shard, and the underlying error types are not Clone.
///
/// TODO: generalize to an array of recent events
/// TODO: use an ArcSwap instead of a mutex for faster reads?
#[serde(serialize_with = "read_last_error")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
/// To do a graceful migration, set this field to the destination pageserver, and optimization
/// functions will consider this node the best location and react appropriately.
preferred_node: Option<NodeId>,
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
// be set to a non-active state to avoid making changes while the issue is fixed.
scheduling_policy: ShardSchedulingPolicy,
}
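/// Where the shard *should* live, as chosen by scheduling. Mutations go through the
/// methods below so that the scheduler's per-node reference counts stay in sync.
///
/// # Example (illustrative sketch)
///
/// `scheduler`, `az`, `node_a` and `node_b` are assumed to come from the surrounding
/// service code; this is not a runnable doctest.
///
/// ```ignore
/// let mut intent = IntentState::new(Some(az));
/// intent.set_attached(&mut scheduler, Some(node_a)); // takes an Attach refcount on node_a
/// intent.push_secondary(&mut scheduler, node_b);     // takes an AddSecondary refcount on node_b
/// assert_eq!(intent.all_pageservers(), vec![node_a, node_b]);
/// // Must be cleared before drop, releasing the scheduler refcounts:
/// intent.clear(&mut scheduler);
/// ```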
#[derive(Clone, Debug, Serialize)]
pub(crate) struct IntentState {
attached: Option<NodeId>,
secondary: Vec<NodeId>,
// We should attempt to schedule this shard in the provided AZ to
// decrease chances of cross-AZ compute.
preferred_az_id: Option<AvailabilityZone>,
}
impl IntentState {
pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
Self {
attached: None,
secondary: vec![],
preferred_az_id,
}
}
pub(crate) fn single(
scheduler: &mut Scheduler,
node_id: Option<NodeId>,
preferred_az_id: Option<AvailabilityZone>,
) -> Self {
if let Some(node_id) = node_id {
scheduler.update_node_ref_counts(
node_id,
preferred_az_id.as_ref(),
RefCountUpdate::Attach,
);
}
Self {
attached: node_id,
secondary: vec![],
preferred_az_id,
}
}
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.update_node_ref_counts(
old_attached,
self.preferred_az_id.as_ref(),
RefCountUpdate::Detach,
);
}
if let Some(new_attached) = &new_attached {
scheduler.update_node_ref_counts(
*new_attached,
self.preferred_az_id.as_ref(),
RefCountUpdate::Attach,
);
}
self.attached = new_attached;
}
if let Some(new_attached) = &new_attached {
assert!(!self.secondary.contains(new_attached));
}
}
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
self.secondary.retain(|n| n != &promote_secondary);
let demoted = self.attached;
self.attached = Some(promote_secondary);
scheduler.update_node_ref_counts(
promote_secondary,
self.preferred_az_id.as_ref(),
RefCountUpdate::PromoteSecondary,
);
if let Some(demoted) = demoted {
scheduler.update_node_ref_counts(
demoted,
self.preferred_az_id.as_ref(),
RefCountUpdate::DemoteAttached,
);
}
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
assert!(!self.secondary.contains(&new_secondary));
assert!(self.attached != Some(new_secondary));
scheduler.update_node_ref_counts(
new_secondary,
self.preferred_az_id.as_ref(),
RefCountUpdate::AddSecondary,
);
self.secondary.push(new_secondary);
}
/// It is legal to call this with a node that is not currently a secondary: that is a no-op
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.update_node_ref_counts(
node_id,
self.preferred_az_id.as_ref(),
RefCountUpdate::RemoveSecondary,
);
self.secondary.remove(index);
}
}
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.update_node_ref_counts(
secondary,
self.preferred_az_id.as_ref(),
RefCountUpdate::RemoveSecondary,
);
}
}
/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.update_node_ref_counts(
node_id,
self.preferred_az_id.as_ref(),
RefCountUpdate::RemoveSecondary,
);
}
}
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.update_node_ref_counts(
old_attached,
self.preferred_az_id.as_ref(),
RefCountUpdate::Detach,
);
}
self.clear_secondary(scheduler);
}
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
let mut result = Vec::new();
if let Some(p) = self.attached {
result.push(p)
}
result.extend(self.secondary.iter().copied());
result
}
pub(crate) fn get_attached(&self) -> &Option<NodeId> {
&self.attached
}
pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
&self.secondary
}
/// If the node is in use as the attached location, demote it into
/// the list of secondary locations. This is used when a node goes offline,
/// and we want to use a different node for attachment, but not permanently
/// forget the location on the offline node.
///
/// Returns true if a change was made
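///
/// # Example (illustrative sketch)
///
/// `shard`, `scheduler`, `context` and `node_id` are assumed to come from the
/// surrounding service code; this is not a runnable doctest.
///
/// ```ignore
/// // `node_id` just went offline while holding the attached location:
/// if shard.intent.demote_attached(&mut scheduler, node_id) {
///     // The offline node is kept as a secondary; a subsequent schedule() call can
///     // promote one of the remaining secondaries or pick a fresh attached node.
///     shard.schedule(&mut scheduler, &mut context)?;
/// }
/// ```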
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
self.attached = None;
self.secondary.push(node_id);
scheduler.update_node_ref_counts(
node_id,
self.preferred_az_id.as_ref(),
RefCountUpdate::DemoteAttached,
);
true
} else {
false
}
}
pub(crate) fn set_preferred_az(
&mut self,
scheduler: &mut Scheduler,
preferred_az: Option<AvailabilityZone>,
) {
let new_az = preferred_az.as_ref();
let old_az = self.preferred_az_id.as_ref();
if old_az != new_az {
if let Some(node_id) = self.attached {
scheduler.update_node_ref_counts(
node_id,
new_az,
RefCountUpdate::ChangePreferredAzFrom(old_az),
);
}
for node_id in &self.secondary {
scheduler.update_node_ref_counts(
*node_id,
new_az,
RefCountUpdate::ChangePreferredAzFrom(old_az),
);
}
self.preferred_az_id = preferred_az;
}
}
pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
self.preferred_az_id.as_ref()
}
}
impl Drop for IntentState {
fn drop(&mut self) {
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
// We do not check this while panicking, to avoid polluting unit test failures or
// other assertions with this assertion's output. It's still wrong to leak these,
// but if we already have a panic then we don't need to independently flag this case.
if !std::thread::panicking() {
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
}
}
}
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedState {
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
/// Our latest knowledge of how this tenant is configured in the outside world.
///
/// Meaning:
/// * No instance of this type exists for a node: we are certain that we have nothing configured on that
/// node for this shard.
/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
/// what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
/// and that configuration will still be present unless something external interfered.
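///
/// # Example (illustrative sketch)
///
/// `shard` and `node_id` are assumed to come from the surrounding service code;
/// this is not a runnable doctest.
///
/// ```ignore
/// match shard.observed.locations.get(&node_id) {
///     None => { /* nothing is configured for this shard on the node */ }
///     Some(ObservedStateLocation { conf: None }) => { /* unknown state: must reconcile */ }
///     Some(ObservedStateLocation { conf: Some(conf) }) => { /* `conf` was the last config applied */ }
/// }
/// ```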
#[derive(Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedStateLocation {
/// If None, it means we do not know the status of this shard's location on this node, but
/// we know that we might have some state on this node.
pub(crate) conf: Option<LocationConfig>,
}
pub(crate) struct ReconcilerWaiter {
// For observability purposes, remember the ID of the shard we're
// waiting for.
pub(crate) tenant_shard_id: TenantShardId,
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
seq: Sequence,
}
pub(crate) enum ReconcilerStatus {
Done,
Failed,
InProgress,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
#[error("Timeout waiting for shard {0}")]
Timeout(TenantShardId),
#[error("shutting down")]
Shutdown,
#[error("Reconcile error on shard {0}: {1}")]
Failed(TenantShardId, Arc<ReconcileError>),
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ReplaceSecondary {
old_node_id: NodeId,
new_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct MigrateAttachment {
pub(crate) old_attached_node_id: NodeId,
pub(crate) new_attached_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) enum ScheduleOptimizationAction {
// Replace one of our secondary locations with a different node
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
// Create a secondary location, with the intent of later migrating to it
CreateSecondary(NodeId),
// Remove a secondary location that we previously created to facilitate a migration
RemoveSecondary(NodeId),
}
#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ScheduleOptimization {
// What was the reconcile sequence when we generated this optimization? The optimization
// should only be applied if the shard's sequence is still at this value, in case other changes
// happened between planning the optimization and applying it.
sequence: Sequence,
pub(crate) action: ScheduleOptimizationAction,
}
impl ReconcilerWaiter {
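/// Wait for the reconciliation identified by this waiter to complete, fail, or time out.
///
/// # Example (illustrative sketch)
///
/// `waiter` is assumed to have been returned by e.g. [`TenantShard::spawn_reconciler`];
/// this is not a runnable doctest.
///
/// ```ignore
/// match waiter.wait_timeout(Duration::from_secs(30)).await {
///     Ok(()) => { /* observed state caught up with the waiter's sequence */ }
///     Err(ReconcileWaitError::Timeout(shard_id)) => { /* still in progress */ }
///     Err(ReconcileWaitError::Shutdown) => { /* controller is shutting down */ }
///     Err(ReconcileWaitError::Failed(shard_id, err)) => { /* reconcile hit an error */ }
/// }
/// ```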
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
tokio::select! {
result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
result.map_err(|e| match e {
SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
})?;
},
result = self.error_seq_wait.wait_for(self.seq) => {
result.map_err(|e| match e {
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
SeqWaitError::Timeout => unreachable!()
})?;
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
}
}
Ok(())
}
pub(crate) fn get_status(&self) -> ReconcilerStatus {
if self.seq_wait.would_wait_for(self.seq).is_ok() {
ReconcilerStatus::Done
} else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
ReconcilerStatus::Failed
} else {
ReconcilerStatus::InProgress
}
}
}
/// Having spawned a reconciler task, the tenant shard's state will carry enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
sequence: Sequence,
handle: JoinHandle<()>,
cancel: CancellationToken,
}
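/// Result of [`TenantShard::get_reconcile_needed`]: tells the service whether (and why)
/// a reconciler should be spawned for this shard.
///
/// # Example (illustrative sketch)
///
/// `shard` and `pageservers` are assumed to come from the surrounding service code;
/// this is not a runnable doctest.
///
/// ```ignore
/// match shard.get_reconcile_needed(&pageservers) {
///     ReconcileNeeded::No => { /* nothing to do */ }
///     ReconcileNeeded::WaitExisting(waiter) => { /* await the reconciler already in flight */ }
///     ReconcileNeeded::Yes(reason) => { /* call TenantShard::spawn_reconciler(reason, ..) */ }
/// }
/// ```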
pub(crate) enum ReconcileNeeded {
/// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
/// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
No,
/// shard has a reconciler running, and its intent hasn't changed since that one was
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes(ReconcileReason),
}
#[derive(Debug)]
pub(crate) enum ReconcileReason {
ActiveNodesDirty,
UnknownLocation,
PendingComputeNotification,
}
/// Pending modification to the observed state of a tenant shard.
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
pub(crate) enum ObservedStateDelta {
Upsert(Box<(NodeId, ObservedStateLocation)>),
Delete(NodeId),
}
impl ObservedStateDelta {
pub(crate) fn node_id(&self) -> &NodeId {
match self {
Self::Upsert(up) => &up.0,
Self::Delete(nid) => nid,
}
}
}
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
pub(crate) sequence: Sequence,
/// On errors, `observed` should be treated as an incomplete description
/// of state (i.e. any nodes present in the result should override nodes
/// present in the parent tenant state, but any unmentioned nodes should
/// not be removed from parent tenant state)
pub(crate) result: Result<(), ReconcileError>,
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Option<Generation>,
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
/// Set [`TenantShard::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
}
impl ObservedState {
pub(crate) fn new() -> Self {
Self {
locations: HashMap::new(),
}
}
pub(crate) fn is_empty(&self) -> bool {
self.locations.is_empty()
}
}
impl TenantShard {
pub(crate) fn new(
tenant_shard_id: TenantShardId,
shard: ShardIdentity,
policy: PlacementPolicy,
preferred_az_id: Option<AvailabilityZone>,
) -> Self {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.inc();
Self {
tenant_shard_id,
policy,
intent: IntentState::new(preferred_az_id),
generation: Some(Generation::new(0)),
shard,
observed: ObservedState::default(),
config: TenantConfig::default(),
reconciler: None,
splitting: SplitState::Idle,
importing: TimelineImportState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_node: None,
}
}
/// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
/// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
/// to get an intent state that complies with placement policy. The overall goal is to do scheduling
/// in a way that makes use of any configured locations that already exist in the outside world.
pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
// Choose an attached location by filtering observed locations, and then sorting to get the highest
// generation
let mut attached_locs = self
.observed
.locations
.iter()
.filter_map(|(node_id, l)| {
if let Some(conf) = &l.conf {
if conf.mode == LocationConfigMode::AttachedMulti
|| conf.mode == LocationConfigMode::AttachedSingle
|| conf.mode == LocationConfigMode::AttachedStale
{
Some((node_id, conf.generation))
} else {
None
}
} else {
None
}
})
.collect::<Vec<_>>();
attached_locs.sort_by_key(|i| i.1);
if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
self.intent.set_attached(scheduler, Some(*node_id));
}
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
//
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
// will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.push_secondary(scheduler, *node_id);
}
});
}
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
/// attached pageserver for a shard.
///
/// Returns whether we modified it, and the NodeId selected.
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
context: &ScheduleContext,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
return Ok((false, node_id));
}
if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
// Promote a secondary
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
self.intent.promote_attached(scheduler, promote_secondary);
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
&self.intent.secondary,
&self.intent.preferred_az_id,
context,
)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
#[instrument(skip_all, fields(
tenant_id=%self.tenant_shard_id.tenant_id,
shard_id=%self.tenant_shard_id.shard_slug(),
sequence=%self.sequence
))]
pub(crate) fn schedule(
&mut self,
scheduler: &mut Scheduler,
context: &mut ScheduleContext,
) -> Result<(), ScheduleError> {
let r = self.do_schedule(scheduler, context);
context.avoid(&self.intent.all_pageservers());
r
}
pub(crate) fn do_schedule(
&mut self,
scheduler: &mut Scheduler,
context: &ScheduleContext,
) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
// pageservers if so.
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
// change their attach location.
match self.scheduling_policy {
ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
// Warn to make it obvious why other things aren't happening/working, if we skip scheduling
tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
"Scheduling is disabled by policy {:?}", self.scheduling_policy);
return Ok(());
}
}
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut modified = false;
// Add/remove nodes to fulfil policy
use PlacementPolicy::*;
match self.policy {
Attached(secondary_count) => {
// Should have exactly one attached, and at least N secondaries
let (modified_attached, attached_node_id) =
self.schedule_attached(scheduler, context)?;
modified |= modified_attached;
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
&used_pageservers,
&self.intent.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
}
}
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
//
// We use [`AttachedShardTag`] because when a secondary location is the only one
// a shard has, we expect that its next use will be as an attached location: we want
// the tenant to be ready to warm up and run fast in their preferred AZ.
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
&[],
&self.intent.preferred_az_id,
context,
)?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
while self.intent.secondary.len() > 1 {
// If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
// having just demoted our attached location), then we should prefer to keep the location
// in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
// they have a warm location to become attached when transitioning back into Attached.
let mut candidates = self.intent.get_secondary().clone();
// Sort to get secondaries outside preferred AZ last
candidates
.sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
let secondary_to_remove = candidates.pop().unwrap();
self.intent.remove_secondary(scheduler, secondary_to_remove);
modified = true;
}
}
Detached => {
// Never add locations in this mode
if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
self.intent.clear(scheduler);
modified = true;
}
}
}
if modified {
self.sequence.0 += 1;
}
Ok(())
}
/// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
/// if the swap is not possible and leaves the intent state in its original state.
///
/// Arguments:
/// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
/// the scheduler to recommend one of the existing secondary locations
pub(crate) fn reschedule_to_secondary(
&mut self,
promote_to: Option<NodeId>,
scheduler: &mut Scheduler,
) -> Result<(), ScheduleError> {
let promote_to = match promote_to {
Some(node) => node,
None => match self.preferred_secondary(scheduler) {
Some(node) => node,
None => {
return Err(ScheduleError::ImpossibleConstraint);
}
},
};
assert!(self.intent.get_secondary().contains(&promote_to));
if let Some(node) = self.intent.get_attached() {
let demoted = self.intent.demote_attached(scheduler, *node);
if !demoted {
return Err(ScheduleError::ImpossibleConstraint);
}
}
self.intent.promote_attached(scheduler, promote_to);
// Increment the sequence number for the edge case where a
// reconciler is already running to avoid waiting on the
// current reconcile instead of spawning a new one.
self.sequence = self.sequence.next();
Ok(())
}
/// Returns None if either node's score is unavailable, i.e. we cannot draw a conclusion
fn is_better_location<T: ShardTag>(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
current: NodeId,
candidate: NodeId,
) -> Option<bool> {
let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
candidate,
&self.intent.preferred_az_id,
schedule_context,
) else {
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
return None;
};
// If the candidate is our preferred node, then it is better than the current location, as long
// as it is online -- the online check is part of the score calculation we did above, so it's
// important that this check comes after that one.
if let Some(preferred) = self.preferred_node.as_ref() {
if preferred == &candidate {
return Some(true);
}
}
match scheduler.compute_node_score::<T::Score>(
current,
&self.intent.preferred_az_id,
schedule_context,
) {
Some(current_score) => {
// Ignore utilization components when comparing scores: we don't want to migrate
// because of transient load variations, it risks making the system thrash, and
// migrating for utilization requires a separate high level view of the system to
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
// moving things around in the order that we hit this function.
let candidate_score = candidate_score.for_optimization();
let current_score = current_score.for_optimization();
if candidate_score < current_score {
tracing::info!(
"Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
);
Some(true)
} else {
// The candidate node is no better than our current location, so don't migrate
tracing::debug!(
"Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
);
Some(false)
}
}
None => {
// The current node is unavailable for scheduling, so we can't make any sensible
// decisions about optimisation. This should be a transient state -- if the node
// is offline then it will get evacuated, if is blocked by a scheduling mode
// then we will respect that mode by doing nothing.
tracing::debug!("Current node {current} is unavailable for scheduling");
None
}
}
}
pub(crate) fn find_better_location<T: ShardTag>(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
current: NodeId,
hard_exclude: &[NodeId],
) -> Option<NodeId> {
// If we have a migration hint, then that is our better location
if let Some(hint) = self.preferred_node.as_ref() {
if hint == &current {
return None;
}
return Some(*hint);
}
// Look for a lower-scoring location to attach to
let Ok(candidate_node) = scheduler.schedule_shard::<T>(
hard_exclude,
&self.intent.preferred_az_id,
schedule_context,
) else {
// A scheduling error means we have no possible candidate replacements
tracing::debug!("No candidate node found");
return None;
};
if candidate_node == current {
// We're already at the best possible location, so don't migrate
tracing::debug!("Candidate node {candidate_node} is already in use");
return None;
}
self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
.and_then(|better| if better { Some(candidate_node) } else { None })
}
/// This function is an optimization, used to avoid doing large numbers of scheduling operations
/// when looking for optimizations. This function uses knowledge of how scores work to do some
/// fast checks for whether it may be possible to improve a score.
///
/// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
/// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
/// work.
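///
/// # Example (illustrative sketch)
///
/// `shard`, `scheduler` and `schedule_context` are assumed to come from the surrounding
/// optimization loop; this is not a runnable doctest.
///
/// ```ignore
/// // Cheap pre-filter before running the full optimization passes:
/// if shard.maybe_optimizable(&mut scheduler, &schedule_context) {
///     if let Some(opt) = shard.optimize_attachment(&mut scheduler, &schedule_context) {
///         shard.apply_optimization(&mut scheduler, opt);
///     }
/// }
/// ```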
pub(crate) fn maybe_optimizable(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> bool {
// Tenant with preferred node: check if it is not already at the preferred node
if let Some(preferred) = self.preferred_node.as_ref() {
if Some(preferred) != self.intent.get_attached().as_ref() {
return true;
}
}
// Sharded tenant: check if any locations have a nonzero affinity score
if self.shard.count >= ShardCount(1) {
let schedule_context = schedule_context.project_detach(self);
for node in self.intent.all_pageservers() {
if let Some(af) = schedule_context.nodes.get(&node) {
if *af > AffinityScore(0) {
return true;
}
}
}
}
// Attached tenant: check if the attachment is outside the preferred AZ
if let PlacementPolicy::Attached(_) = self.policy {
if let Some(attached) = self.intent.get_attached() {
if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
return true;
}
}
}
// Tenant with secondary locations: check if any are within the preferred AZ
for secondary in self.intent.get_secondary() {
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
return true;
}
}
// Does the tenant have excess secondaries?
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
return true;
}
// Fall through: no optimizations possible
false
}
/// Optimize attachments: if a shard has a secondary location that is preferable to
/// its primary location based on soft constraints, switch that secondary location
/// to be attached.
///
/// `schedule_context` should have been populated with all shards in the tenant, including
/// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_attachment(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
let attached = (*self.intent.get_attached())?;
let schedule_context = schedule_context.project_detach(self);
// If we already have a secondary that is a better-scoring location than our current one,
// then simply migrate to it.
for secondary in self.intent.get_secondary() {
if let Some(true) = self.is_better_location::<AttachedShardTag>(
scheduler,
&schedule_context,
attached,
*secondary,
) {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *secondary,
}),
});
}
}
// Given that none of our current secondaries is a better location than our current
// attached location (checked above), we may trim any secondaries that are not needed
// for the placement policy.
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// This code path cleans up extra secondaries after migrating, and/or
// trims extra secondaries after a PlacementPolicy::Attached(N) was
// modified to decrease N.
let secondary_scores = self
.intent
.get_secondary()
.iter()
.map(|node_id| {
(
*node_id,
scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
*node_id,
&self.intent.preferred_az_id,
&schedule_context,
),
)
})
.collect::<HashMap<_, _>>();
if secondary_scores.iter().any(|score| score.1.is_none()) {
// Trivial case: if we only have one secondary, drop that one
if self.intent.get_secondary().len() == 1 {
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(
*self.intent.get_secondary().first().unwrap(),
),
});
}
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
// where its score can't be calculated), and drop the others. This enables us to make progress in
// most cases, even if some nodes are offline or have scheduling=pause set.
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
let is_available = secondary_scores
.get(n)
.expect("Built from same list of nodes")
.is_some();
is_available && !in_home_az
}) {
// Great, we found one to retain. Pick some other to drop.
if let Some(victim) = self
.intent
.get_secondary()
.iter()
.find(|n| n != &retain_secondary)
{
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
});
}
}
// Fall through: we didn't identify one to remove. This ought to be rare.
tracing::warn!(
"Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
self.intent.get_secondary()
);
} else {
let victim = secondary_scores
.iter()
.max_by_key(|score| score.1.unwrap())
.unwrap()
.0;
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
});
}
}
let replacement = self.find_better_location::<AttachedShardTag>(
scheduler,
&schedule_context,
attached,
&[], // Don't exclude secondaries: our preferred attachment location may be a secondary
);
// We have found a candidate and confirmed that its score is preferable
// to our current location. See if we have a secondary location in the preferred location already: if not,
// then create one.
if let Some(replacement) = replacement {
// If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
// not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing
// multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
// AZ: skip this optimization if it is not in our final, preferred AZ.
//
// This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
// there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
if self.preferred_node.is_none()
&& self.intent.preferred_az_id.is_some()
&& scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
{
tracing::debug!(
"Candidate node {replacement} is not in preferred AZ {:?}",
self.intent.preferred_az_id
);
// This should only happen if our current location is not in the preferred AZ, otherwise
// [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
// AZ is the highest priority part of NodeAttachmentSchedulingScore.
debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
return None;
}
if !self.intent.get_secondary().contains(&replacement) {
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::CreateSecondary(replacement),
})
} else {
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
// will check the warmth of the destination before deciding whether to really execute this.
Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: replacement,
}),
})
}
} else {
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
// to clean up: no action required.
None
}
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_secondary(
&self,
scheduler: &mut Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
// and we are called again, we will proceed.
tracing::debug!("Too many secondaries: skipping");
return None;
}
let schedule_context = schedule_context.project_detach(self);
for secondary in self.intent.get_secondary() {
// Make sure we don't try to migrate a secondary to our attached location: this case happens
// easily in environments without multiple AZs.
let mut exclude = match self.intent.attached {
Some(attached) => vec![attached],
None => vec![],
};
// Exclude all other secondaries from the scheduling process to avoid replacing
// one existing secondary with another existing secondary.
for another_secondary in self.intent.secondary.iter() {
if another_secondary != secondary {
exclude.push(*another_secondary);
}
}
let replacement = match &self.policy {
PlacementPolicy::Attached(_) => {
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
// to avoid placing them in the preferred AZ.
self.find_better_location::<SecondaryShardTag>(
scheduler,
&schedule_context,
*secondary,
&exclude,
)
}
PlacementPolicy::Secondary => {
// In secondary-only mode, we want our secondary locations in the preferred AZ,
// so that they're ready to take over as an attached location when we transition
// into PlacementPolicy::Attached.
self.find_better_location::<AttachedShardTag>(
scheduler,
&schedule_context,
*secondary,
&exclude,
)
}
PlacementPolicy::Detached => None,
};
assert!(replacement != Some(*secondary));
if let Some(replacement) = replacement {
// We have found a candidate and confirmed that its score is preferable
// to our current location. See if we have a secondary location in the preferred location already: if not,
// then create one.
return Some(ScheduleOptimization {
sequence: self.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: replacement,
}),
});
}
}
None
}
/// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
/// other optimisation functions, to bias them to move to the destination node.
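///
/// # Example (illustrative sketch)
///
/// `shard` and `dest_node` are assumed to come from the surrounding service code;
/// this is not a runnable doctest.
///
/// ```ignore
/// // Start a graceful migration: optimization passes will now treat `dest_node` as the
/// // best location, typically creating and warming a secondary there before cutting over.
/// shard.set_preferred_node(Some(dest_node));
/// // ...or abort a migration that is no longer wanted:
/// shard.set_preferred_node(None);
/// ```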
pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
if let Some(hint) = self.preferred_node.as_ref() {
if Some(hint) != node.as_ref() {
// This is legal but a bit surprising: we expect that administrators wouldn't usually
// change their mind about where to migrate something.
tracing::warn!(
"Changing migration destination from {hint} to {node:?} (current intent {:?})",
self.intent
);
}
}
self.preferred_node = node;
}
pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
self.preferred_node
}
/// Return true if the optimization was really applied: it will not be applied if the optimization's
/// sequence is behind this tenant shard's
pub(crate) fn apply_optimization(
&mut self,
scheduler: &mut Scheduler,
optimization: ScheduleOptimization,
) -> bool {
if optimization.sequence != self.sequence {
return false;
}
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_schedule_optimization
.inc();
match optimization.action {
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(scheduler, old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
if let Some(hint) = self.preferred_node.as_ref() {
if hint == &new_attached_node_id {
// The migration target is not a long term pin: once we are done with the migration, clear it.
tracing::info!("Graceful migration to {hint} complete");
self.preferred_node = None;
}
}
}
ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id,
new_node_id,
}) => {
self.intent.remove_secondary(scheduler, old_node_id);
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
self.intent.push_secondary(scheduler, new_node_id);
}
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
self.intent.remove_secondary(scheduler, old_secondary);
}
}
true
}
/// When a shard has several secondary locations, we need to pick one in situations where
/// we promote one of them to an attached location:
/// - When draining a node for restart
/// - When responding to a node failure
///
/// In this context, 'preferred' does not mean the node with the best scheduling score: instead
/// we want to pick the node which is best for use _temporarily_ while the previous attached location
/// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
/// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
/// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
/// they're probably not warmed up yet).
///
/// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
/// caller needs to pick a node some other way.
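///
/// # Example (illustrative sketch)
///
/// `shard` and `scheduler` are assumed to come from the surrounding drain/failure handling
/// code; this is not a runnable doctest.
///
/// ```ignore
/// // The attached node is being drained: move the attachment to a warm secondary.
/// if let Some(promote_to) = shard.preferred_secondary(&scheduler) {
///     shard.reschedule_to_secondary(Some(promote_to), &mut scheduler)?;
/// }
/// ```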
pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
// We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
// to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
// rather than a short-lived secondary location being used for optimization/migration (which would have
// been scheduled in our preferred AZ).
let mut candidates = candidates
.iter()
.map(|(node_id, node_az)| {
if node_az == &self.intent.preferred_az_id {
(1, *node_id)
} else {
(0, *node_id)
}
})
.collect::<Vec<_>>();
candidates.sort();
candidates.first().map(|i| i.1)
}
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
///
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
/// function should not be used to decide whether to reconcile.
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
// We have an intent to attach for this node
let attach_intent = self.intent.attached?;
// We have an observed state for this node
let location = self.observed.locations.get(&attach_intent)?;
// Our observed state is not None, i.e. not in flux
let location_config = location.conf.as_ref()?;
// Check if our intent and observed state agree that this node is in an attached state.
match location_config.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => Some(attach_intent),
_ => None,
}
}
fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
let mut dirty_nodes = HashSet::new();
if let Some(node_id) = self.intent.attached {
// Maybe panic: it is a severe bug if we try to attach while generation is null.
let generation = self
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf =
attached_location_conf(generation, &self.shard, &self.config, &self.policy);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
dirty_nodes.insert(node_id);
}
}
}
for node_id in &self.intent.secondary {
let wanted_conf = secondary_location_conf(&self.shard, &self.config);
match self.observed.locations.get(node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
dirty_nodes.insert(*node_id);
}
}
}
for node_id in self.observed.locations.keys() {
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
// We have observed state that isn't part of our intent: need to clean it up.
dirty_nodes.insert(*node_id);
}
}
dirty_nodes.retain(|node_id| {
nodes
.get(node_id)
.map(|n| n.is_available())
.unwrap_or(false)
});
!dirty_nodes.is_empty()
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn get_reconcile_needed(
&mut self,
pageservers: &Arc<HashMap<NodeId, Node>>,
) -> ReconcileNeeded {
// If there are any ambiguous observed states, and the nodes they refer to are available,
// we should reconcile to clean them up.
let mut dirty_observed = false;
for (node_id, observed_loc) in &self.observed.locations {
let node = pageservers
.get(node_id)
.expect("Nodes may not be removed while referenced");
if observed_loc.conf.is_none() && node.is_available() {
dirty_observed = true;
break;
}
}
let active_nodes_dirty = self.dirty(pageservers);
let reconcile_needed = match (
active_nodes_dirty,
dirty_observed,
self.pending_compute_notification,
) {
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
_ => ReconcileNeeded::No,
};
if matches!(reconcile_needed, ReconcileNeeded::No) {
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
// If we are currently splitting, then never start a reconciler task: the splitting logic
// requires that shards are not interfered with while it runs. Do this check here rather than
// up top, so that we only log this message if we would otherwise have done a reconciliation.
if !matches!(self.splitting, SplitState::Idle) {
tracing::info!("Refusing to reconcile, splitting in progress");
return ReconcileNeeded::No;
}
// Reconcile already in flight for the current sequence?
if let Some(handle) = &self.reconciler {
if handle.sequence == self.sequence {
tracing::info!(
"Reconciliation already in progress for sequence {:?}",
self.sequence,
);
return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
});
}
}
// Pre-checks done: finally check whether we may actually do the work
match self.scheduling_policy {
ShardSchedulingPolicy::Active
| ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause => {}
ShardSchedulingPolicy::Stop => {
// We only reach this point if there is work to do and we're going to skip
// doing it: warn to make it obvious why this tenant isn't doing what it ought to.
tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
return ReconcileNeeded::No;
}
}
reconcile_needed
}
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
/// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
///
/// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
/// that the waiter will not complete until some future Reconciler is constructed and run.
fn ensure_sequence_ahead(&mut self) {
// Find the highest sequence for which a Reconciler has previously run or is currently
// running
let max_seen = std::cmp::max(
self.reconciler
.as_ref()
.map(|r| r.sequence)
.unwrap_or(Sequence(0)),
std::cmp::max(self.waiter.load(), self.error_waiter.load()),
);
if self.sequence <= max_seen {
self.sequence = max_seen.next();
}
}
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
///
/// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
/// you would like to wait on the next reconciler that gets spawned in the background.
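///
/// # Example (illustrative sketch)
///
/// `shard` and `delayed_queue` are assumptions standing in for the service's
/// concurrency-limit handling; this is not a runnable doctest.
///
/// ```ignore
/// // Too many reconcilers in flight: enqueue the shard for later and hand the caller a
/// // waiter that completes once a future background reconcile covers this sequence.
/// let waiter = shard.future_reconcile_waiter();
/// delayed_queue.push(shard.tenant_shard_id);
/// waiter.wait_timeout(Duration::from_secs(60)).await?;
/// ```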
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
self.ensure_sequence_ahead();
ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
}
}
async fn reconcile(
sequence: Sequence,
mut reconciler: Reconciler,
must_notify: bool,
) -> ReconcileResult {
// Attempt to make observed state match intent state
let result = reconciler.reconcile().await;
// If we know we had a pending compute notification from some previous action, send a notification irrespective
// of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
// sending a notification of a location that isn't really attached.
if result.is_ok() && must_notify {
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
reconciler.compute_notify().await.ok();
} else if must_notify {
// Carry this flag so that the reconciler's result will indicate that it still needs to retry
// the compute hook notification eventually.
reconciler.compute_notify_failure = true;
}
// Update result counter
let outcome_label = match &result {
Ok(_) => ReconcileOutcome::Success,
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
Err(_) => ReconcileOutcome::Error,
};
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_complete
.inc(ReconcileCompleteLabelGroup {
status: outcome_label,
});
// Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
// try and schedule more work in response to our result.
ReconcileResult {
sequence,
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed_deltas: reconciler.observed_deltas(),
pending_compute_notification: reconciler.compute_notify_failure,
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
reason: ReconcileReason,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
reconciler_config: ReconcilerConfig,
service_config: &service::Config,
persistence: &Arc<Persistence>,
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
http_client: reqwest::Client,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
let old_handle = self.reconciler.take();
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
if self.intent.get_attached() != &Some(*node_id)
&& !self.intent.secondary.contains(node_id)
{
detach.push(
pageservers
.get(node_id)
.expect("Intent references non-existent pageserver")
.clone(),
)
}
}
// Advance the sequence before spawning a reconciler, so that sequence waiters
// can distinguish between before+after the reconcile completes.
self.ensure_sequence_ahead();
let reconciler_cancel = cancel.child_token();
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
let reconciler = Reconciler {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,
placement_policy: self.policy.clone(),
generation: self.generation,
intent: reconciler_intent,
detach,
reconciler_config,
config: self.config.clone(),
preferred_az: self.intent.preferred_az_id.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
_resource_units: units,
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
http_client,
};
let reconcile_seq = self.sequence;
let long_reconcile_threshold = service_config.long_reconcile_threshold;
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
let must_notify = self.pending_compute_notification;
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,
shard_id=%reconciler.tenant_shard_id.shard_slug());
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_spawn
.inc();
let result_tx = result_tx.clone();
let join_handle = tokio::task::spawn(
async move {
// Wait for any previous reconcile task to complete before we start
if let Some(old_handle) = old_handle {
old_handle.cancel.cancel();
if let Err(e) = old_handle.handle.await {
// We can't do much with this other than log it: the task is done, so
// we may proceed with our work.
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
}
}
// Early check for cancellation before doing any work
// TODO: wrap all remote API operations in cancellation check
// as well.
if reconciler.cancel.is_cancelled() {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_complete
.inc(ReconcileCompleteLabelGroup {
status: ReconcileOutcome::Cancel,
});
return;
}
let (tenant_id_label, shard_number_label, sequence_label) = {
(
reconciler.tenant_shard_id.tenant_id.to_string(),
reconciler.tenant_shard_id.shard_number.0.to_string(),
reconcile_seq.to_string(),
)
};
let label_group = ReconcileLongRunningLabelGroup {
tenant_id: &tenant_id_label,
shard_number: &shard_number_label,
sequence: &sequence_label,
};
let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
let long_reconcile_fut = {
let label_group = label_group.clone();
async move {
tokio::time::sleep(long_reconcile_threshold).await;
tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_long_running
.inc(label_group);
}
};
let reconcile_fut = std::pin::pin!(reconcile_fut);
let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
let (was_long, result) =
match future::select(reconcile_fut, long_reconcile_fut).await {
Either::Left((reconcile_result, _)) => (false, reconcile_result),
Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
};
if was_long {
let id = metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_long_running
.with_labels(label_group);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_long_running
.remove_metric(id);
}
result_tx
.send(ReconcileResultRequest::ReconcileResult(result))
.ok();
}
.instrument(reconciler_span),
);
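// Keep the handle so that a later spawn (or an explicit cancel) can cancel this
// reconciler and wait for it to exit before starting a new one.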
self.reconciler = Some(ReconcilerHandle {
sequence: self.sequence,
handle: join_handle,
cancel: reconciler_cancel,
});
Some(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
})
}
pub(crate) fn cancel_reconciler(&self) {
if let Some(handle) = self.reconciler.as_ref() {
handle.cancel.cancel()
}
}
/// Get a waiter for any reconciliation in flight, but do not start reconciliation
/// if it is not already running
pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
if self.reconciler.is_some() {
Some(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
})
} else {
None
}
}
/// Called when a ReconcileResult has been emitted and the service is updating
/// our state: if the result is from a sequence >= our ReconcilerHandle's, then drop
/// the handle to indicate there is no longer a reconciliation in progress.
pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
if let Some(reconcile_handle) = &self.reconciler {
if reconcile_handle.sequence <= sequence {
self.reconciler = None;
}
}
}
/// If we had any state at all referring to this node ID, drop it. Does not
/// attempt to reschedule.
///
/// Returns true if we modified this shard's intent state.
pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
let mut intent_modified = false;
// Drop if this node was our attached intent
if self.intent.attached == Some(node_id) {
self.intent.attached = None;
intent_modified = true;
}
// Drop from the list of secondaries, and check if we modified it
let had_secondaries = self.intent.secondary.len();
self.intent.secondary.retain(|n| n != &node_id);
intent_modified |= self.intent.secondary.len() != had_secondaries;
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
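// Also forget any node preference pointing at the dropped node.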
if self.preferred_node == Some(node_id) {
self.preferred_node = None;
}
intent_modified
}
pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
self.scheduling_policy = p;
}
pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
self.scheduling_policy
}
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
// Ordering: always set last_error before advancing sequence, so that sequence
// waiters are guaranteed to see a Some value when they see an error.
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
self.error_waiter.advance(sequence);
}
pub(crate) fn from_persistent(
tsp: TenantShardPersistence,
intent: IntentState,
) -> anyhow::Result<Self> {
let tenant_shard_id = tsp.get_tenant_shard_id()?;
let shard_identity = tsp.get_shard_identity()?;
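// Account for this shard in the gauge; the matching decrement happens in Drop.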
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.inc();
Ok(Self {
tenant_shard_id,
shard: shard_identity,
sequence: Sequence::initial(),
generation: tsp.generation.map(|g| Generation::new(g as u32)),
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
intent,
observed: ObservedState::new(),
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
splitting: tsp.splitting,
// Filled in during [`Service::startup_reconcile`]
importing: TimelineImportState::Idle,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
preferred_node: None,
})
}
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
TenantShardPersistence {
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
shard_number: self.tenant_shard_id.shard_number.0 as i32,
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
}
}
pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
self.intent.get_preferred_az()
}
pub(crate) fn set_preferred_az(
&mut self,
scheduler: &mut Scheduler,
preferred_az_id: Option<AvailabilityZone>,
) {
self.intent.set_preferred_az(scheduler, preferred_az_id);
}
/// Returns all the nodes to which this tenant shard is attached according to the
/// observed state and the generations. The returned vector is sorted from latest generation
/// to earliest.
pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
self.observed
.locations
.iter()
.filter_map(|(node_id, observed)| {
use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
let conf = observed.conf.as_ref()?;
match (conf.generation, conf.mode) {
(Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
Some((*node_id, gen_))
}
_ => None,
}
})
.sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
lhs_gen.cmp(rhs_gen).reverse()
})
.map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
.collect()
}
/// Update the observed state of the tenant by applying incremental deltas
///
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
/// They are then filtered in [`crate::service::Service::process_result`].
pub(crate) fn apply_observed_deltas(
&mut self,
deltas: impl Iterator<Item = ObservedStateDelta>,
) {
for delta in deltas {
match delta {
ObservedStateDelta::Upsert(ups) => {
let (node_id, loc) = *ups;
// If the generation of the observed location in the delta is lagging
// behind the current one, then we have a race condition and cannot
// be certain about the true observed state. Set the observed state
// to None in order to reflect this.
let crnt_gen = self
.observed
.locations
.get(&node_id)
.and_then(|loc| loc.conf.as_ref())
.and_then(|conf| conf.generation);
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
match (crnt_gen, new_gen) {
(Some(crnt), Some(new)) if crnt > new => {
tracing::warn!(
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
node_id,
loc,
crnt,
new
);
self.observed
.locations
.insert(node_id, ObservedStateLocation { conf: None });
continue;
}
_ => {}
}
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
self.observed.locations.insert(node_id, loc);
}
ObservedStateDelta::Delete(node_id) => {
tracing::info!("Deleting observed location {}", node_id);
self.observed.locations.remove(&node_id);
}
}
}
}
/// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
///
/// If the shard does not have a preferred AZ, returns false.
pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
self.intent
.get_attached()
.map(|node_id| {
Some(
nodes
.get(&node_id)
.expect("referenced node exists")
.get_availability_zone_id(),
) != self.intent.preferred_az_id.as_ref()
})
.unwrap_or(false)
}
}
impl Drop for TenantShard {
fn drop(&mut self) {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_tenant_shards
.dec();
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::cell::RefCell;
use std::rc::Rc;
use pageserver_api::controller_api::NodeAvailability;
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
use rand::SeedableRng;
use rand::rngs::StdRng;
use utils::id::TenantId;
use super::*;
use crate::scheduler::test_utils::make_test_nodes;
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
let stripe_size = DEFAULT_STRIPE_SIZE;
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantShard::new(
tenant_shard_id,
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
policy,
None,
)
}
pub(crate) fn make_test_tenant(
policy: PlacementPolicy,
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
}
pub(crate) fn make_test_tenant_with_id(
tenant_id: TenantId,
policy: PlacementPolicy,
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
let stripe_size = DEFAULT_STRIPE_SIZE;
(0..shard_count.count())
.map(|i| {
let shard_number = ShardNumber(i);
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantShard::new(
tenant_shard_id,
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
policy.clone(),
preferred_az.clone(),
)
})
.collect()
}
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
// Start with three nodes. Our tenant will only use two. The third one is
// expected to remain unused.
let mut nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
// Expect the attached and secondary locations to initially be scheduled onto different nodes
assert_eq!(tenant_shard.intent.secondary.len(), 1);
assert!(tenant_shard.intent.attached.is_some());
let attached_node_id = tenant_shard.intent.attached.unwrap();
let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
assert_ne!(attached_node_id, secondary_node_id);
// Being notified that the attached node is offline should demote it to a secondary
let changed = tenant_shard
.intent
.demote_attached(&mut scheduler, attached_node_id);
assert!(changed);
assert!(tenant_shard.intent.attached.is_none());
assert_eq!(tenant_shard.intent.secondary.len(), 2);
// Update the scheduler state to indicate the node is offline
nodes
.get_mut(&attached_node_id)
.unwrap()
.set_availability(NodeAvailability::Offline);
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
// Scheduling the shard should promote the still-available secondary node to attached
tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("active nodes are available");
assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
// The original attached node should have been retained as a secondary
assert_eq!(
*tenant_shard.intent.secondary.iter().last().unwrap(),
attached_node_id
);
tenant_shard.intent.clear(&mut scheduler);
Ok(())
}
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
tenant_shard.observed.locations.insert(
NodeId(3),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedMulti,
generation: Some(2),
secondary_conf: None,
shard_number: tenant_shard.shard.number.0,
shard_count: tenant_shard.shard.count.literal(),
shard_stripe_size: tenant_shard.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);
tenant_shard.observed.locations.insert(
NodeId(2),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedStale,
generation: Some(1),
secondary_conf: None,
shard_number: tenant_shard.shard.number.0,
shard_count: tenant_shard.shard.count.literal(),
shard_stripe_size: tenant_shard.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);
tenant_shard.intent_from_observed(&mut scheduler);
// The attached location with the highest generation gets used as attached
assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
// Other locations get used as secondary
assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
tenant_shard.intent.clear(&mut scheduler);
Ok(())
}
#[test]
fn scheduling_mode() -> anyhow::Result<()> {
let nodes = make_test_nodes(3, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
// In pause mode, schedule() shouldn't do anything
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
assert!(
tenant_shard
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok()
);
assert!(tenant_shard.intent.all_pageservers().is_empty());
// In active mode, schedule() works
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
assert!(
tenant_shard
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok()
);
assert!(!tenant_shard.intent.all_pageservers().is_empty());
tenant_shard.intent.clear(&mut scheduler);
Ok(())
}
#[test]
/// Simple case: moving attachment to somewhere better where we already have a secondary
fn optimize_attachment_simple() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Initially: the two shards are attached on different nodes, and each has its
// secondary on the other shard's attached node.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context
}
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
// // Either shard should recognize that it has the option to switch to a secondary location where there
// // would be no other shards from the same tenant, and request to do so.
// assert_eq!(
// optimization_a_prepare,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_migrate,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
// old_attached_node_id: NodeId(1),
// new_attached_node_id: NodeId(2)
// })
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
// assert_eq!(
// optimization_a_cleanup,
// Some(ScheduleOptimization {
// sequence: shard_a.sequence,
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
// })
// );
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
#[test]
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
/// already, creating one as needed.
fn optimize_attachment_multistep() -> anyhow::Result<()> {
let nodes = make_test_nodes(
3,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Two shards of a tenant that wants to be in AZ A
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context
}
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
let schedule_context = make_schedule_context(&shard_a, &shard_b);
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_cleanup,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
#[test]
/// How the optimisation code handles a shard with a preferred node set; this is an example
/// of the multi-step migration, but driven by a different input.
fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
let nodes = make_test_nodes(
4,
&[
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-b".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
// Two shards of a tenant that wants to be in AZ A
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
// Initially attached in a stable location
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
// Set the preferred node to node 2, which scores as highly as the shard's current location
shard_a.preferred_node = Some(NodeId(2));
fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context
}
let schedule_context = make_schedule_context(&shard_a);
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
// The first step of the optimisation should not have cleared the preferred node
assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
let schedule_context = make_schedule_context(&shard_a);
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
// The cutover step of the optimisation should have cleared the preferred node
assert_eq!(shard_a.preferred_node, None);
let schedule_context = make_schedule_context(&shard_a);
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_cleanup,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
shard_a.intent.clear(&mut scheduler);
Ok(())
}
#[test]
/// Check that multi-step migration works when moving to somewhere that is only better by
/// one AffinityScore: this guards against a bug where the intermediate secondary counts
/// toward the affinity score and thereby blocks the rest of the migration.
fn optimize_attachment_marginal() -> anyhow::Result<()> {
let nodes = make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new(nodes.values());
// A multi-sharded tenant: we will craft a situation where affinity
// scores differ only slightly
let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
// 1 attached on node 1
shards[0]
.intent
.set_attached(&mut scheduler, Some(NodeId(1)));
// 3 attached on node 2
shards[1]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[2]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
shards[3]
.intent
.set_attached(&mut scheduler, Some(NodeId(2)));
// The optimiser should figure out that one of the shards attached to node 2 needs to:
// - Create a secondary on node 1
// - Migrate its attachment to node 1
// - Remove its now-unused location on node 2
// The assertions below walk shards[1] through these steps.
fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
let mut schedule_context = ScheduleContext::default();
for shard in shards {
schedule_context.avoid(&shard.intent.all_pageservers());
}
schedule_context
}
let schedule_context = make_schedule_context(&shards);
let optimization_a_prepare =
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_prepare,
Some(ScheduleOptimization {
sequence: shards[1].sequence,
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
})
);
shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
let schedule_context = make_schedule_context(&shards);
let optimization_a_migrate =
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_migrate,
Some(ScheduleOptimization {
sequence: shards[1].sequence,
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(2),
new_attached_node_id: NodeId(1)
})
})
);
shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
let schedule_context = make_schedule_context(&shards);
let optimization_a_cleanup =
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization_a_cleanup,
Some(ScheduleOptimization {
sequence: shards[1].sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
})
);
shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
// Everything should be stable now
let schedule_context = make_schedule_context(&shards);
assert_eq!(
shards[0].optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shards[1].optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shards[2].optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shards[3].optimize_attachment(&mut scheduler, &schedule_context),
None
);
for mut shard in shards {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
let nodes = make_test_nodes(4, &[]);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
// Initially: the two shards are attached on different nodes, but both have their
// secondary location on node 3.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.avoid(&shard_b.intent.all_pageservers());
let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
// Since node 4 has no locations while node 3 hosts two secondaries, shard A should
// generate an optimization to move its secondary to the empty node
assert_eq!(
optimization_a,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: NodeId(3),
new_node_id: NodeId(4)
})
})
);
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
/// Test how the optimisation code behaves with an extra secondary
#[test]
fn optimize_removes_secondary() -> anyhow::Result<()> {
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let mut nodes = make_test_nodes(
4,
&[
az_a_tag.clone(),
az_b_tag.clone(),
az_a_tag.clone(),
az_b_tag.clone(),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut schedule_context = ScheduleContext::default();
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
// Attached on node 1, secondary on node 2
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
// Initially optimiser is idle
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
// A spare secondary in the home AZ should be removed: this is the situation midway through
// a graceful migration, after cutting over to the new location
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary in the non-home AZ, on a node that is marked offline
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
nodes
.get_mut(&NodeId(4))
.unwrap()
.set_availability(NodeAvailability::Offline);
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
// A spare secondary when the policy says we should have none
shard_a.policy = PlacementPolicy::Attached(0);
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
assert_eq!(
optimization,
Some(ScheduleOptimization {
sequence: shard_a.sequence,
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
})
);
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
// Check that in secondary mode, we preserve the secondary in the preferred AZ
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
shard_a.policy = PlacementPolicy::Secondary;
shard_a
.schedule(&mut scheduler, &mut schedule_context)
.unwrap();
assert_eq!(shard_a.intent.get_attached(), &None);
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
assert_eq!(
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
None
);
assert_eq!(
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
None
);
shard_a.intent.clear(&mut scheduler);
Ok(())
}
// Optimize until quiescent: this emulates what Service::optimize_all does when
// called repeatedly in the background.
// Returns the applied optimizations.
fn optimize_til_idle(
scheduler: &mut Scheduler,
shards: &mut [TenantShard],
) -> Vec<ScheduleOptimization> {
let mut loop_n = 0;
let mut optimizations = Vec::default();
loop {
let mut schedule_context = ScheduleContext::default();
let mut any_changed = false;
for shard in shards.iter() {
schedule_context.avoid(&shard.intent.all_pageservers());
}
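// Apply at most one optimization per pass, then rebuild the schedule context from
// scratch: each applied change can alter the scores that later decisions depend on.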
for shard in shards.iter_mut() {
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
tracing::info!(
"optimize_attachment({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
// Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
assert!(shard.maybe_optimizable(scheduler, &schedule_context));
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
any_changed = true;
break;
}
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
tracing::info!(
"optimize_secondary({})={:?}",
shard.tenant_shard_id,
optimization
);
if let Some(optimization) = optimization {
// Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
assert!(shard.maybe_optimizable(scheduler, &schedule_context));
optimizations.push(optimization.clone());
shard.apply_optimization(scheduler, optimization);
any_changed = true;
break;
}
}
if !any_changed {
break;
}
// Assert no infinite loop
loop_n += 1;
assert!(loop_n < 1000);
}
optimizations
}
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
/// that it converges.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
let nodes = make_test_nodes(
9,
&[
// Initial 6 nodes
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
AvailabilityZone("az-c".to_string()),
// Three we will add later
AvailabilityZone("az-a".to_string()),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
// Only show the scheduler two nodes in each AZ to start with
let mut scheduler = Scheduler::new([].iter());
for i in 1..=6 {
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
}
let mut shards = make_test_tenant(
PlacementPolicy::Attached(1),
ShardCount::new(4),
Some(AvailabilityZone("az-a".to_string())),
);
let mut schedule_context = ScheduleContext::default();
for shard in &mut shards {
assert!(
shard
.schedule(&mut scheduler, &mut schedule_context)
.is_ok()
);
}
// Initial: attached locations land in the tenant's home AZ.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
// Initial: secondary locations in a remote AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
// Add another three nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
optimize_til_idle(&mut scheduler, &mut shards);
// We expect that one attached location was moved to the new node in the tenant's home AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
// The original node has one less attached shard
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
// One of the original nodes still has two attachments, since four attached shards cannot
// be spread evenly across the three nodes now in the home AZ
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
// None of our secondaries moved, since we already had enough nodes for those to be
// scheduled perfectly
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
/// Test that initial shard scheduling is optimal. By optimal we mean
/// that the optimizer cannot find a way to improve it.
///
/// This test exercises the scheduling scenario described in
/// https://github.com/neondatabase/neon/issues/8969
#[test]
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
use itertools::Itertools;
let nodes = make_test_nodes(2, &[]);
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
for (shard, context) in schedule_order {
let context = &mut *context.borrow_mut();
shard.schedule(&mut scheduler, context).unwrap();
}
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
assert_eq!(applied_to_a, vec![]);
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
assert_eq!(applied_to_b, vec![]);
for shard in a.iter_mut().chain(b.iter_mut()) {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
#[test]
fn random_az_shard_scheduling() -> anyhow::Result<()> {
use rand::seq::SliceRandom;
for seed in 0..50 {
eprintln!("Running test with seed {seed}");
let mut rng = StdRng::seed_from_u64(seed);
let az_a_tag = AvailabilityZone("az-a".to_string());
let az_b_tag = AvailabilityZone("az-b".to_string());
let azs = [az_a_tag, az_b_tag];
let nodes = make_test_nodes(4, &azs);
let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
let mut scheduler = Scheduler::new([].iter());
for node in nodes.values() {
scheduler.node_upsert(node);
}
let mut shards = Vec::default();
let mut contexts = Vec::default();
let mut az_picker = azs.iter().cycle().cloned();
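// Create 100 tenants, alternating their preferred AZ between the two AZs and cycling
// shard counts from 1 to 4.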
for i in 0..100 {
let az = az_picker.next().unwrap();
let shard_count = i % 4 + 1;
*shards_per_az.entry(az.clone()).or_default() += shard_count;
let tenant_shards = make_test_tenant(
PlacementPolicy::Attached(1),
ShardCount::new(shard_count.try_into().unwrap()),
Some(az),
);
let context = Rc::new(RefCell::new(ScheduleContext::default()));
contexts.push(context.clone());
let with_ctx = tenant_shards
.into_iter()
.map(|shard| (shard, context.clone()));
for shard_with_ctx in with_ctx {
shards.push(shard_with_ctx);
}
}
shards.shuffle(&mut rng);
#[derive(Default, Debug)]
struct NodeStats {
attachments: u32,
secondaries: u32,
}
let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
let mut attachments_in_wrong_az = 0;
let mut secondaries_in_wrong_az = 0;
for (shard, context) in &mut shards {
let context = &mut *context.borrow_mut();
shard.schedule(&mut scheduler, context).unwrap();
let attached_node = shard.intent.get_attached().unwrap();
let stats = node_stats.entry(attached_node).or_default();
stats.attachments += 1;
let secondary_node = *shard.intent.get_secondary().first().unwrap();
let stats = node_stats.entry(secondary_node).or_default();
stats.secondaries += 1;
let attached_node_az = nodes
.get(&attached_node)
.unwrap()
.get_availability_zone_id();
let secondary_node_az = nodes
.get(&secondary_node)
.unwrap()
.get_availability_zone_id();
let preferred_az = shard.preferred_az().unwrap();
if attached_node_az != preferred_az {
eprintln!(
"{} attachment was scheduled in AZ {} but preferred AZ {}",
shard.tenant_shard_id, attached_node_az, preferred_az
);
attachments_in_wrong_az += 1;
}
if secondary_node_az == preferred_az {
eprintln!(
"{} secondary was scheduled in AZ {} which matches preference",
shard.tenant_shard_id, secondary_node_az
);
secondaries_in_wrong_az += 1;
}
}
let mut violations = Vec::default();
if attachments_in_wrong_az > 0 {
violations.push(format!(
"{} attachments scheduled to the incorrect AZ",
attachments_in_wrong_az
));
}
if secondaries_in_wrong_az > 0 {
violations.push(format!(
"{} secondaries scheduled to the incorrect AZ",
secondaries_in_wrong_az
));
}
eprintln!(
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
attachments_in_wrong_az, secondaries_in_wrong_az
);
for (node_id, stats) in &node_stats {
let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
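// Tolerate a per-node attachment count within one of the ideal: the half-open range
// below covers ideal-1, ideal and ideal+1.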
let allowed_attachment_load =
(ideal_attachment_load - 1)..(ideal_attachment_load + 2);
if !allowed_attachment_load.contains(&stats.attachments) {
violations.push(format!(
"Found {} attachments on node {}, but expected {}",
stats.attachments, node_id, ideal_attachment_load
));
}
eprintln!(
"{}: attachments={} secondaries={} ideal_attachment_load={}",
node_id, stats.attachments, stats.secondaries, ideal_attachment_load
);
}
assert!(violations.is_empty(), "{violations:?}");
for (mut shard, _ctx) in shards {
shard.intent.clear(&mut scheduler);
}
}
Ok(())
}
/// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
#[test]
fn tenant_secondary_scheduling() -> anyhow::Result<()> {
let az_a = AvailabilityZone("az-a".to_string());
let nodes = make_test_nodes(
3,
&[
az_a.clone(),
AvailabilityZone("az-b".to_string()),
AvailabilityZone("az-c".to_string()),
],
);
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
tenant_shard.intent.preferred_az_id = Some(az_a.clone());
tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
assert_eq!(tenant_shard.intent.secondary.len(), 1);
assert!(tenant_shard.intent.attached.is_none());
// Should have scheduled into the preferred AZ
assert_eq!(
scheduler
.get_node_az(&tenant_shard.intent.secondary[0])
.as_ref(),
tenant_shard.preferred_az()
);
// Optimizer should agree
assert_eq!(
tenant_shard.optimize_attachment(&mut scheduler, &context),
None
);
assert_eq!(
tenant_shard.optimize_secondary(&mut scheduler, &context),
None
);
// Switch to PlacementPolicy::Attached
tenant_shard.policy = PlacementPolicy::Attached(1);
tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
assert_eq!(tenant_shard.intent.secondary.len(), 1);
assert!(tenant_shard.intent.attached.is_some());
// Secondary should now be in non-preferred AZ
assert_ne!(
scheduler
.get_node_az(&tenant_shard.intent.secondary[0])
.as_ref(),
tenant_shard.preferred_az()
);
// Attached should be in preferred AZ
assert_eq!(
scheduler
.get_node_az(&tenant_shard.intent.attached.unwrap())
.as_ref(),
tenant_shard.preferred_az()
);
// Optimizer should agree
assert_eq!(
tenant_shard.optimize_attachment(&mut scheduler, &context),
None
);
assert_eq!(
tenant_shard.optimize_secondary(&mut scheduler, &context),
None
);
// Switch back to PlacementPolicy::Secondary
tenant_shard.policy = PlacementPolicy::Secondary;
tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
assert_eq!(tenant_shard.intent.secondary.len(), 1);
assert!(tenant_shard.intent.attached.is_none());
// When we picked a location to keep, we should have kept the one in the preferred AZ
assert_eq!(
scheduler
.get_node_az(&tenant_shard.intent.secondary[0])
.as_ref(),
tenant_shard.preferred_az()
);
// Optimizer should agree
assert_eq!(
tenant_shard.optimize_attachment(&mut scheduler, &context),
None
);
assert_eq!(
tenant_shard.optimize_secondary(&mut scheduler, &context),
None
);
tenant_shard.intent.clear(&mut scheduler);
Ok(())
}
}