## Problem

The `stably_attached` function is hard to read due to deeply nested conditionals.

## Summary of Changes

- Refactored `stably_attached` to use early returns and the `?` operator for improved readability.
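The refactored shape, condensed from the function as it appears further down in this file: each `?` returns `None` early in place of a level of `if let` nesting, leaving the attached-mode check as the only explicit branch.

```rust
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
    // We have an intent to attach for this node
    let attach_intent = self.intent.attached?;
    // We have an observed state for this node
    let location = self.observed.locations.get(&attach_intent)?;
    // Our observed state is not None, i.e. not in flux
    let location_config = location.conf.as_ref()?;

    // Check if our intent and observed state agree that this node is in an attached state.
    match location_config.mode {
        LocationConfigMode::AttachedMulti
        | LocationConfigMode::AttachedSingle
        | LocationConfigMode::AttachedStale => Some(attach_intent),
        _ => None,
    }
}
```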
use std::collections::{HashMap, HashSet};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use futures::future::{self, Either};
|
|
use itertools::Itertools;
|
|
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
|
|
use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
|
|
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::task::JoinHandle;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::{Instrument, instrument};
|
|
use utils::generation::Generation;
|
|
use utils::id::NodeId;
|
|
use utils::seqwait::{SeqWait, SeqWaitError};
|
|
use utils::shard::ShardCount;
|
|
use utils::sync::gate::GateGuard;
|
|
|
|
use crate::compute_hook::ComputeHook;
|
|
use crate::metrics::{
|
|
self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
|
|
};
|
|
use crate::node::Node;
|
|
use crate::persistence::split_state::SplitState;
|
|
use crate::persistence::{Persistence, TenantShardPersistence};
|
|
use crate::reconciler::{
|
|
ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
|
|
attached_location_conf, secondary_location_conf,
|
|
};
|
|
use crate::scheduler::{
|
|
AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
|
|
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
|
|
};
|
|
use crate::service::ReconcileResultRequest;
|
|
use crate::timeline_import::TimelineImportState;
|
|
use crate::{Sequence, service};
|
|
|
|
/// Serialization helper
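/// Renders the most recent reconcile error (if any) using its `Display` impl, or an empty
/// string when none has been recorded; used via `serialize_with` on [`TenantShard::last_error`].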
|
|
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::ser::Serializer,
|
|
T: std::fmt::Display,
|
|
{
|
|
serializer.collect_str(
|
|
&v.lock()
|
|
.unwrap()
|
|
.as_ref()
|
|
.map(|e| format!("{e}"))
|
|
.unwrap_or("".to_string()),
|
|
)
|
|
}
|
|
|
|
/// In-memory state for a particular tenant shard.
|
|
///
|
|
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
|
|
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
|
|
#[derive(Serialize)]
|
|
pub(crate) struct TenantShard {
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
|
|
pub(crate) shard: ShardIdentity,
|
|
|
|
// Runtime only: sequence used to coordinate updates to this object while
// background reconcilers may be running. A reconciler runs to a particular
|
|
// sequence.
|
|
pub(crate) sequence: Sequence,
|
|
|
|
// Latest generation number: next time we attach, increment this
|
|
// and use the incremented number when attaching.
|
|
//
|
|
// None represents an incompletely onboarded tenant via the [`Service::location_config`]
|
|
// API, where this tenant may only run in PlacementPolicy::Secondary.
|
|
pub(crate) generation: Option<Generation>,
|
|
|
|
// High level description of how the tenant should be set up. Provided
|
|
// externally.
|
|
pub(crate) policy: PlacementPolicy,
|
|
|
|
// Low level description of exactly which pageservers should fulfil
|
|
// which role. Generated by `Self::schedule`.
|
|
pub(crate) intent: IntentState,
|
|
|
|
// Low level description of how the tenant is configured on pageservers:
|
|
// if this does not match `Self::intent` then the tenant needs reconciliation
|
|
// with `Self::reconcile`.
|
|
pub(crate) observed: ObservedState,
|
|
|
|
// Tenant configuration, passed through opaquely to the pageserver. Identical
|
|
// for all shards in a tenant.
|
|
pub(crate) config: TenantConfig,
|
|
|
|
/// If a reconcile task is currently in flight, it may be joined here (it is
|
|
/// only safe to join if either the result has been received or the reconciler's
|
|
/// cancellation token has been fired)
|
|
#[serde(skip)]
|
|
pub(crate) reconciler: Option<ReconcilerHandle>,
|
|
|
|
/// If a tenant is being split, then all shards with that TenantId will have a
|
|
/// SplitState set. This acts as a guard against other operations such as background
/// reconciliation and timeline creation.
|
|
pub(crate) splitting: SplitState,
|
|
|
|
/// Flag indicating whether the tenant has an in-progress timeline import.
|
|
/// Used to disallow shard splits while an import is in progress.
|
|
pub(crate) importing: TimelineImportState,
|
|
|
|
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
|
|
/// is set. This flag is cleared when the tenant is popped off the delay queue.
|
|
pub(crate) delayed_reconcile: bool,
|
|
|
|
/// Optionally wait for reconciliation to complete up to a particular
|
|
/// sequence number.
|
|
#[serde(skip)]
|
|
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
|
|
/// Indicates sequence number for which we have encountered an error reconciling. If
|
|
/// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
|
|
/// and callers should stop waiting for `waiter` and propagate the error.
|
|
#[serde(skip)]
|
|
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
|
|
/// The most recent error from a reconcile on this tenant. This is a nested Arc
|
|
/// because:
|
|
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
|
|
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
|
|
/// many waiters for one shard, and the underlying error types are not Clone.
|
|
///
|
|
/// TODO: generalize to an array of recent events
|
|
/// TODO: use an ArcSwap instead of a mutex for faster reads?
|
|
#[serde(serialize_with = "read_last_error")]
|
|
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
|
|
|
/// If we have a pending compute notification that for some reason we weren't able to send,
|
|
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
|
|
/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
|
|
/// of state that we publish externally in an eventually consistent way.
|
|
pub(crate) pending_compute_notification: bool,
|
|
|
|
/// To do a graceful migration, set this field to the destination pageserver, and optimization
|
|
/// functions will consider this node the best location and react appropriately.
|
|
preferred_node: Option<NodeId>,
|
|
|
|
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
|
|
// be set to a non-active state to avoid making changes while the issue is fixed.
|
|
scheduling_policy: ShardSchedulingPolicy,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Serialize)]
|
|
pub(crate) struct IntentState {
|
|
attached: Option<NodeId>,
|
|
secondary: Vec<NodeId>,
|
|
|
|
// We should attempt to schedule this shard in the provided AZ to
|
|
// decrease chances of cross-AZ compute.
|
|
preferred_az_id: Option<AvailabilityZone>,
|
|
}
|
|
|
|
impl IntentState {
|
|
pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
|
|
Self {
|
|
attached: None,
|
|
secondary: vec![],
|
|
preferred_az_id,
|
|
}
|
|
}
|
|
pub(crate) fn single(
|
|
scheduler: &mut Scheduler,
|
|
node_id: Option<NodeId>,
|
|
preferred_az_id: Option<AvailabilityZone>,
|
|
) -> Self {
|
|
if let Some(node_id) = node_id {
|
|
scheduler.update_node_ref_counts(
|
|
node_id,
|
|
preferred_az_id.as_ref(),
|
|
RefCountUpdate::Attach,
|
|
);
|
|
}
|
|
Self {
|
|
attached: node_id,
|
|
secondary: vec![],
|
|
preferred_az_id,
|
|
}
|
|
}
|
|
|
|
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
|
|
if self.attached != new_attached {
|
|
if let Some(old_attached) = self.attached.take() {
|
|
scheduler.update_node_ref_counts(
|
|
old_attached,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::Detach,
|
|
);
|
|
}
|
|
if let Some(new_attached) = &new_attached {
|
|
scheduler.update_node_ref_counts(
|
|
*new_attached,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::Attach,
|
|
);
|
|
}
|
|
self.attached = new_attached;
|
|
}
|
|
|
|
if let Some(new_attached) = &new_attached {
|
|
assert!(!self.secondary.contains(new_attached));
|
|
}
|
|
}
|
|
|
|
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
|
|
/// secondary to attached while maintaining the scheduler's reference counts.
|
|
pub(crate) fn promote_attached(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
promote_secondary: NodeId,
|
|
) {
|
|
// If we call this with a node that isn't in secondary, it would cause incorrect
|
|
// scheduler reference counting, since we assume the node is already referenced as a secondary.
|
|
debug_assert!(self.secondary.contains(&promote_secondary));
|
|
|
|
self.secondary.retain(|n| n != &promote_secondary);
|
|
|
|
let demoted = self.attached;
|
|
self.attached = Some(promote_secondary);
|
|
|
|
scheduler.update_node_ref_counts(
|
|
promote_secondary,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::PromoteSecondary,
|
|
);
|
|
if let Some(demoted) = demoted {
|
|
scheduler.update_node_ref_counts(
|
|
demoted,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::DemoteAttached,
|
|
);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
|
|
assert!(!self.secondary.contains(&new_secondary));
|
|
assert!(self.attached != Some(new_secondary));
|
|
scheduler.update_node_ref_counts(
|
|
new_secondary,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::AddSecondary,
|
|
);
|
|
self.secondary.push(new_secondary);
|
|
}
|
|
|
|
/// It is legal to call this with a node that is not currently a secondary: that is a no-op
|
|
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
|
|
let index = self.secondary.iter().position(|n| *n == node_id);
|
|
if let Some(index) = index {
|
|
scheduler.update_node_ref_counts(
|
|
node_id,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::RemoveSecondary,
|
|
);
|
|
self.secondary.remove(index);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
|
|
for secondary in self.secondary.drain(..) {
|
|
scheduler.update_node_ref_counts(
|
|
secondary,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::RemoveSecondary,
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Remove the last secondary node from the list of secondaries
|
|
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
|
|
if let Some(node_id) = self.secondary.pop() {
|
|
scheduler.update_node_ref_counts(
|
|
node_id,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::RemoveSecondary,
|
|
);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
|
if let Some(old_attached) = self.attached.take() {
|
|
scheduler.update_node_ref_counts(
|
|
old_attached,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::Detach,
|
|
);
|
|
}
|
|
|
|
self.clear_secondary(scheduler);
|
|
}
|
|
|
|
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
|
|
let mut result = Vec::new();
|
|
if let Some(p) = self.attached {
|
|
result.push(p)
|
|
}
|
|
|
|
result.extend(self.secondary.iter().copied());
|
|
|
|
result
|
|
}
|
|
|
|
pub(crate) fn get_attached(&self) -> &Option<NodeId> {
|
|
&self.attached
|
|
}
|
|
|
|
pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
|
|
&self.secondary
|
|
}
|
|
|
|
/// If the node is in use as the attached location, demote it into
|
|
/// the list of secondary locations. This is used when a node goes offline,
|
|
/// and we want to use a different node for attachment, but not permanently
|
|
/// forget the location on the offline node.
|
|
///
|
|
/// Returns true if a change was made
|
|
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
|
|
if self.attached == Some(node_id) {
|
|
self.attached = None;
|
|
self.secondary.push(node_id);
|
|
scheduler.update_node_ref_counts(
|
|
node_id,
|
|
self.preferred_az_id.as_ref(),
|
|
RefCountUpdate::DemoteAttached,
|
|
);
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
|
|
pub(crate) fn set_preferred_az(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
preferred_az: Option<AvailabilityZone>,
|
|
) {
|
|
let new_az = preferred_az.as_ref();
|
|
let old_az = self.preferred_az_id.as_ref();
|
|
|
|
if old_az != new_az {
|
|
if let Some(node_id) = self.attached {
|
|
scheduler.update_node_ref_counts(
|
|
node_id,
|
|
new_az,
|
|
RefCountUpdate::ChangePreferredAzFrom(old_az),
|
|
);
|
|
}
|
|
for node_id in &self.secondary {
|
|
scheduler.update_node_ref_counts(
|
|
*node_id,
|
|
new_az,
|
|
RefCountUpdate::ChangePreferredAzFrom(old_az),
|
|
);
|
|
}
|
|
self.preferred_az_id = preferred_az;
|
|
}
|
|
}
|
|
|
|
pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
|
|
self.preferred_az_id.as_ref()
|
|
}
|
|
}
|
|
|
|
impl Drop for IntentState {
|
|
fn drop(&mut self) {
|
|
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
|
|
// We do not check this while panicking, to avoid polluting unit test failures or
|
|
// other assertions with this assertion's output. It's still wrong to leak these,
|
|
// but if we already have a panic then we don't need to independently flag this case.
|
|
if !(std::thread::panicking()) {
|
|
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
|
|
pub(crate) struct ObservedState {
|
|
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
|
|
}
|
|
|
|
/// Our latest knowledge of how this tenant is configured in the outside world.
|
|
///
|
|
/// Meaning:
|
|
/// * No instance of this type exists for a node: we are certain that we have nothing configured on that
|
|
/// node for this shard.
|
|
/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
|
|
/// what it is (e.g. we failed partway through configuring it)
|
|
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
|
|
/// and that configuration will still be present unless something external interfered.
|
|
#[derive(Clone, Serialize, Deserialize, Debug)]
|
|
pub(crate) struct ObservedStateLocation {
|
|
/// If None, it means we do not know the status of this shard's location on this node, but
|
|
/// we know that we might have some state on this node.
|
|
pub(crate) conf: Option<LocationConfig>,
|
|
}
|
|
|
|
pub(crate) struct ReconcilerWaiter {
|
|
// For observability purposes, remember the ID of the shard we're
|
|
// waiting for.
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
|
|
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
|
seq: Sequence,
|
|
}
|
|
|
|
pub(crate) enum ReconcilerStatus {
|
|
Done,
|
|
Failed,
|
|
InProgress,
|
|
}
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub(crate) enum ReconcileWaitError {
|
|
#[error("Timeout waiting for shard {0}")]
|
|
Timeout(TenantShardId),
|
|
#[error("shutting down")]
|
|
Shutdown,
|
|
#[error("Reconcile error on shard {0}: {1}")]
|
|
Failed(TenantShardId, Arc<ReconcileError>),
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug, Clone)]
|
|
pub(crate) struct ReplaceSecondary {
|
|
old_node_id: NodeId,
|
|
new_node_id: NodeId,
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug, Clone)]
|
|
pub(crate) struct MigrateAttachment {
|
|
pub(crate) old_attached_node_id: NodeId,
|
|
pub(crate) new_attached_node_id: NodeId,
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug, Clone)]
|
|
pub(crate) enum ScheduleOptimizationAction {
|
|
// Replace one of our secondary locations with a different node
|
|
ReplaceSecondary(ReplaceSecondary),
|
|
// Migrate attachment to an existing secondary location
|
|
MigrateAttachment(MigrateAttachment),
|
|
// Create a secondary location, with the intent of later migrating to it
|
|
CreateSecondary(NodeId),
|
|
// Remove a secondary location that we previously created to facilitate a migration
|
|
RemoveSecondary(NodeId),
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug, Clone)]
|
|
pub(crate) struct ScheduleOptimization {
|
|
// What was the reconcile sequence when we generated this optimization? The optimization
|
|
// should only be applied if the shard's sequence is still at this value, in case other changes
|
|
// happened between planning the optimization and applying it.
|
|
sequence: Sequence,
|
|
|
|
pub(crate) action: ScheduleOptimizationAction,
|
|
}
|
|
|
|
impl ReconcilerWaiter {
|
|
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
|
|
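// Race the success path against the error path: whichever sequence reaches our target
// first decides whether we return Ok or the Failed error below.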
tokio::select! {
|
|
result = self.seq_wait.wait_for_timeout(self.seq, timeout) => {
|
|
result.map_err(|e| match e {
|
|
SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
|
|
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
|
|
})?;
|
|
},
|
|
result = self.error_seq_wait.wait_for(self.seq) => {
|
|
result.map_err(|e| match e {
|
|
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
|
|
SeqWaitError::Timeout => unreachable!()
|
|
})?;
|
|
|
|
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
|
|
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) fn get_status(&self) -> ReconcilerStatus {
|
|
if self.seq_wait.would_wait_for(self.seq).is_ok() {
|
|
ReconcilerStatus::Done
|
|
} else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
|
|
ReconcilerStatus::Failed
|
|
} else {
|
|
ReconcilerStatus::InProgress
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Having spawned a reconciler task, the tenant shard's state will carry enough
|
|
/// information to optionally cancel & await it later.
|
|
pub(crate) struct ReconcilerHandle {
|
|
sequence: Sequence,
|
|
handle: JoinHandle<()>,
|
|
cancel: CancellationToken,
|
|
}
|
|
|
|
pub(crate) enum ReconcileNeeded {
|
|
/// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
|
|
/// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
|
|
No,
|
|
/// shard has a reconciler running, and its intent hasn't changed since that one was
|
|
/// spawned: wait for the existing reconciler rather than spawning a new one.
|
|
WaitExisting(ReconcilerWaiter),
|
|
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
|
|
Yes(ReconcileReason),
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) enum ReconcileReason {
|
|
ActiveNodesDirty,
|
|
UnknownLocation,
|
|
PendingComputeNotification,
|
|
}
|
|
|
|
/// Pending modification to the observed state of a tenant shard.
|
|
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
|
|
pub(crate) enum ObservedStateDelta {
|
|
Upsert(Box<(NodeId, ObservedStateLocation)>),
|
|
Delete(NodeId),
|
|
}
|
|
|
|
impl ObservedStateDelta {
|
|
pub(crate) fn node_id(&self) -> &NodeId {
|
|
match self {
|
|
Self::Upsert(up) => &up.0,
|
|
Self::Delete(nid) => nid,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// When a reconcile task completes, it sends this result object
|
|
/// to be applied to the primary TenantShard.
|
|
pub(crate) struct ReconcileResult {
|
|
pub(crate) sequence: Sequence,
|
|
/// On errors, `observed` should be treated as an incomplete description
|
|
/// of state (i.e. any nodes present in the result should override nodes
|
|
/// present in the parent tenant state, but any unmentioned nodes should
|
|
/// not be removed from parent tenant state)
|
|
pub(crate) result: Result<(), ReconcileError>,
|
|
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
pub(crate) generation: Option<Generation>,
|
|
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
|
|
|
|
/// Set [`TenantShard::pending_compute_notification`] from this flag
|
|
pub(crate) pending_compute_notification: bool,
|
|
}
|
|
|
|
impl ObservedState {
|
|
pub(crate) fn new() -> Self {
|
|
Self {
|
|
locations: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn is_empty(&self) -> bool {
|
|
self.locations.is_empty()
|
|
}
|
|
}
|
|
|
|
impl TenantShard {
|
|
pub(crate) fn new(
|
|
tenant_shard_id: TenantShardId,
|
|
shard: ShardIdentity,
|
|
policy: PlacementPolicy,
|
|
preferred_az_id: Option<AvailabilityZone>,
|
|
) -> Self {
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_tenant_shards
|
|
.inc();
|
|
|
|
Self {
|
|
tenant_shard_id,
|
|
policy,
|
|
intent: IntentState::new(preferred_az_id),
|
|
generation: Some(Generation::new(0)),
|
|
shard,
|
|
observed: ObservedState::default(),
|
|
config: TenantConfig::default(),
|
|
reconciler: None,
|
|
splitting: SplitState::Idle,
|
|
importing: TimelineImportState::Idle,
|
|
sequence: Sequence(1),
|
|
delayed_reconcile: false,
|
|
waiter: Arc::new(SeqWait::new(Sequence(0))),
|
|
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
|
|
last_error: Arc::default(),
|
|
pending_compute_notification: false,
|
|
scheduling_policy: ShardSchedulingPolicy::default(),
|
|
preferred_node: None,
|
|
}
|
|
}
|
|
|
|
/// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
|
|
/// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
|
|
/// to get an intent state that complies with placement policy. The overall goal is to do scheduling
|
|
/// in a way that makes use of any configured locations that already exist in the outside world.
|
|
pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
|
|
// Choose an attached location by filtering observed locations, and then sorting to get the highest
|
|
// generation
|
|
let mut attached_locs = self
|
|
.observed
|
|
.locations
|
|
.iter()
|
|
.filter_map(|(node_id, l)| {
|
|
if let Some(conf) = &l.conf {
|
|
if conf.mode == LocationConfigMode::AttachedMulti
|
|
|| conf.mode == LocationConfigMode::AttachedSingle
|
|
|| conf.mode == LocationConfigMode::AttachedStale
|
|
{
|
|
Some((node_id, conf.generation))
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
attached_locs.sort_by_key(|i| i.1);
|
|
if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
|
|
self.intent.set_attached(scheduler, Some(*node_id));
|
|
}
|
|
|
|
// All remaining observed locations generate secondary intents. This includes None
|
|
// observations, as these may well have some local content on disk that is usable (this
|
|
// is an edge case that might occur if we restarted during a migration or other change)
|
|
//
|
|
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
|
|
// will take care of promoting one of these secondaries to be attached.
|
|
self.observed.locations.keys().for_each(|node_id| {
|
|
if Some(*node_id) != self.intent.attached {
|
|
self.intent.push_secondary(scheduler, *node_id);
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
|
|
/// attached pageserver for a shard.
|
|
///
|
|
/// Returns whether we modified it, and the NodeId selected.
|
|
fn schedule_attached(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &ScheduleContext,
|
|
) -> Result<(bool, NodeId), ScheduleError> {
|
|
// No work to do if we already have an attached tenant
|
|
if let Some(node_id) = self.intent.attached {
|
|
return Ok((false, node_id));
|
|
}
|
|
|
|
if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
|
|
// Promote a secondary
|
|
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
|
|
self.intent.promote_attached(scheduler, promote_secondary);
|
|
Ok((true, promote_secondary))
|
|
} else {
|
|
// Pick a fresh node: either we had no secondaries or none were schedulable
|
|
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
|
|
&self.intent.secondary,
|
|
&self.intent.preferred_az_id,
|
|
context,
|
|
)?;
|
|
tracing::debug!("Selected {} as attached", node_id);
|
|
self.intent.set_attached(scheduler, Some(node_id));
|
|
Ok((true, node_id))
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all, fields(
|
|
tenant_id=%self.tenant_shard_id.tenant_id,
|
|
shard_id=%self.tenant_shard_id.shard_slug(),
|
|
sequence=%self.sequence
|
|
))]
|
|
pub(crate) fn schedule(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &mut ScheduleContext,
|
|
) -> Result<(), ScheduleError> {
|
|
let r = self.do_schedule(scheduler, context);
|
|
|
|
context.avoid(&self.intent.all_pageservers());
|
|
|
|
r
|
|
}
|
|
|
|
pub(crate) fn do_schedule(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &ScheduleContext,
|
|
) -> Result<(), ScheduleError> {
|
|
// TODO: before scheduling new nodes, check if any existing content in
|
|
// self.intent refers to pageservers that are offline, and pick other
|
|
// pageservers if so.
|
|
|
|
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
|
|
// change their attach location.
|
|
|
|
match self.scheduling_policy {
|
|
ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
|
|
ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
|
|
// Warn to make it obvious why other things aren't happening/working, if we skip scheduling
|
|
tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
|
|
"Scheduling is disabled by policy {:?}", self.scheduling_policy);
|
|
return Ok(());
|
|
}
|
|
}
|
|
|
|
// Build the set of pageservers already in use by this tenant, to avoid scheduling
|
|
// more work on the same pageservers we're already using.
|
|
let mut modified = false;
|
|
|
|
// Add/remove nodes to fulfil policy
|
|
use PlacementPolicy::*;
|
|
match self.policy {
|
|
Attached(secondary_count) => {
|
|
// Should have exactly one attached, and at least N secondaries
|
|
let (modified_attached, attached_node_id) =
|
|
self.schedule_attached(scheduler, context)?;
|
|
modified |= modified_attached;
|
|
|
|
let mut used_pageservers = vec![attached_node_id];
|
|
while self.intent.secondary.len() < secondary_count {
|
|
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
|
|
&used_pageservers,
|
|
&self.intent.preferred_az_id,
|
|
context,
|
|
)?;
|
|
self.intent.push_secondary(scheduler, node_id);
|
|
used_pageservers.push(node_id);
|
|
modified = true;
|
|
}
|
|
}
|
|
Secondary => {
|
|
if let Some(node_id) = self.intent.get_attached() {
|
|
// Populate secondary by demoting the attached node
|
|
self.intent.demote_attached(scheduler, *node_id);
|
|
|
|
modified = true;
|
|
} else if self.intent.secondary.is_empty() {
|
|
// Populate secondary by scheduling a fresh node
|
|
//
|
|
// We use [`AttachedShardTag`] because when a secondary location is the only one
|
|
// a shard has, we expect that its next use will be as an attached location: we want
|
|
// the tenant to be ready to warm up and run fast in its preferred AZ.
|
|
let node_id = scheduler.schedule_shard::<AttachedShardTag>(
|
|
&[],
|
|
&self.intent.preferred_az_id,
|
|
context,
|
|
)?;
|
|
self.intent.push_secondary(scheduler, node_id);
|
|
modified = true;
|
|
}
|
|
while self.intent.secondary.len() > 1 {
|
|
// If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
|
|
// having just demoted our attached location), then we should prefer to keep the location
|
|
// in our preferred AZ. Tenants in Secondary mode want to be in the preferred AZ so that
|
|
// they have a warm location to become attached when transitioning back into Attached.
|
|
|
|
let mut candidates = self.intent.get_secondary().clone();
|
|
// Sort to get secondaries outside preferred AZ last
|
|
candidates
|
|
.sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
|
|
let secondary_to_remove = candidates.pop().unwrap();
|
|
self.intent.remove_secondary(scheduler, secondary_to_remove);
|
|
modified = true;
|
|
}
|
|
}
|
|
Detached => {
|
|
// Never add locations in this mode
|
|
if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
|
|
self.intent.clear(scheduler);
|
|
modified = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if modified {
|
|
self.sequence.0 += 1;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
|
|
/// if the swap is not possible and leaves the intent state in its original state.
|
|
///
|
|
/// Arguments:
|
|
/// `attached_to`: the currently attached location matching the intent state (may be None if the
|
|
/// shard is not attached)
|
|
/// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
|
|
/// the scheduler to recommend a node
|
|
pub(crate) fn reschedule_to_secondary(
|
|
&mut self,
|
|
promote_to: Option<NodeId>,
|
|
scheduler: &mut Scheduler,
|
|
) -> Result<(), ScheduleError> {
|
|
let promote_to = match promote_to {
|
|
Some(node) => node,
|
|
None => match self.preferred_secondary(scheduler) {
|
|
Some(node) => node,
|
|
None => {
|
|
return Err(ScheduleError::ImpossibleConstraint);
|
|
}
|
|
},
|
|
};
|
|
|
|
assert!(self.intent.get_secondary().contains(&promote_to));
|
|
|
|
if let Some(node) = self.intent.get_attached() {
|
|
let demoted = self.intent.demote_attached(scheduler, *node);
|
|
if !demoted {
|
|
return Err(ScheduleError::ImpossibleConstraint);
|
|
}
|
|
}
|
|
|
|
self.intent.promote_attached(scheduler, promote_to);
|
|
|
|
// Increment the sequence number for the edge case where a
|
|
// reconciler is already running to avoid waiting on the
|
|
// current reconcile instead of spawning a new one.
|
|
self.sequence = self.sequence.next();
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
|
|
fn is_better_location<T: ShardTag>(
|
|
&self,
|
|
scheduler: &mut Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
current: NodeId,
|
|
candidate: NodeId,
|
|
) -> Option<bool> {
|
|
let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
|
|
candidate,
|
|
&self.intent.preferred_az_id,
|
|
schedule_context,
|
|
) else {
|
|
// The candidate node is unavailable for scheduling or otherwise couldn't get a score
|
|
return None;
|
|
};
|
|
|
|
// If the candidate is our preferred node, then it is better than the current location, as long
|
|
// as it is online -- the online check is part of the score calculation we did above, so it's
|
|
// important that this check comes after that one.
|
|
if let Some(preferred) = self.preferred_node.as_ref() {
|
|
if preferred == &candidate {
|
|
return Some(true);
|
|
}
|
|
}
|
|
|
|
match scheduler.compute_node_score::<T::Score>(
|
|
current,
|
|
&self.intent.preferred_az_id,
|
|
schedule_context,
|
|
) {
|
|
Some(current_score) => {
|
|
// Ignore utilization components when comparing scores: we don't want to migrate
|
|
// because of transient load variations, it risks making the system thrash, and
|
|
// migrating for utilization requires a separate high level view of the system to
|
|
// e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
|
|
// moving things around in the order that we hit this function.
|
|
let candidate_score = candidate_score.for_optimization();
|
|
let current_score = current_score.for_optimization();
|
|
|
|
if candidate_score < current_score {
|
|
tracing::info!(
|
|
"Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
|
|
);
|
|
Some(true)
|
|
} else {
|
|
// The candidate node is no better than our current location, so don't migrate
|
|
tracing::debug!(
|
|
"Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
|
|
);
|
|
Some(false)
|
|
}
|
|
}
|
|
None => {
|
|
// The current node is unavailable for scheduling, so we can't make any sensible
|
|
// decisions about optimisation. This should be a transient state -- if the node
|
|
// is offline then it will get evacuated, if it is blocked by a scheduling mode
|
|
// then we will respect that mode by doing nothing.
|
|
tracing::debug!("Current node {current} is unavailable for scheduling");
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) fn find_better_location<T: ShardTag>(
|
|
&self,
|
|
scheduler: &mut Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
current: NodeId,
|
|
hard_exclude: &[NodeId],
|
|
) -> Option<NodeId> {
|
|
// If we have a migration hint, then that is our better location
|
|
if let Some(hint) = self.preferred_node.as_ref() {
|
|
if hint == &current {
|
|
return None;
|
|
}
|
|
|
|
return Some(*hint);
|
|
}
|
|
|
|
// Look for a lower-scoring location to attach to
|
|
let Ok(candidate_node) = scheduler.schedule_shard::<T>(
|
|
hard_exclude,
|
|
&self.intent.preferred_az_id,
|
|
schedule_context,
|
|
) else {
|
|
// A scheduling error means we have no possible candidate replacements
|
|
tracing::debug!("No candidate node found");
|
|
return None;
|
|
};
|
|
|
|
if candidate_node == current {
|
|
// We're already at the best possible location, so don't migrate
|
|
tracing::debug!("Candidate node {candidate_node} is already in use");
|
|
return None;
|
|
}
|
|
|
|
self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
|
|
.and_then(|better| if better { Some(candidate_node) } else { None })
|
|
}
|
|
|
|
/// This function is an optimization, used to avoid doing large numbers of scheduling operations
|
|
/// when looking for optimizations. This function uses knowledge of how scores work to do some
|
|
/// fast checks for whether it may be possible to improve a score.
|
|
///
|
|
/// If we return true, it only means that optimization _might_ be possible, not that it necessarily is. If we
|
|
/// return false, it definitely means that calling [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no
|
|
/// work.
|
|
pub(crate) fn maybe_optimizable(
|
|
&self,
|
|
scheduler: &mut Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
) -> bool {
|
|
// Tenant with preferred node: check if it is not already at the preferred node
|
|
if let Some(preferred) = self.preferred_node.as_ref() {
|
|
if Some(preferred) != self.intent.get_attached().as_ref() {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
// Sharded tenant: check if any locations have a nonzero affinity score
|
|
if self.shard.count >= ShardCount(1) {
|
|
let schedule_context = schedule_context.project_detach(self);
|
|
for node in self.intent.all_pageservers() {
|
|
if let Some(af) = schedule_context.nodes.get(&node) {
|
|
if *af > AffinityScore(0) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Attached tenant: check if the attachment is outside the preferred AZ
|
|
if let PlacementPolicy::Attached(_) = self.policy {
|
|
if let Some(attached) = self.intent.get_attached() {
|
|
if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tenant with secondary locations: check if any are within the preferred AZ
|
|
for secondary in self.intent.get_secondary() {
|
|
if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
// Does the tenant have excess secondaries?
|
|
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
|
return true;
|
|
}
|
|
|
|
// Fall through: no optimizations possible
|
|
false
|
|
}
|
|
|
|
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
|
/// its primary location based on soft constraints, switch that secondary location
|
|
/// to be attached.
|
|
///
|
|
/// `schedule_context` should have been populated with all shards in the tenant, including
|
|
/// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn optimize_attachment(
|
|
&self,
|
|
scheduler: &mut Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
) -> Option<ScheduleOptimization> {
|
|
let attached = (*self.intent.get_attached())?;
|
|
|
|
let schedule_context = schedule_context.project_detach(self);
|
|
|
|
// If we already have a secondary that is better-scoring than our current location,
|
|
// then simply migrate to it.
|
|
for secondary in self.intent.get_secondary() {
|
|
if let Some(true) = self.is_better_location::<AttachedShardTag>(
|
|
scheduler,
|
|
&schedule_context,
|
|
attached,
|
|
*secondary,
|
|
) {
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: attached,
|
|
new_attached_node_id: *secondary,
|
|
}),
|
|
});
|
|
}
|
|
}
|
|
|
|
// Given that none of our current secondaries is a better location than our current
|
|
// attached location (checked above), we may trim any secondaries that are not needed
|
|
// for the placement policy.
|
|
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
|
// This code path cleans up extra secondaries after migrating, and/or
|
|
// trims extra secondaries after a PlacementPolicy::Attached(N) was
|
|
// modified to decrease N.
|
|
|
|
let secondary_scores = self
|
|
.intent
|
|
.get_secondary()
|
|
.iter()
|
|
.map(|node_id| {
|
|
(
|
|
*node_id,
|
|
scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
|
|
*node_id,
|
|
&self.intent.preferred_az_id,
|
|
&schedule_context,
|
|
),
|
|
)
|
|
})
|
|
.collect::<HashMap<_, _>>();
|
|
|
|
if secondary_scores.iter().any(|score| score.1.is_none()) {
|
|
// Trivial case: if we only have one secondary, drop that one
|
|
if self.intent.get_secondary().len() == 1 {
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(
|
|
*self.intent.get_secondary().first().unwrap(),
|
|
),
|
|
});
|
|
}
|
|
|
|
// Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
|
|
// where its score can't be calculated), and drop the others. This enables us to make progress in
|
|
// most cases, even if some nodes are offline or have scheduling=pause set.
|
|
|
|
debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
|
|
// logic presumes we are in a mode where we want secondaries to be in non-home AZ
|
|
if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
|
|
let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
|
|
let is_available = secondary_scores
|
|
.get(n)
|
|
.expect("Built from same list of nodes")
|
|
.is_some();
|
|
is_available && !in_home_az
|
|
}) {
|
|
// Great, we found one to retain. Pick some other to drop.
|
|
if let Some(victim) = self
|
|
.intent
|
|
.get_secondary()
|
|
.iter()
|
|
.find(|n| n != &retain_secondary)
|
|
{
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
|
});
|
|
}
|
|
}
|
|
|
|
// Fall through: we didn't identify one to remove. This ought to be rare.
|
|
tracing::warn!(
|
|
"Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
|
|
self.intent.get_secondary()
|
|
);
|
|
} else {
|
|
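// Every secondary produced a score: evict the worst-scoring one (a higher score means a
// less desirable location).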
let victim = secondary_scores
|
|
.iter()
|
|
.max_by_key(|score| score.1.unwrap())
|
|
.unwrap()
|
|
.0;
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(*victim),
|
|
});
|
|
}
|
|
}
|
|
|
|
let replacement = self.find_better_location::<AttachedShardTag>(
|
|
scheduler,
|
|
&schedule_context,
|
|
attached,
|
|
&[], // Don't exclude secondaries: our preferred attachment location may be a secondary
|
|
);
|
|
|
|
// We have found a candidate and confirmed that its score is preferable
|
|
// to our current location. See if we have a secondary location in the preferred location already: if not,
|
|
// then create one.
|
|
if let Some(replacement) = replacement {
|
|
// If we are currently in non-preferred AZ, then the scheduler might suggest a location that is better, but still
|
|
// not in our preferred AZ. Migration has a cost in resources and an impact on the workload, so we want to avoid doing
|
|
// multiple hops where we might go to some other AZ before eventually finding a suitable location in our preferred
|
|
// AZ: skip this optimization if it is not in our final, preferred AZ.
|
|
//
|
|
// This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
|
|
// there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
|
|
if self.preferred_node.is_none()
|
|
&& self.intent.preferred_az_id.is_some()
|
|
&& scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
|
|
{
|
|
tracing::debug!(
|
|
"Candidate node {replacement} is not in preferred AZ {:?}",
|
|
self.intent.preferred_az_id
|
|
);
|
|
|
|
// This should only happen if our current location is not in the preferred AZ, otherwise
|
|
// [`Self::find_better_location`]` should have rejected any other location outside the preferred Az, because
|
|
// AZ is the highest priority part of NodeAttachmentSchedulingScore.
|
|
debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);
|
|
|
|
return None;
|
|
}
|
|
|
|
if !self.intent.get_secondary().contains(&replacement) {
|
|
Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::CreateSecondary(replacement),
|
|
})
|
|
} else {
|
|
// We already have a secondary in the preferred location, let's try migrating to it. Our caller
|
|
// will check the warmth of the destination before deciding whether to really execute this.
|
|
Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: attached,
|
|
new_attached_node_id: replacement,
|
|
}),
|
|
})
|
|
}
|
|
} else {
|
|
// We didn't find somewhere we'd rather be, and we don't have any excess secondaries
|
|
// to clean up: no action required.
|
|
None
|
|
}
|
|
}
|
|
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn optimize_secondary(
|
|
&self,
|
|
scheduler: &mut Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
) -> Option<ScheduleOptimization> {
|
|
if self.intent.get_secondary().len() > self.policy.want_secondaries() {
|
|
// We have extra secondaries, perhaps to facilitate a migration of the attached location:
|
|
// do nothing, it is up to [`Self::optimize_attachment`] to clean them up. When that's done,
|
|
// and we are called again, we will proceed.
|
|
tracing::debug!("Too many secondaries: skipping");
|
|
return None;
|
|
}
|
|
|
|
let schedule_context = schedule_context.project_detach(self);
|
|
|
|
for secondary in self.intent.get_secondary() {
|
|
// Make sure we don't try to migrate a secondary to our attached location: this case happens
|
|
// easily in environments without multiple AZs.
|
|
let mut exclude = match self.intent.attached {
|
|
Some(attached) => vec![attached],
|
|
None => vec![],
|
|
};
|
|
|
|
// Exclude all other secondaries from the scheduling process to avoid replacing
|
|
// one existing secondary with another existing secondary.
|
|
for another_secondary in self.intent.secondary.iter() {
|
|
if another_secondary != secondary {
|
|
exclude.push(*another_secondary);
|
|
}
|
|
}
|
|
|
|
let replacement = match &self.policy {
|
|
PlacementPolicy::Attached(_) => {
|
|
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
|
|
// to avoid placing them in the preferred AZ.
|
|
self.find_better_location::<SecondaryShardTag>(
|
|
scheduler,
|
|
&schedule_context,
|
|
*secondary,
|
|
&exclude,
|
|
)
|
|
}
|
|
PlacementPolicy::Secondary => {
|
|
// In secondary-only mode, we want our secondary locations in the preferred AZ,
|
|
// so that they're ready to take over as an attached location when we transition
|
|
// into PlacementPolicy::Attached.
|
|
self.find_better_location::<AttachedShardTag>(
|
|
scheduler,
|
|
&schedule_context,
|
|
*secondary,
|
|
&exclude,
|
|
)
|
|
}
|
|
PlacementPolicy::Detached => None,
|
|
};
|
|
|
|
assert!(replacement != Some(*secondary));
|
|
if let Some(replacement) = replacement {
|
|
// We have found a candidate and confirmed that its score is preferable
// to this secondary's current location: replace the secondary with it.
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id: *secondary,
|
|
new_node_id: replacement,
|
|
}),
|
|
});
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
|
|
/// other optimisation functions, to bias them to move to the destination node.
|
|
pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
|
|
if let Some(hint) = self.preferred_node.as_ref() {
|
|
if Some(hint) != node.as_ref() {
|
|
// This is legal but a bit surprising: we expect that administrators wouldn't usually
|
|
// change their mind about where to migrate something.
|
|
tracing::warn!(
|
|
"Changing migration destination from {hint} to {node:?} (current intent {:?})",
|
|
self.intent
|
|
);
|
|
}
|
|
}
|
|
|
|
self.preferred_node = node;
|
|
}
|
|
|
|
pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
|
|
self.preferred_node
|
|
}
|
|
|
|
/// Return true if the optimization was really applied: it will not be applied if the optimization's
|
|
/// sequence is behind this tenant shard's
|
|
pub(crate) fn apply_optimization(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
optimization: ScheduleOptimization,
|
|
) -> bool {
|
|
if optimization.sequence != self.sequence {
|
|
return false;
|
|
}
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_schedule_optimization
|
|
.inc();
|
|
|
|
match optimization.action {
|
|
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id,
|
|
new_attached_node_id,
|
|
}) => {
|
|
self.intent.demote_attached(scheduler, old_attached_node_id);
|
|
self.intent
|
|
.promote_attached(scheduler, new_attached_node_id);
|
|
|
|
if let Some(hint) = self.preferred_node.as_ref() {
|
|
if hint == &new_attached_node_id {
|
|
// The migration target is not a long term pin: once we are done with the migration, clear it.
|
|
tracing::info!("Graceful migration to {hint} complete");
|
|
self.preferred_node = None;
|
|
}
|
|
}
|
|
}
|
|
ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id,
|
|
new_node_id,
|
|
}) => {
|
|
self.intent.remove_secondary(scheduler, old_node_id);
|
|
self.intent.push_secondary(scheduler, new_node_id);
|
|
}
|
|
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
|
|
self.intent.push_secondary(scheduler, new_node_id);
|
|
}
|
|
ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
|
|
self.intent.remove_secondary(scheduler, old_secondary);
|
|
}
|
|
}
|
|
|
|
true
|
|
}
|
|
|
|
/// When a shard has several secondary locations, we need to pick one in situations where
|
|
/// we promote one of them to an attached location:
|
|
/// - When draining a node for restart
|
|
/// - When responding to a node failure
|
|
///
|
|
/// In this context, 'preferred' does not mean the node with the best scheduling score: instead
|
|
/// we want to pick the node which is best for use _temporarily_ while the previous attached location
|
|
/// is unavailable (e.g. because it's down or deploying). That means we prefer to use secondary
|
|
/// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
/// secondary in the preferred AZ (which are usually only created for migrations, and if they exist
/// they're probably not warmed up yet).
|
|
///
|
|
/// If the input is empty, or all of the nodes are ineligible for scheduling, return None: the
/// caller needs to pick a node some other way.
|
|
pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
|
|
let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);
|
|
|
|
// We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
|
|
// to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
|
|
// rather than a short-lived secondary location being used for optimization/migration (which would have
|
|
// been scheduled in our preferred AZ).
|
|
let mut candidates = candidates
|
|
.iter()
|
|
.map(|(node_id, node_az)| {
|
|
if node_az == &self.intent.preferred_az_id {
|
|
(1, *node_id)
|
|
} else {
|
|
(0, *node_id)
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
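// Tuple ordering: the leading 0/1 AZ tag dominates, so nodes outside the preferred AZ
// (tagged 0) sort ahead of nodes inside it (tagged 1); ties fall back to NodeId order.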
candidates.sort();
|
|
|
|
candidates.first().map(|i| i.1)
|
|
}
|
|
|
|
/// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
|
|
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
|
|
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
|
|
///
|
|
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
|
|
/// function should not be used to decide whether to reconcile.
|
|
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
|
|
// We have an intent to attach for this node
|
|
let attach_intent = self.intent.attached?;
|
|
// We have an observed state for this node
|
|
let location = self.observed.locations.get(&attach_intent)?;
|
|
// Our observed state is not None, i.e. not in flux
|
|
let location_config = location.conf.as_ref()?;
|
|
|
|
// Check if our intent and observed state agree that this node is in an attached state.
|
|
match location_config.mode {
|
|
LocationConfigMode::AttachedMulti
|
|
| LocationConfigMode::AttachedSingle
|
|
| LocationConfigMode::AttachedStale => Some(attach_intent),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
|
|
let mut dirty_nodes = HashSet::new();
|
|
|
|
if let Some(node_id) = self.intent.attached {
|
|
// Maybe panic: it is a severe bug if we try to attach while generation is null.
|
|
let generation = self
|
|
.generation
|
|
.expect("Attempted to enter attached state without a generation");
|
|
|
|
let wanted_conf =
|
|
attached_location_conf(generation, &self.shard, &self.config, &self.policy);
|
|
match self.observed.locations.get(&node_id) {
|
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
|
Some(_) | None => {
|
|
dirty_nodes.insert(node_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
for node_id in &self.intent.secondary {
|
|
let wanted_conf = secondary_location_conf(&self.shard, &self.config);
|
|
match self.observed.locations.get(node_id) {
|
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
|
Some(_) | None => {
|
|
dirty_nodes.insert(*node_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
for node_id in self.observed.locations.keys() {
|
|
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
|
|
// We have observed state that isn't part of our intent: need to clean it up.
|
|
dirty_nodes.insert(*node_id);
|
|
}
|
|
}
|
|
|
|
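// Only count dirt on nodes that are currently available: there is nothing a reconciler
// could do about an unavailable node, so it shouldn't trigger a run by itself.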
dirty_nodes.retain(|node_id| {
|
|
nodes
|
|
.get(node_id)
|
|
.map(|n| n.is_available())
|
|
.unwrap_or(false)
|
|
});
|
|
|
|
!dirty_nodes.is_empty()
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn get_reconcile_needed(
|
|
&mut self,
|
|
pageservers: &Arc<HashMap<NodeId, Node>>,
|
|
) -> ReconcileNeeded {
|
|
// If there are any ambiguous observed states, and the nodes they refer to are available,
|
|
// we should reconcile to clean them up.
|
|
let mut dirty_observed = false;
|
|
for (node_id, observed_loc) in &self.observed.locations {
|
|
let node = pageservers
|
|
.get(node_id)
|
|
.expect("Nodes may not be removed while referenced");
|
|
if observed_loc.conf.is_none() && node.is_available() {
|
|
dirty_observed = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
let active_nodes_dirty = self.dirty(pageservers);
|
|
|
|
let reconcile_needed = match (
|
|
active_nodes_dirty,
|
|
dirty_observed,
|
|
self.pending_compute_notification,
|
|
) {
|
|
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
|
|
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
|
|
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
|
|
_ => ReconcileNeeded::No,
|
|
};
|
|
|
|
if matches!(reconcile_needed, ReconcileNeeded::No) {
|
|
tracing::debug!("Not dirty, no reconciliation needed.");
|
|
return ReconcileNeeded::No;
|
|
}
|
|
|
|
// If we are currently splitting, then never start a reconciler task: the splitting logic
|
|
// requires that shards are not interfered with while it runs. Do this check here rather than
|
|
// up top, so that we only log this message if we would otherwise have done a reconciliation.
|
|
if !matches!(self.splitting, SplitState::Idle) {
|
|
tracing::info!("Refusing to reconcile, splitting in progress");
|
|
return ReconcileNeeded::No;
|
|
}
|
|
|
|
// Reconcile already in flight for the current sequence?
|
|
if let Some(handle) = &self.reconciler {
|
|
if handle.sequence == self.sequence {
|
|
tracing::info!(
|
|
"Reconciliation already in progress for sequence {:?}",
|
|
self.sequence,
|
|
);
|
|
return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Pre-checks done: finally check whether we may actually do the work
|
|
match self.scheduling_policy {
|
|
ShardSchedulingPolicy::Active
|
|
| ShardSchedulingPolicy::Essential
|
|
| ShardSchedulingPolicy::Pause => {}
|
|
ShardSchedulingPolicy::Stop => {
|
|
// We only reach this point if there is work to do and we're going to skip
|
|
// doing it: warn to make it obvious why this tenant isn't doing what it ought to.
|
|
tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
|
|
return ReconcileNeeded::No;
|
|
}
|
|
}
|
|
|
|
reconcile_needed
|
|
}
|
|
|
|
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
|
|
/// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
|
|
///
|
|
/// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
|
|
/// that the waiter will not complete until some future Reconciler is constructed and run.
|
|
fn ensure_sequence_ahead(&mut self) {
|
|
// Find the highest sequence for which a Reconciler has previously run or is currently
|
|
// running
|
|
let max_seen = std::cmp::max(
|
|
self.reconciler
|
|
.as_ref()
|
|
.map(|r| r.sequence)
|
|
.unwrap_or(Sequence(0)),
|
|
std::cmp::max(self.waiter.load(), self.error_waiter.load()),
|
|
);
|
|
|
|
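// Bump our sequence past anything already seen, so that a waiter constructed from the new
// sequence can only be satisfied by a reconciler spawned after this point.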
if self.sequence <= max_seen {
|
|
self.sequence = max_seen.next();
|
|
}
|
|
}
|
|
|
|
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
|
|
///
|
|
/// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
|
|
/// you would like to wait on the next reconciler that gets spawned in the background.
|
|
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
|
|
self.ensure_sequence_ahead();
|
|
|
|
ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
}
|
|
}
|
|
|
|
async fn reconcile(
|
|
sequence: Sequence,
|
|
mut reconciler: Reconciler,
|
|
must_notify: bool,
|
|
) -> ReconcileResult {
|
|
// Attempt to make observed state match intent state
|
|
let result = reconciler.reconcile().await;
|
|
|
|
// If we know we had a pending compute notification from some previous action, send a notification irrespective
|
|
// of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
|
|
// sending a notification of a location that isn't really attached.
|
|
if result.is_ok() && must_notify {
|
|
// If this fails, the need to retry is carried in [`ReconcileResult::pending_compute_notification`]
|
|
reconciler.compute_notify().await.ok();
|
|
} else if must_notify {
|
|
// Carry this flag so that the reconciler's result will indicate that it still needs to retry
|
|
// the compute hook notification eventually.
|
|
reconciler.compute_notify_failure = true;
|
|
}
|
|
|
|
// Update result counter
|
|
let outcome_label = match &result {
|
|
Ok(_) => ReconcileOutcome::Success,
|
|
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
|
|
Err(_) => ReconcileOutcome::Error,
|
|
};
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_complete
|
|
.inc(ReconcileCompleteLabelGroup {
|
|
status: outcome_label,
|
|
});
|
|
|
|
// Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
|
|
// try and schedule more work in response to our result.
|
|
ReconcileResult {
|
|
sequence,
|
|
result,
|
|
tenant_shard_id: reconciler.tenant_shard_id,
|
|
generation: reconciler.generation,
|
|
observed_deltas: reconciler.observed_deltas(),
|
|
pending_compute_notification: reconciler.compute_notify_failure,
|
|
}
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn spawn_reconciler(
|
|
&mut self,
|
|
reason: ReconcileReason,
|
|
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
|
|
pageservers: &Arc<HashMap<NodeId, Node>>,
|
|
compute_hook: &Arc<ComputeHook>,
|
|
reconciler_config: ReconcilerConfig,
|
|
service_config: &service::Config,
|
|
persistence: &Arc<Persistence>,
|
|
units: ReconcileUnits,
|
|
gate_guard: GateGuard,
|
|
cancel: &CancellationToken,
|
|
http_client: reqwest::Client,
|
|
) -> Option<ReconcilerWaiter> {
|
|
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
|
|
// doing our sequence's work.
|
|
let old_handle = self.reconciler.take();
|
|
|
|
// Build list of nodes from which the reconciler should detach
|
|
let mut detach = Vec::new();
|
|
for node_id in self.observed.locations.keys() {
|
|
if self.intent.get_attached() != &Some(*node_id)
|
|
&& !self.intent.secondary.contains(node_id)
|
|
{
|
|
detach.push(
|
|
pageservers
|
|
.get(node_id)
|
|
.expect("Intent references non-existent pageserver")
|
|
.clone(),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Advance the sequence before spawning a reconciler, so that sequence waiters
|
|
// can distinguish between before+after the reconcile completes.
|
|
self.ensure_sequence_ahead();
|
|
|
|
let reconciler_cancel = cancel.child_token();
|
|
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
|
|
let reconciler = Reconciler {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
shard: self.shard,
|
|
placement_policy: self.policy.clone(),
|
|
generation: self.generation,
|
|
intent: reconciler_intent,
|
|
detach,
|
|
reconciler_config,
|
|
config: self.config.clone(),
|
|
preferred_az: self.intent.preferred_az_id.clone(),
|
|
observed: self.observed.clone(),
|
|
original_observed: self.observed.clone(),
|
|
compute_hook: compute_hook.clone(),
|
|
service_config: service_config.clone(),
|
|
_gate_guard: gate_guard,
|
|
_resource_units: units,
|
|
cancel: reconciler_cancel.clone(),
|
|
persistence: persistence.clone(),
|
|
compute_notify_failure: false,
|
|
http_client,
|
|
};
|
|
|
|
let reconcile_seq = self.sequence;
|
|
let long_reconcile_threshold = service_config.long_reconcile_threshold;
|
|
|
|
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
|
|
let must_notify = self.pending_compute_notification;
|
|
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
|
|
tenant_id=%reconciler.tenant_shard_id.tenant_id,
|
|
shard_id=%reconciler.tenant_shard_id.shard_slug());
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_spawn
|
|
.inc();
|
|
let result_tx = result_tx.clone();
|
|
let join_handle = tokio::task::spawn(
|
|
async move {
|
|
// Wait for any previous reconcile task to complete before we start
|
|
if let Some(old_handle) = old_handle {
|
|
old_handle.cancel.cancel();
|
|
if let Err(e) = old_handle.handle.await {
|
|
// We can't do much with this other than log it: the task is done, so
|
|
// we may proceed with our work.
|
|
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
|
|
}
|
|
}
|
|
|
|
// Early check for cancellation before doing any work
|
|
// TODO: wrap all remote API operations in cancellation check
|
|
// as well.
|
|
if reconciler.cancel.is_cancelled() {
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_complete
|
|
.inc(ReconcileCompleteLabelGroup {
|
|
status: ReconcileOutcome::Cancel,
|
|
});
|
|
return;
|
|
}
|
|
|
|
let (tenant_id_label, shard_number_label, sequence_label) = {
|
|
(
|
|
reconciler.tenant_shard_id.tenant_id.to_string(),
|
|
reconciler.tenant_shard_id.shard_number.0.to_string(),
|
|
reconcile_seq.to_string(),
|
|
)
|
|
};
|
|
|
|
let label_group = ReconcileLongRunningLabelGroup {
|
|
tenant_id: &tenant_id_label,
|
|
shard_number: &shard_number_label,
|
|
sequence: &sequence_label,
|
|
};
|
|
|
|
let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
|
|
let long_reconcile_fut = {
|
|
let label_group = label_group.clone();
|
|
async move {
|
|
tokio::time::sleep(long_reconcile_threshold).await;
|
|
|
|
tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_long_running
|
|
.inc(label_group);
|
|
}
|
|
};
|
|
|
|
let reconcile_fut = std::pin::pin!(reconcile_fut);
|
|
let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);
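// Race the reconcile against the long-running warning timer: if the timer fires
// first we have already logged the warning and bumped the long-running metric,
// and we keep waiting for the real reconcile result. `was_long` records which
// branch won so the long-running metric can be removed afterwards.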
|
|
|
|
let (was_long, result) =
|
|
match future::select(reconcile_fut, long_reconcile_fut).await {
|
|
Either::Left((reconcile_result, _)) => (false, reconcile_result),
|
|
Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
|
|
};
|
|
|
|
if was_long {
|
|
let id = metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_long_running
|
|
.with_labels(label_group);
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_long_running
|
|
.remove_metric(id);
|
|
}
|
|
|
|
result_tx
|
|
.send(ReconcileResultRequest::ReconcileResult(result))
|
|
.ok();
|
|
}
|
|
.instrument(reconciler_span),
|
|
);
|
|
|
|
self.reconciler = Some(ReconcilerHandle {
|
|
sequence: self.sequence,
|
|
handle: join_handle,
|
|
cancel: reconciler_cancel,
|
|
});
|
|
|
|
Some(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
})
|
|
}
|
|
|
|
pub(crate) fn cancel_reconciler(&self) {
|
|
if let Some(handle) = self.reconciler.as_ref() {
|
|
handle.cancel.cancel()
|
|
}
|
|
}
|
|
|
|
/// Get a waiter for any reconciliation in flight, but do not start reconciliation
|
|
/// if it is not already running
|
|
pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
|
|
if self.reconciler.is_some() {
|
|
Some(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
})
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Called when a ReconcileResult has been emitted and the service is updating
/// our state: if the result is from a sequence >= our `ReconcilerHandle`'s, drop
/// the handle to indicate there is no longer a reconciliation in progress.
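///
/// For example (made-up numbers): a result for sequence 7 clears a handle spawned at
/// sequence 5 or 7, but leaves a handle spawned at sequence 8 in place.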
|
|
pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
|
|
if let Some(reconcile_handle) = &self.reconciler {
|
|
if reconcile_handle.sequence <= sequence {
|
|
self.reconciler = None;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// If we had any state at all referring to this node ID, drop it. Does not
|
|
/// attempt to reschedule.
|
|
///
|
|
/// Returns true if we modified this shard's intent state.
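///
/// Illustrative example (made-up node IDs): if the intent is attached to node 1 with a
/// secondary on node 2, then `deref_node(NodeId(2))` drops the secondary and returns `true`,
/// while `deref_node(NodeId(3))` changes nothing and returns `false`.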
|
|
pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
|
|
let mut intent_modified = false;
|
|
|
|
// Drop if this node was our attached intent
|
|
if self.intent.attached == Some(node_id) {
|
|
self.intent.attached = None;
|
|
intent_modified = true;
|
|
}
|
|
|
|
// Drop from the list of secondaries, and check if we modified it
|
|
let had_secondaries = self.intent.secondary.len();
|
|
self.intent.secondary.retain(|n| n != &node_id);
|
|
intent_modified |= self.intent.secondary.len() != had_secondaries;
|
|
|
|
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
|
|
|
|
if self.preferred_node == Some(node_id) {
|
|
self.preferred_node = None;
|
|
}
|
|
|
|
intent_modified
|
|
}
|
|
|
|
pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
|
|
self.scheduling_policy = p;
|
|
}
|
|
|
|
pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
|
|
self.scheduling_policy
|
|
}
|
|
|
|
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
|
|
// Ordering: always set last_error before advancing sequence, so that sequence
|
|
// waiters are guaranteed to see a Some value when they see an error.
|
|
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
|
|
self.error_waiter.advance(sequence);
|
|
}
|
|
|
|
pub(crate) fn from_persistent(
|
|
tsp: TenantShardPersistence,
|
|
intent: IntentState,
|
|
) -> anyhow::Result<Self> {
|
|
let tenant_shard_id = tsp.get_tenant_shard_id()?;
|
|
let shard_identity = tsp.get_shard_identity()?;
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_tenant_shards
|
|
.inc();
|
|
|
|
Ok(Self {
|
|
tenant_shard_id,
|
|
shard: shard_identity,
|
|
sequence: Sequence::initial(),
|
|
generation: tsp.generation.map(|g| Generation::new(g as u32)),
|
|
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
|
|
intent,
|
|
observed: ObservedState::new(),
|
|
config: serde_json::from_str(&tsp.config).unwrap(),
|
|
reconciler: None,
|
|
splitting: tsp.splitting,
|
|
// Filled in during [`Service::startup_reconcile`]
|
|
importing: TimelineImportState::Idle,
|
|
waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
|
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
|
last_error: Arc::default(),
|
|
pending_compute_notification: false,
|
|
delayed_reconcile: false,
|
|
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
|
|
preferred_node: None,
|
|
})
|
|
}
|
|
|
|
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
|
|
TenantShardPersistence {
|
|
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
|
|
shard_number: self.tenant_shard_id.shard_number.0 as i32,
|
|
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
|
|
shard_stripe_size: self.shard.stripe_size.0 as i32,
|
|
generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
|
|
generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
|
|
placement_policy: serde_json::to_string(&self.policy).unwrap(),
|
|
config: serde_json::to_string(&self.config).unwrap(),
|
|
splitting: SplitState::default(),
|
|
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
|
|
preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
|
|
self.intent.get_preferred_az()
|
|
}
|
|
|
|
pub(crate) fn set_preferred_az(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
preferred_az_id: Option<AvailabilityZone>,
|
|
) {
|
|
self.intent.set_preferred_az(scheduler, preferred_az_id);
|
|
}
|
|
|
|
/// Returns all the nodes to which this tenant shard is attached according to the
/// observed state and the generations. The returned vector is sorted from latest
/// generation to earliest.
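///
/// Illustrative example (made-up values): with observed attached locations on node 1 at
/// generation 3 and node 2 at generation 5, this returns
/// `[(NodeId(2), Generation::new(5)), (NodeId(1), Generation::new(3))]`.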
|
|
pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
|
|
self.observed
|
|
.locations
|
|
.iter()
|
|
.filter_map(|(node_id, observed)| {
|
|
use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
|
|
|
|
let conf = observed.conf.as_ref()?;
|
|
|
|
match (conf.generation, conf.mode) {
|
|
(Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
|
|
Some((*node_id, gen_))
|
|
}
|
|
_ => None,
|
|
}
|
|
})
|
|
.sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
|
|
lhs_gen.cmp(rhs_gen).reverse()
|
|
})
|
|
.map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
|
|
.collect()
|
|
}
|
|
|
|
/// Update the observed state of the tenant by applying incremental deltas
|
|
///
|
|
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
|
|
/// They are then filtered in [`crate::service::Service::process_result`].
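///
/// Minimal sketch of a delta stream (illustrative; the boxed `(NodeId, ObservedStateLocation)`
/// payload of `Upsert` is assumed from how the variant is destructured below):
/// ```ignore
/// shard.apply_observed_deltas(
///     vec![
///         // Record a (possibly unknown) location config for node 1
///         ObservedStateDelta::Upsert(Box::new((
///             NodeId(1),
///             ObservedStateLocation { conf: None },
///         ))),
///         // Forget whatever we knew about node 2
///         ObservedStateDelta::Delete(NodeId(2)),
///     ]
///     .into_iter(),
/// );
/// ```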
|
|
pub(crate) fn apply_observed_deltas(
|
|
&mut self,
|
|
deltas: impl Iterator<Item = ObservedStateDelta>,
|
|
) {
|
|
for delta in deltas {
|
|
match delta {
|
|
ObservedStateDelta::Upsert(ups) => {
|
|
let (node_id, loc) = *ups;
|
|
|
|
// If the generation of the observed location in the delta is lagging
|
|
// behind the current one, then we have a race condition and cannot
|
|
// be certain about the true observed state. Set the observed state
|
|
// to None in order to reflect this.
|
|
let crnt_gen = self
|
|
.observed
|
|
.locations
|
|
.get(&node_id)
|
|
.and_then(|loc| loc.conf.as_ref())
|
|
.and_then(|conf| conf.generation);
|
|
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
|
|
match (crnt_gen, new_gen) {
|
|
(Some(crnt), Some(new)) if crnt > new => {
|
|
tracing::warn!(
|
|
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
|
|
node_id,
|
|
loc,
|
|
crnt,
|
|
new
|
|
);
|
|
|
|
self.observed
|
|
.locations
|
|
.insert(node_id, ObservedStateLocation { conf: None });
|
|
|
|
continue;
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
if let Some(conf) = &loc.conf {
|
|
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
|
} else {
|
|
tracing::info!("Setting observed location {} to None", node_id,)
|
|
}
|
|
|
|
self.observed.locations.insert(node_id, loc);
|
|
}
|
|
ObservedStateDelta::Delete(node_id) => {
|
|
tracing::info!("Deleting observed location {}", node_id);
|
|
self.observed.locations.remove(&node_id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
|
|
///
|
|
/// If the shard does not have a preferred AZ, returns false.
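///
/// Illustrative example: with a preferred AZ of "az-a", an attachment on a node in "az-b"
/// makes this return `true`, while an attachment on a node in "az-a" returns `false`.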
|
|
pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
|
|
self.intent
|
|
.get_attached()
|
|
.map(|node_id| {
|
|
Some(
|
|
nodes
|
|
.get(&node_id)
|
|
.expect("referenced node exists")
|
|
.get_availability_zone_id(),
|
|
) != self.intent.preferred_az_id.as_ref()
|
|
})
|
|
.unwrap_or(false)
|
|
}
|
|
}
|
|
|
|
impl Drop for TenantShard {
|
|
fn drop(&mut self) {
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_tenant_shards
|
|
.dec();
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub(crate) mod tests {
|
|
use std::cell::RefCell;
|
|
use std::rc::Rc;
|
|
|
|
use pageserver_api::controller_api::NodeAvailability;
|
|
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
|
|
use rand::SeedableRng;
|
|
use rand::rngs::StdRng;
|
|
use utils::id::TenantId;
|
|
|
|
use super::*;
|
|
use crate::scheduler::test_utils::make_test_nodes;
|
|
|
|
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
|
|
let tenant_id = TenantId::generate();
|
|
let shard_number = ShardNumber(0);
|
|
let shard_count = ShardCount::new(1);
|
|
let stripe_size = DEFAULT_STRIPE_SIZE;
|
|
|
|
let tenant_shard_id = TenantShardId {
|
|
tenant_id,
|
|
shard_number,
|
|
shard_count,
|
|
};
|
|
TenantShard::new(
|
|
tenant_shard_id,
|
|
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
|
|
policy,
|
|
None,
|
|
)
|
|
}
|
|
|
|
pub(crate) fn make_test_tenant(
|
|
policy: PlacementPolicy,
|
|
shard_count: ShardCount,
|
|
preferred_az: Option<AvailabilityZone>,
|
|
) -> Vec<TenantShard> {
|
|
make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
|
|
}
|
|
|
|
pub(crate) fn make_test_tenant_with_id(
|
|
tenant_id: TenantId,
|
|
policy: PlacementPolicy,
|
|
shard_count: ShardCount,
|
|
preferred_az: Option<AvailabilityZone>,
|
|
) -> Vec<TenantShard> {
|
|
let stripe_size = DEFAULT_STRIPE_SIZE;
|
|
(0..shard_count.count())
|
|
.map(|i| {
|
|
let shard_number = ShardNumber(i);
|
|
|
|
let tenant_shard_id = TenantShardId {
|
|
tenant_id,
|
|
shard_number,
|
|
shard_count,
|
|
};
|
|
TenantShard::new(
|
|
tenant_shard_id,
|
|
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
|
|
policy.clone(),
|
|
preferred_az.clone(),
|
|
)
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Test the scheduling behaviors used when a tenant configured for HA is subject
|
|
/// to nodes being marked offline.
|
|
#[test]
|
|
fn tenant_ha_scheduling() -> anyhow::Result<()> {
|
|
// Start with three nodes. Our tenant will only use two. The third one is
|
|
// expected to remain unused.
|
|
let mut nodes = make_test_nodes(3, &[]);
|
|
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
let mut context = ScheduleContext::default();
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("we have enough nodes, scheduling should work");
|
|
|
|
// Expect the attached and secondary locations to initially be scheduled on different nodes
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 1);
|
|
assert!(tenant_shard.intent.attached.is_some());
|
|
|
|
let attached_node_id = tenant_shard.intent.attached.unwrap();
|
|
let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
|
|
assert_ne!(attached_node_id, secondary_node_id);
|
|
|
|
// Notifying that the attached node is offline should demote it to a secondary
|
|
let changed = tenant_shard
|
|
.intent
|
|
.demote_attached(&mut scheduler, attached_node_id);
|
|
assert!(changed);
|
|
assert!(tenant_shard.intent.attached.is_none());
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 2);
|
|
|
|
// Update the scheduler state to indicate the node is offline
|
|
nodes
|
|
.get_mut(&attached_node_id)
|
|
.unwrap()
|
|
.set_availability(NodeAvailability::Offline);
|
|
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
|
|
|
|
// Scheduling the shard again should promote the still-available secondary node to attached
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("active nodes are available");
|
|
assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
|
|
|
|
// The original attached node should have been retained as a secondary
|
|
assert_eq!(
|
|
*tenant_shard.intent.secondary.iter().last().unwrap(),
|
|
attached_node_id
|
|
);
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn intent_from_observed() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(3, &[]);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
tenant_shard.observed.locations.insert(
|
|
NodeId(3),
|
|
ObservedStateLocation {
|
|
conf: Some(LocationConfig {
|
|
mode: LocationConfigMode::AttachedMulti,
|
|
generation: Some(2),
|
|
secondary_conf: None,
|
|
shard_number: tenant_shard.shard.number.0,
|
|
shard_count: tenant_shard.shard.count.literal(),
|
|
shard_stripe_size: tenant_shard.shard.stripe_size.0,
|
|
tenant_conf: TenantConfig::default(),
|
|
}),
|
|
},
|
|
);
|
|
|
|
tenant_shard.observed.locations.insert(
|
|
NodeId(2),
|
|
ObservedStateLocation {
|
|
conf: Some(LocationConfig {
|
|
mode: LocationConfigMode::AttachedStale,
|
|
generation: Some(1),
|
|
secondary_conf: None,
|
|
shard_number: tenant_shard.shard.number.0,
|
|
shard_count: tenant_shard.shard.count.literal(),
|
|
shard_stripe_size: tenant_shard.shard.stripe_size.0,
|
|
tenant_conf: TenantConfig::default(),
|
|
}),
|
|
},
|
|
);
|
|
|
|
tenant_shard.intent_from_observed(&mut scheduler);
|
|
|
|
// The attached location with the highest generation gets used as attached
|
|
assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
|
|
// Other locations get used as secondary
|
|
assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
|
|
|
|
scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn scheduling_mode() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(3, &[]);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
// In pause mode, schedule() shouldn't do anything
|
|
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
|
|
assert!(
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
|
.is_ok()
|
|
);
|
|
assert!(tenant_shard.intent.all_pageservers().is_empty());
|
|
|
|
// In active mode, schedule() works
|
|
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
|
|
assert!(
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
|
.is_ok()
|
|
);
|
|
assert!(!tenant_shard.intent.all_pageservers().is_empty());
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
/// Simple case: moving attachment to somewhere better where we already have a secondary
|
|
fn optimize_attachment_simple() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(
|
|
3,
|
|
&[
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
],
|
|
);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
|
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
|
|
|
// Initially: the two shards are attached on different nodes, and each has a
// secondary on the node where the other shard is attached.
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
|
|
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
|
|
|
|
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
|
schedule_context
|
|
}
|
|
|
|
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(2),
|
|
new_attached_node_id: NodeId(1)
|
|
})
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
|
|
|
// // Either shard should recognize that it has the option to switch to a secondary location where there
|
|
// // would be no other shards from the same tenant, and request to do so.
|
|
// assert_eq!(
|
|
// optimization_a_prepare,
|
|
// Some(ScheduleOptimization {
|
|
// sequence: shard_a.sequence,
|
|
// action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
|
|
// })
|
|
// );
|
|
// shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
|
|
|
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
// let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
// assert_eq!(
|
|
// optimization_a_migrate,
|
|
// Some(ScheduleOptimization {
|
|
// sequence: shard_a.sequence,
|
|
// action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
// old_attached_node_id: NodeId(1),
|
|
// new_attached_node_id: NodeId(2)
|
|
// })
|
|
// })
|
|
// );
|
|
// shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
|
|
|
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
// let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
// assert_eq!(
|
|
// optimization_a_cleanup,
|
|
// Some(ScheduleOptimization {
|
|
// sequence: shard_a.sequence,
|
|
// action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
|
|
// })
|
|
// );
|
|
// shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
|
|
|
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
|
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
shard_b.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
/// Complicated case: moving attachment to somewhere better where we do not have a secondary
|
|
/// already, creating one as needed.
|
|
fn optimize_attachment_multistep() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(
|
|
3,
|
|
&[
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
],
|
|
);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
// Two shards of a tenant that wants to be in AZ A
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
|
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
|
|
|
// Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
|
|
shard_b.intent.push_secondary(&mut scheduler, NodeId(2));
|
|
|
|
fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
|
schedule_context
|
|
}
|
|
|
|
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_prepare,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
|
|
|
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_migrate,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(2),
|
|
new_attached_node_id: NodeId(1)
|
|
})
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
|
|
|
let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_cleanup,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
|
|
|
// // Shard B should not be moved anywhere, since the pressure on node 1 was relieved by moving shard A
|
|
// let schedule_context = make_schedule_context(&shard_a, &shard_b);
|
|
// assert_eq!(shard_b.optimize_attachment(&mut scheduler, &schedule_context), None);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
shard_b.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
/// How the optimisation code handles a shard with a preferred node set; this is an example
|
|
/// of the multi-step migration, but driven by a different input.
|
|
fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(
|
|
4,
|
|
&[
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
],
|
|
);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
// Two shards of a tenant that wants to be in AZ A
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
|
|
|
|
// Initially attached in a stable location
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
|
|
// Set the preferred node to node 2, a node that scores equally as high as its current location
|
|
shard_a.preferred_node = Some(NodeId(2));
|
|
|
|
fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context
|
|
}
|
|
|
|
let schedule_context = make_schedule_context(&shard_a);
|
|
let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_prepare,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
|
|
|
// The first step of the optimisation should not have cleared the preferred node
|
|
assert_eq!(shard_a.preferred_node, Some(NodeId(2)));
|
|
|
|
let schedule_context = make_schedule_context(&shard_a);
|
|
let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_migrate,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(1),
|
|
new_attached_node_id: NodeId(2)
|
|
})
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
|
|
|
// The cutover step of the optimisation should have cleared the preferred node
|
|
assert_eq!(shard_a.preferred_node, None);
|
|
|
|
let schedule_context = make_schedule_context(&shard_a);
|
|
let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_cleanup,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
/// Check that multi-step migration works when moving to somewhere that is only better by
/// 1 AffinityScore -- this guards against a bug where the intermediate secondary's own
/// contribution to the affinity score would prevent the rest of the migration from happening.
|
|
fn optimize_attachment_marginal() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(2, &[]);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
// Multi-sharded tenant: we craft a situation where affinity
// scores differ only slightly
|
|
let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);
|
|
|
|
// 1 attached on node 1
|
|
shards[0]
|
|
.intent
|
|
.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
// 3 attached on node 2
|
|
shards[1]
|
|
.intent
|
|
.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shards[2]
|
|
.intent
|
|
.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shards[3]
|
|
.intent
|
|
.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
|
|
// The scheduler should figure out that, for one of the shards attached on node 2, we need to:
// - Create a secondary for it on node 1
// - Migrate its attachment to node 1
// - Remove its old location on node 2
|
|
|
|
fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
for shard in shards {
|
|
schedule_context.avoid(&shard.intent.all_pageservers());
|
|
}
|
|
schedule_context
|
|
}
|
|
|
|
let schedule_context = make_schedule_context(&shards);
|
|
let optimization_a_prepare =
|
|
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_prepare,
|
|
Some(ScheduleOptimization {
|
|
sequence: shards[1].sequence,
|
|
action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
|
|
})
|
|
);
|
|
shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());
|
|
|
|
let schedule_context = make_schedule_context(&shards);
|
|
let optimization_a_migrate =
|
|
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_migrate,
|
|
Some(ScheduleOptimization {
|
|
sequence: shards[1].sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(2),
|
|
new_attached_node_id: NodeId(1)
|
|
})
|
|
})
|
|
);
|
|
shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());
|
|
|
|
let schedule_context = make_schedule_context(&shards);
|
|
let optimization_a_cleanup =
|
|
shards[1].optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization_a_cleanup,
|
|
Some(ScheduleOptimization {
|
|
sequence: shards[1].sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
|
})
|
|
);
|
|
shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());
|
|
|
|
// Everything should be stable now
|
|
let schedule_context = make_schedule_context(&shards);
|
|
assert_eq!(
|
|
shards[0].optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
shards[1].optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
shards[2].optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
shards[3].optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
|
|
for mut shard in shards {
|
|
shard.intent.clear(&mut scheduler);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn optimize_secondary() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(4, &[]);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
// Initially: the two shards are attached on different nodes, and both have their
// secondary location on node 3.
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
|
|
|
let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);
|
|
|
|
// Since there is a node with no locations available, the node with two locations for the
|
|
// same tenant should generate an optimization to move one away
|
|
assert_eq!(
|
|
optimization_a,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id: NodeId(3),
|
|
new_node_id: NodeId(4)
|
|
})
|
|
})
|
|
);
|
|
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
|
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
shard_b.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Test how the optimisation code behaves with an extra secondary
|
|
#[test]
|
|
fn optimize_removes_secondary() -> anyhow::Result<()> {
|
|
let az_a_tag = AvailabilityZone("az-a".to_string());
|
|
let az_b_tag = AvailabilityZone("az-b".to_string());
|
|
let mut nodes = make_test_nodes(
|
|
4,
|
|
&[
|
|
az_a_tag.clone(),
|
|
az_b_tag.clone(),
|
|
az_a_tag.clone(),
|
|
az_b_tag.clone(),
|
|
],
|
|
);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut schedule_context = ScheduleContext::default();
|
|
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
|
|
shard_a
|
|
.schedule(&mut scheduler, &mut schedule_context)
|
|
.unwrap();
|
|
|
|
// Attached on node 1, secondary on node 2
|
|
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);
|
|
|
|
// Initially optimiser is idle
|
|
assert_eq!(
|
|
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
|
|
// A spare secondary in the home AZ: it should be removed -- this is the situation when we're midway through a graceful migration, after cutting over
|
|
// to our new location
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
|
|
|
// A spare secondary in the non-home AZ, and one of them is offline
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
|
|
nodes
|
|
.get_mut(&NodeId(4))
|
|
.unwrap()
|
|
.set_availability(NodeAvailability::Offline);
|
|
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
|
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
|
|
|
// A spare secondary when we should have none
|
|
shard_a.policy = PlacementPolicy::Attached(0);
|
|
let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
|
|
assert_eq!(
|
|
optimization,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
|
|
})
|
|
);
|
|
shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
|
|
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![]);
|
|
|
|
// Check that in secondary mode, we preserve the secondary in the preferred AZ
|
|
let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
|
|
shard_a.policy = PlacementPolicy::Secondary;
|
|
shard_a
|
|
.schedule(&mut scheduler, &mut schedule_context)
|
|
.unwrap();
|
|
assert_eq!(shard_a.intent.get_attached(), &None);
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
|
assert_eq!(
|
|
shard_a.optimize_attachment(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
shard_a.optimize_secondary(&mut scheduler, &schedule_context),
|
|
None
|
|
);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Optimize until quiescent: this emulates what Service::optimize_all does when
// called repeatedly in the background.
// Returns the applied optimizations
|
|
fn optimize_til_idle(
|
|
scheduler: &mut Scheduler,
|
|
shards: &mut [TenantShard],
|
|
) -> Vec<ScheduleOptimization> {
|
|
let mut loop_n = 0;
|
|
let mut optimizations = Vec::default();
|
|
loop {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
let mut any_changed = false;
|
|
|
|
for shard in shards.iter() {
|
|
schedule_context.avoid(&shard.intent.all_pageservers());
|
|
}
|
|
|
|
for shard in shards.iter_mut() {
|
|
let optimization = shard.optimize_attachment(scheduler, &schedule_context);
|
|
tracing::info!(
|
|
"optimize_attachment({})={:?}",
|
|
shard.tenant_shard_id,
|
|
optimization
|
|
);
|
|
if let Some(optimization) = optimization {
|
|
// Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
|
|
assert!(shard.maybe_optimizable(scheduler, &schedule_context));
|
|
optimizations.push(optimization.clone());
|
|
shard.apply_optimization(scheduler, optimization);
|
|
any_changed = true;
|
|
break;
|
|
}
|
|
|
|
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
|
|
tracing::info!(
|
|
"optimize_secondary({})={:?}",
|
|
shard.tenant_shard_id,
|
|
optimization
|
|
);
|
|
if let Some(optimization) = optimization {
|
|
// Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
|
|
assert!(shard.maybe_optimizable(scheduler, &schedule_context));
|
|
|
|
optimizations.push(optimization.clone());
|
|
shard.apply_optimization(scheduler, optimization);
|
|
any_changed = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if !any_changed {
|
|
break;
|
|
}
|
|
|
|
// Assert no infinite loop
|
|
loop_n += 1;
|
|
assert!(loop_n < 1000);
|
|
}
|
|
|
|
optimizations
|
|
}
|
|
|
|
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
|
/// that it converges.
|
|
#[test]
|
|
fn optimize_add_nodes() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(
|
|
9,
|
|
&[
|
|
// Initial 6 nodes
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
// Three we will add later
|
|
AvailabilityZone("az-a".to_string()),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
],
|
|
);
|
|
|
|
// Only show the scheduler two nodes in each AZ to start with
|
|
let mut scheduler = Scheduler::new([].iter());
|
|
for i in 1..=6 {
|
|
scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
|
|
}
|
|
|
|
let mut shards = make_test_tenant(
|
|
PlacementPolicy::Attached(1),
|
|
ShardCount::new(4),
|
|
Some(AvailabilityZone("az-a".to_string())),
|
|
);
|
|
let mut schedule_context = ScheduleContext::default();
|
|
for shard in &mut shards {
|
|
assert!(
|
|
shard
|
|
.schedule(&mut scheduler, &mut schedule_context)
|
|
.is_ok()
|
|
);
|
|
}
|
|
|
|
// Initial: attached locations land in the tenant's home AZ.
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
|
|
|
// Initial: secondary locations in a remote AZ
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
|
|
|
// Add another three nodes: we should see the shards spread out when their optimize
|
|
// methods are called
|
|
scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
|
|
scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
|
|
scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
|
|
optimize_til_idle(&mut scheduler, &mut shards);
|
|
|
|
// We expect one attached location was moved to the new node in the tenant's home AZ
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
|
|
// The original node has one less attached shard
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
|
|
|
// One of the original nodes still has two attachments, since there are an odd number of nodes
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
|
|
|
// None of our secondaries moved, since we already had enough nodes for those to be
|
|
// scheduled perfectly
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);
|
|
|
|
for shard in shards.iter_mut() {
|
|
shard.intent.clear(&mut scheduler);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Test that initial shard scheduling is optimal. By optimal we mean
|
|
/// that the optimizer cannot find a way to improve it.
|
|
///
|
|
/// This test is an example of the scheduling issue described in
|
|
/// https://github.com/neondatabase/neon/issues/8969
|
|
#[test]
|
|
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
|
|
use itertools::Itertools;
|
|
|
|
let nodes = make_test_nodes(2, &[]);
|
|
|
|
let mut scheduler = Scheduler::new([].iter());
|
|
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
|
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
|
|
|
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
|
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
|
|
|
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
|
|
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
|
|
|
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
|
|
let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
|
|
|
|
let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
|
|
|
|
for (shard, context) in schedule_order {
|
|
let context = &mut *context.borrow_mut();
|
|
shard.schedule(&mut scheduler, context).unwrap();
|
|
}
|
|
|
|
let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
|
|
assert_eq!(applied_to_a, vec![]);
|
|
|
|
let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
|
|
assert_eq!(applied_to_b, vec![]);
|
|
|
|
for shard in a.iter_mut().chain(b.iter_mut()) {
|
|
shard.intent.clear(&mut scheduler);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn random_az_shard_scheduling() -> anyhow::Result<()> {
|
|
use rand::seq::SliceRandom;
|
|
|
|
for seed in 0..50 {
|
|
eprintln!("Running test with seed {seed}");
|
|
let mut rng = StdRng::seed_from_u64(seed);
|
|
|
|
let az_a_tag = AvailabilityZone("az-a".to_string());
|
|
let az_b_tag = AvailabilityZone("az-b".to_string());
|
|
let azs = [az_a_tag, az_b_tag];
|
|
let nodes = make_test_nodes(4, &azs);
|
|
let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();
|
|
|
|
let mut scheduler = Scheduler::new([].iter());
|
|
for node in nodes.values() {
|
|
scheduler.node_upsert(node);
|
|
}
|
|
|
|
let mut shards = Vec::default();
|
|
let mut contexts = Vec::default();
|
|
let mut az_picker = azs.iter().cycle().cloned();
|
|
for i in 0..100 {
|
|
let az = az_picker.next().unwrap();
|
|
let shard_count = i % 4 + 1;
|
|
*shards_per_az.entry(az.clone()).or_default() += shard_count;
|
|
|
|
let tenant_shards = make_test_tenant(
|
|
PlacementPolicy::Attached(1),
|
|
ShardCount::new(shard_count.try_into().unwrap()),
|
|
Some(az),
|
|
);
|
|
let context = Rc::new(RefCell::new(ScheduleContext::default()));
|
|
|
|
contexts.push(context.clone());
|
|
let with_ctx = tenant_shards
|
|
.into_iter()
|
|
.map(|shard| (shard, context.clone()));
|
|
for shard_with_ctx in with_ctx {
|
|
shards.push(shard_with_ctx);
|
|
}
|
|
}
|
|
|
|
shards.shuffle(&mut rng);
|
|
|
|
#[derive(Default, Debug)]
|
|
struct NodeStats {
|
|
attachments: u32,
|
|
secondaries: u32,
|
|
}
|
|
|
|
let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
|
|
let mut attachments_in_wrong_az = 0;
|
|
let mut secondaries_in_wrong_az = 0;
|
|
|
|
for (shard, context) in &mut shards {
|
|
let context = &mut *context.borrow_mut();
|
|
shard.schedule(&mut scheduler, context).unwrap();
|
|
|
|
let attached_node = shard.intent.get_attached().unwrap();
|
|
let stats = node_stats.entry(attached_node).or_default();
|
|
stats.attachments += 1;
|
|
|
|
let secondary_node = *shard.intent.get_secondary().first().unwrap();
|
|
let stats = node_stats.entry(secondary_node).or_default();
|
|
stats.secondaries += 1;
|
|
|
|
let attached_node_az = nodes
|
|
.get(&attached_node)
|
|
.unwrap()
|
|
.get_availability_zone_id();
|
|
let secondary_node_az = nodes
|
|
.get(&secondary_node)
|
|
.unwrap()
|
|
.get_availability_zone_id();
|
|
let preferred_az = shard.preferred_az().unwrap();
|
|
|
|
if attached_node_az != preferred_az {
|
|
eprintln!(
|
|
"{} attachment was scheduled in AZ {} but preferred AZ {}",
|
|
shard.tenant_shard_id, attached_node_az, preferred_az
|
|
);
|
|
attachments_in_wrong_az += 1;
|
|
}
|
|
|
|
if secondary_node_az == preferred_az {
|
|
eprintln!(
|
|
"{} secondary was scheduled in AZ {} which matches preference",
|
|
shard.tenant_shard_id, attached_node_az
|
|
);
|
|
secondaries_in_wrong_az += 1;
|
|
}
|
|
}
|
|
|
|
let mut violations = Vec::default();
|
|
|
|
if attachments_in_wrong_az > 0 {
|
|
violations.push(format!(
|
|
"{} attachments scheduled to the incorrect AZ",
|
|
attachments_in_wrong_az
|
|
));
|
|
}
|
|
|
|
if secondaries_in_wrong_az > 0 {
|
|
violations.push(format!(
|
|
"{} secondaries scheduled to the incorrect AZ",
|
|
secondaries_in_wrong_az
|
|
));
|
|
}
|
|
|
|
eprintln!(
|
|
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
|
|
attachments_in_wrong_az, secondaries_in_wrong_az
|
|
);
|
|
|
|
for (node_id, stats) in &node_stats {
|
|
let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
|
|
let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
|
|
let allowed_attachment_load =
|
|
(ideal_attachment_load - 1)..(ideal_attachment_load + 2);
|
|
|
|
if !allowed_attachment_load.contains(&stats.attachments) {
|
|
violations.push(format!(
|
|
"Found {} attachments on node {}, but expected {}",
|
|
stats.attachments, node_id, ideal_attachment_load
|
|
));
|
|
}
|
|
|
|
eprintln!(
|
|
"{}: attachments={} secondaries={} ideal_attachment_load={}",
|
|
node_id, stats.attachments, stats.secondaries, ideal_attachment_load
|
|
);
|
|
}
|
|
|
|
assert!(violations.is_empty(), "{violations:?}");
|
|
|
|
for (mut shard, _ctx) in shards {
|
|
shard.intent.clear(&mut scheduler);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
|
|
#[test]
|
|
fn tenant_secondary_scheduling() -> anyhow::Result<()> {
|
|
let az_a = AvailabilityZone("az-a".to_string());
|
|
let nodes = make_test_nodes(
|
|
3,
|
|
&[
|
|
az_a.clone(),
|
|
AvailabilityZone("az-b".to_string()),
|
|
AvailabilityZone("az-c".to_string()),
|
|
],
|
|
);
|
|
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
let mut context = ScheduleContext::default();
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
|
|
tenant_shard.intent.preferred_az_id = Some(az_a.clone());
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("we have enough nodes, scheduling should work");
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 1);
|
|
assert!(tenant_shard.intent.attached.is_none());
|
|
|
|
// Should have scheduled into the preferred AZ
|
|
assert_eq!(
|
|
scheduler
|
|
.get_node_az(&tenant_shard.intent.secondary[0])
|
|
.as_ref(),
|
|
tenant_shard.preferred_az()
|
|
);
|
|
|
|
// Optimizer should agree
|
|
assert_eq!(
|
|
tenant_shard.optimize_attachment(&mut scheduler, &context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
tenant_shard.optimize_secondary(&mut scheduler, &context),
|
|
None
|
|
);
|
|
|
|
// Switch to PlacementPolicy::Attached
|
|
tenant_shard.policy = PlacementPolicy::Attached(1);
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("we have enough nodes, scheduling should work");
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 1);
|
|
assert!(tenant_shard.intent.attached.is_some());
|
|
// Secondary should now be in non-preferred AZ
|
|
assert_ne!(
|
|
scheduler
|
|
.get_node_az(&tenant_shard.intent.secondary[0])
|
|
.as_ref(),
|
|
tenant_shard.preferred_az()
|
|
);
|
|
// Attached should be in preferred AZ
|
|
assert_eq!(
|
|
scheduler
|
|
.get_node_az(&tenant_shard.intent.attached.unwrap())
|
|
.as_ref(),
|
|
tenant_shard.preferred_az()
|
|
);
|
|
|
|
// Optimizer should agree
|
|
assert_eq!(
|
|
tenant_shard.optimize_attachment(&mut scheduler, &context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
tenant_shard.optimize_secondary(&mut scheduler, &context),
|
|
None
|
|
);
|
|
|
|
// Switch back to PlacementPolicy::Secondary
|
|
tenant_shard.policy = PlacementPolicy::Secondary;
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("we have enough nodes, scheduling should work");
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 1);
|
|
assert!(tenant_shard.intent.attached.is_none());
|
|
// When we picked a location to keep, we should have kept the one in the preferred AZ
|
|
assert_eq!(
|
|
scheduler
|
|
.get_node_az(&tenant_shard.intent.secondary[0])
|
|
.as_ref(),
|
|
tenant_shard.preferred_az()
|
|
);
|
|
|
|
// Optimizer should agree
|
|
assert_eq!(
|
|
tenant_shard.optimize_attachment(&mut scheduler, &context),
|
|
None
|
|
);
|
|
assert_eq!(
|
|
tenant_shard.optimize_secondary(&mut scheduler, &context),
|
|
None
|
|
);
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
}
|