Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-07 21:42:56 +00:00
## Problem

While adapting the storage controller scale test to do graceful rolling restarts via drain and fill, I noticed that secondaries were also being rescheduled, which in turn caused the storage controller to optimise attachments.

## Summary of changes

* Introduce a transaction-like rescheduling primitive (i.e. "try to schedule to this secondary, but leave everything as is if you can't").
* Use it for the drain and fill stages to avoid calling into `Scheduler::schedule` and having secondaries move around.
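The primitive described above is `TenantShard::reschedule_to_secondary` in the file below. As a rough, self-contained illustration of the "leave everything as is if you can't" behaviour, here is a minimal sketch; `Intent` and `ScheduleError` are simplified stand-ins for illustration only, not the controller's real `IntentState`/`Scheduler` types.

```rust
// Simplified sketch only: `Intent` and `ScheduleError` are stand-ins for the
// controller's real `IntentState` / `Scheduler` types.

#[derive(Debug, PartialEq)]
enum ScheduleError {
    ImpossibleConstraint,
}

#[derive(Debug, Clone, PartialEq)]
struct Intent {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

/// Try to move the attachment onto `promote_to`. All validation happens before
/// any mutation, so on failure the intent is left exactly as it was.
fn reschedule_to_secondary(intent: &mut Intent, promote_to: u64) -> Result<(), ScheduleError> {
    // The target must already be one of our secondaries.
    if !intent.secondary.contains(&promote_to) {
        return Err(ScheduleError::ImpossibleConstraint);
    }

    // Checks passed: demote the current attachment (if any) to a secondary
    // and promote the chosen secondary to attached.
    if let Some(old_attached) = intent.attached.take() {
        intent.secondary.push(old_attached);
    }
    intent.secondary.retain(|n| *n != promote_to);
    intent.attached = Some(promote_to);
    Ok(())
}

fn main() {
    let mut intent = Intent { attached: Some(1), secondary: vec![2] };

    // Draining node 1: promote the secondary on node 2.
    assert_eq!(reschedule_to_secondary(&mut intent, 2), Ok(()));
    assert_eq!(intent.attached, Some(2));
    assert_eq!(intent.secondary, vec![1]);

    // Asking for a node that is not a secondary fails and leaves the intent untouched.
    let before = intent.clone();
    assert!(reschedule_to_secondary(&mut intent, 9).is_err());
    assert_eq!(intent, before);
}
```

The key design point is that all validation happens before any mutation, so a drain or fill loop can call the primitive opportunistically without ever observing a half-moved shard.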
1707 lines
68 KiB
Rust
use std::{
|
|
collections::{HashMap, HashSet},
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use crate::{
|
|
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
|
|
persistence::TenantShardPersistence,
|
|
reconciler::ReconcileUnits,
|
|
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
|
|
};
|
|
use pageserver_api::controller_api::{
|
|
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
|
};
|
|
use pageserver_api::{
|
|
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
|
shard::{ShardIdentity, TenantShardId},
|
|
};
|
|
use serde::Serialize;
|
|
use tokio::task::JoinHandle;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::{instrument, Instrument};
|
|
use utils::{
|
|
generation::Generation,
|
|
id::NodeId,
|
|
seqwait::{SeqWait, SeqWaitError},
|
|
sync::gate::GateGuard,
|
|
};
|
|
|
|
use crate::{
|
|
compute_hook::ComputeHook,
|
|
node::Node,
|
|
persistence::{split_state::SplitState, Persistence},
|
|
reconciler::{
|
|
attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
|
|
},
|
|
scheduler::{ScheduleError, Scheduler},
|
|
service, Sequence,
|
|
};
|
|
|
|
/// Serialization helper
|
|
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::ser::Serializer,
|
|
T: std::fmt::Display,
|
|
{
|
|
serializer.collect_str(
|
|
&v.lock()
|
|
.unwrap()
|
|
.as_ref()
|
|
.map(|e| format!("{e}"))
|
|
.unwrap_or("".to_string()),
|
|
)
|
|
}
|
|
|
|
/// In-memory state for a particular tenant shard.
|
|
///
|
|
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
|
|
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
|
|
#[derive(Serialize)]
|
|
pub(crate) struct TenantShard {
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
|
|
pub(crate) shard: ShardIdentity,
|
|
|
|
// Runtime only: sequence used to coordinate when updating this object while
|
|
// background reconcilers may be running. A reconciler runs to a particular
|
|
// sequence.
|
|
pub(crate) sequence: Sequence,
|
|
|
|
// Latest generation number: next time we attach, increment this
|
|
// and use the incremented number when attaching.
|
|
//
|
|
// None represents an incompletely onboarded tenant via the [`Service::location_config`]
|
|
// API, where this tenant may only run in PlacementPolicy::Secondary.
|
|
pub(crate) generation: Option<Generation>,
|
|
|
|
// High level description of how the tenant should be set up. Provided
|
|
// externally.
|
|
pub(crate) policy: PlacementPolicy,
|
|
|
|
// Low level description of exactly which pageservers should fulfil
|
|
// which role. Generated by `Self::schedule`.
|
|
pub(crate) intent: IntentState,
|
|
|
|
// Low level description of how the tenant is configured on pageservers:
|
|
// if this does not match `Self::intent` then the tenant needs reconciliation
|
|
// with `Self::reconcile`.
|
|
pub(crate) observed: ObservedState,
|
|
|
|
// Tenant configuration, passed through opaquely to the pageserver. Identical
|
|
// for all shards in a tenant.
|
|
pub(crate) config: TenantConfig,
|
|
|
|
/// If a reconcile task is currently in flight, it may be joined here (it is
|
|
/// only safe to join if either the result has been received or the reconciler's
|
|
/// cancellation token has been fired)
|
|
#[serde(skip)]
|
|
pub(crate) reconciler: Option<ReconcilerHandle>,
|
|
|
|
/// If a tenant is being split, then all shards with that TenantId will have a
|
|
/// SplitState set, this acts as a guard against other operations such as background
|
|
/// reconciliation, and timeline creation.
|
|
pub(crate) splitting: SplitState,
|
|
|
|
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
|
|
/// is set. This flag is cleared when the tenant is popped off the delay queue.
|
|
pub(crate) delayed_reconcile: bool,
|
|
|
|
/// Optionally wait for reconciliation to complete up to a particular
|
|
/// sequence number.
|
|
#[serde(skip)]
|
|
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
|
|
/// Indicates sequence number for which we have encountered an error reconciling. If
|
|
/// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
|
|
/// and callers should stop waiting for `waiter` and propagate the error.
|
|
#[serde(skip)]
|
|
pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
|
|
/// The most recent error from a reconcile on this tenant. This is a nested Arc
|
|
/// because:
|
|
/// - ReconcileWaiters need to Arc-clone the overall object to read it later
|
|
/// - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
|
|
/// many waiters for one shard, and the underlying error types are not Clone.
|
|
/// TODO: generalize to an array of recent events
|
|
/// TODO: use an ArcSwap instead of a mutex for faster reads?
|
|
#[serde(serialize_with = "read_last_error")]
|
|
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
|
|
|
/// If we have a pending compute notification that for some reason we weren't able to send,
|
|
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
|
|
/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
|
|
/// of state that we publish externally in an eventually consistent way.
|
|
pub(crate) pending_compute_notification: bool,
|
|
|
|
// Support/debug tool: if something is going wrong or flapping with scheduling, this may
|
|
// be set to a non-active state to avoid making changes while the issue is fixed.
|
|
scheduling_policy: ShardSchedulingPolicy,
|
|
}
|
|
|
|
#[derive(Default, Clone, Debug, Serialize)]
|
|
pub(crate) struct IntentState {
|
|
attached: Option<NodeId>,
|
|
secondary: Vec<NodeId>,
|
|
}
|
|
|
|
impl IntentState {
|
|
pub(crate) fn new() -> Self {
|
|
Self {
|
|
attached: None,
|
|
secondary: vec![],
|
|
}
|
|
}
|
|
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
|
|
if let Some(node_id) = node_id {
|
|
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
|
|
}
|
|
Self {
|
|
attached: node_id,
|
|
secondary: vec![],
|
|
}
|
|
}
|
|
|
|
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
|
|
if self.attached != new_attached {
|
|
if let Some(old_attached) = self.attached.take() {
|
|
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
|
}
|
|
if let Some(new_attached) = &new_attached {
|
|
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
|
|
}
|
|
self.attached = new_attached;
|
|
}
|
|
}
|
|
|
|
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
|
|
/// secondary to attached while maintaining the scheduler's reference counts.
|
|
pub(crate) fn promote_attached(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
promote_secondary: NodeId,
|
|
) {
|
|
// If we call this with a node that isn't in secondary, it would cause incorrect
|
|
// scheduler reference counting, since we assume the node is already referenced as a secondary.
|
|
debug_assert!(self.secondary.contains(&promote_secondary));
|
|
|
|
self.secondary.retain(|n| n != &promote_secondary);
|
|
|
|
let demoted = self.attached;
|
|
self.attached = Some(promote_secondary);
|
|
|
|
scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
|
|
if let Some(demoted) = demoted {
|
|
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
|
|
debug_assert!(!self.secondary.contains(&new_secondary));
|
|
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
|
|
self.secondary.push(new_secondary);
|
|
}
|
|
|
|
/// It is legal to call this with a node that is not currently a secondary: that is a no-op
|
|
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
|
|
let index = self.secondary.iter().position(|n| *n == node_id);
|
|
if let Some(index) = index {
|
|
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
|
self.secondary.remove(index);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
|
|
for secondary in self.secondary.drain(..) {
|
|
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
|
|
}
|
|
}
|
|
|
|
/// Remove the last secondary node from the list of secondaries
|
|
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
|
|
if let Some(node_id) = self.secondary.pop() {
|
|
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
|
if let Some(old_attached) = self.attached.take() {
|
|
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
|
}
|
|
|
|
self.clear_secondary(scheduler);
|
|
}
|
|
|
|
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
|
|
let mut result = Vec::new();
|
|
if let Some(p) = self.attached {
|
|
result.push(p)
|
|
}
|
|
|
|
result.extend(self.secondary.iter().copied());
|
|
|
|
result
|
|
}
|
|
|
|
pub(crate) fn get_attached(&self) -> &Option<NodeId> {
|
|
&self.attached
|
|
}
|
|
|
|
pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
|
|
&self.secondary
|
|
}
|
|
|
|
/// If the node is in use as the attached location, demote it into
|
|
/// the list of secondary locations. This is used when a node goes offline,
|
|
/// and we want to use a different node for attachment, but not permanently
|
|
/// forget the location on the offline node.
|
|
///
|
|
/// Returns true if a change was made
|
|
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
|
|
if self.attached == Some(node_id) {
|
|
self.attached = None;
|
|
self.secondary.push(node_id);
|
|
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for IntentState {
|
|
fn drop(&mut self) {
|
|
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
|
|
// We do not check this while panicking, to avoid polluting unit test failures or
|
|
// other assertions with this assertion's output. It's still wrong to leak these,
|
|
// but if we already have a panic then we don't need to independently flag this case.
|
|
if !(std::thread::panicking()) {
|
|
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Default, Clone, Serialize)]
|
|
pub(crate) struct ObservedState {
|
|
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
|
|
}
|
|
|
|
/// Our latest knowledge of how this tenant is configured in the outside world.
|
|
///
|
|
/// Meaning:
|
|
/// * No instance of this type exists for a node: we are certain that we have nothing configured on that
|
|
/// node for this shard.
|
|
/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know
|
|
/// what it is (e.g. we failed partway through configuring it)
|
|
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
|
|
/// and that configuration will still be present unless something external interfered.
|
|
#[derive(Clone, Serialize)]
|
|
pub(crate) struct ObservedStateLocation {
|
|
/// If None, it means we do not know the status of this shard's location on this node, but
|
|
/// we know that we might have some state on this node.
|
|
pub(crate) conf: Option<LocationConfig>,
|
|
}
|
|
pub(crate) struct ReconcilerWaiter {
|
|
// For observability purposes, remember the ID of the shard we're
|
|
// waiting for.
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
|
|
seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
|
|
error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
|
|
seq: Sequence,
|
|
}
|
|
|
|
pub(crate) enum ReconcilerStatus {
|
|
Done,
|
|
Failed,
|
|
InProgress,
|
|
}
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub(crate) enum ReconcileWaitError {
|
|
#[error("Timeout waiting for shard {0}")]
|
|
Timeout(TenantShardId),
|
|
#[error("shutting down")]
|
|
Shutdown,
|
|
#[error("Reconcile error on shard {0}: {1}")]
|
|
Failed(TenantShardId, Arc<ReconcileError>),
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug)]
|
|
pub(crate) struct ReplaceSecondary {
|
|
old_node_id: NodeId,
|
|
new_node_id: NodeId,
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug)]
|
|
pub(crate) struct MigrateAttachment {
|
|
pub(crate) old_attached_node_id: NodeId,
|
|
pub(crate) new_attached_node_id: NodeId,
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug)]
|
|
pub(crate) enum ScheduleOptimizationAction {
|
|
// Replace one of our secondary locations with a different node
|
|
ReplaceSecondary(ReplaceSecondary),
|
|
// Migrate attachment to an existing secondary location
|
|
MigrateAttachment(MigrateAttachment),
|
|
}
|
|
|
|
#[derive(Eq, PartialEq, Debug)]
|
|
pub(crate) struct ScheduleOptimization {
|
|
// What was the reconcile sequence when we generated this optimization? The optimization
|
|
// should only be applied if the shard's sequence is still at this value, in case other changes
|
|
// happened between planning the optimization and applying it.
|
|
sequence: Sequence,
|
|
|
|
pub(crate) action: ScheduleOptimizationAction,
|
|
}
|
|
|
|
impl ReconcilerWaiter {
|
|
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
|
|
tokio::select! {
|
|
result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
|
|
result.map_err(|e| match e {
|
|
SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
|
|
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
|
|
})?;
|
|
},
|
|
result = self.error_seq_wait.wait_for(self.seq) => {
|
|
result.map_err(|e| match e {
|
|
SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
|
|
SeqWaitError::Timeout => unreachable!()
|
|
})?;
|
|
|
|
return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
|
|
self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) fn get_status(&self) -> ReconcilerStatus {
|
|
if self.seq_wait.would_wait_for(self.seq).is_err() {
|
|
ReconcilerStatus::Done
|
|
} else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
|
|
ReconcilerStatus::Failed
|
|
} else {
|
|
ReconcilerStatus::InProgress
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Having spawned a reconciler task, the tenant shard's state will carry enough
|
|
/// information to optionally cancel & await it later.
|
|
pub(crate) struct ReconcilerHandle {
|
|
sequence: Sequence,
|
|
handle: JoinHandle<()>,
|
|
cancel: CancellationToken,
|
|
}
|
|
|
|
pub(crate) enum ReconcileNeeded {
|
|
/// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
|
|
/// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
|
|
No,
|
|
/// shard has a reconciler running, and its intent hasn't changed since that one was
|
|
/// spawned: wait for the existing reconciler rather than spawning a new one.
|
|
WaitExisting(ReconcilerWaiter),
|
|
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
|
|
Yes,
|
|
}
|
|
|
|
/// When a reconcile task completes, it sends this result object
|
|
/// to be applied to the primary TenantShard.
|
|
pub(crate) struct ReconcileResult {
|
|
pub(crate) sequence: Sequence,
|
|
/// On errors, `observed` should be treated as an incomplete description
|
|
/// of state (i.e. any nodes present in the result should override nodes
|
|
/// present in the parent tenant state, but any unmentioned nodes should
|
|
/// not be removed from parent tenant state)
|
|
pub(crate) result: Result<(), ReconcileError>,
|
|
|
|
pub(crate) tenant_shard_id: TenantShardId,
|
|
pub(crate) generation: Option<Generation>,
|
|
pub(crate) observed: ObservedState,
|
|
|
|
/// Set [`TenantShard::pending_compute_notification`] from this flag
|
|
pub(crate) pending_compute_notification: bool,
|
|
}
|
|
|
|
impl ObservedState {
|
|
pub(crate) fn new() -> Self {
|
|
Self {
|
|
locations: HashMap::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TenantShard {
|
|
pub(crate) fn new(
|
|
tenant_shard_id: TenantShardId,
|
|
shard: ShardIdentity,
|
|
policy: PlacementPolicy,
|
|
) -> Self {
|
|
Self {
|
|
tenant_shard_id,
|
|
policy,
|
|
intent: IntentState::default(),
|
|
generation: Some(Generation::new(0)),
|
|
shard,
|
|
observed: ObservedState::default(),
|
|
config: TenantConfig::default(),
|
|
reconciler: None,
|
|
splitting: SplitState::Idle,
|
|
sequence: Sequence(1),
|
|
delayed_reconcile: false,
|
|
waiter: Arc::new(SeqWait::new(Sequence(0))),
|
|
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
|
|
last_error: Arc::default(),
|
|
pending_compute_notification: false,
|
|
scheduling_policy: ShardSchedulingPolicy::default(),
|
|
}
|
|
}
|
|
|
|
/// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
|
|
/// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
|
|
/// to get an intent state that complies with placement policy. The overall goal is to do scheduling
|
|
/// in a way that makes use of any configured locations that already exist in the outside world.
|
|
pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
|
|
// Choose an attached location by filtering observed locations, and then sorting to get the highest
|
|
// generation
|
|
let mut attached_locs = self
|
|
.observed
|
|
.locations
|
|
.iter()
|
|
.filter_map(|(node_id, l)| {
|
|
if let Some(conf) = &l.conf {
|
|
if conf.mode == LocationConfigMode::AttachedMulti
|
|
|| conf.mode == LocationConfigMode::AttachedSingle
|
|
|| conf.mode == LocationConfigMode::AttachedStale
|
|
{
|
|
Some((node_id, conf.generation))
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
attached_locs.sort_by_key(|i| i.1);
|
|
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
|
|
self.intent.set_attached(scheduler, Some(*node_id));
|
|
}
|
|
|
|
// All remaining observed locations generate secondary intents. This includes None
|
|
// observations, as these may well have some local content on disk that is usable (this
|
|
// is an edge case that might occur if we restarted during a migration or other change)
|
|
//
|
|
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
|
|
// will take care of promoting one of these secondaries to be attached.
|
|
self.observed.locations.keys().for_each(|node_id| {
|
|
if Some(*node_id) != self.intent.attached {
|
|
self.intent.push_secondary(scheduler, *node_id);
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
|
|
/// attached pageserver for a shard.
|
|
///
|
|
/// Returns whether we modified it, and the NodeId selected.
|
|
fn schedule_attached(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &ScheduleContext,
|
|
) -> Result<(bool, NodeId), ScheduleError> {
|
|
// No work to do if we already have an attached tenant
|
|
if let Some(node_id) = self.intent.attached {
|
|
return Ok((false, node_id));
|
|
}
|
|
|
|
if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
|
|
// Promote a secondary
|
|
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
|
|
self.intent.promote_attached(scheduler, promote_secondary);
|
|
Ok((true, promote_secondary))
|
|
} else {
|
|
// Pick a fresh node: either we had no secondaries or none were schedulable
|
|
let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
|
|
tracing::debug!("Selected {} as attached", node_id);
|
|
self.intent.set_attached(scheduler, Some(node_id));
|
|
Ok((true, node_id))
|
|
}
|
|
}
|
|
|
|
pub(crate) fn schedule(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &mut ScheduleContext,
|
|
) -> Result<(), ScheduleError> {
|
|
let r = self.do_schedule(scheduler, context);
|
|
|
|
context.avoid(&self.intent.all_pageservers());
|
|
if let Some(attached) = self.intent.get_attached() {
|
|
context.push_attached(*attached);
|
|
}
|
|
|
|
r
|
|
}
|
|
|
|
pub(crate) fn do_schedule(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
context: &ScheduleContext,
|
|
) -> Result<(), ScheduleError> {
|
|
// TODO: before scheduling new nodes, check if any existing content in
|
|
// self.intent refers to pageservers that are offline, and pick other
|
|
// pageservers if so.
|
|
|
|
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
|
|
// change their attach location.
|
|
|
|
match self.scheduling_policy {
|
|
ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
|
|
ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
|
|
// Warn to make it obvious why other things aren't happening/working, if we skip scheduling
|
|
tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
|
|
"Scheduling is disabled by policy {:?}", self.scheduling_policy);
|
|
return Ok(());
|
|
}
|
|
}
|
|
|
|
// Build the set of pageservers already in use by this tenant, to avoid scheduling
|
|
// more work on the same pageservers we're already using.
|
|
let mut modified = false;
|
|
|
|
// Add/remove nodes to fulfil policy
|
|
use PlacementPolicy::*;
|
|
match self.policy {
|
|
Attached(secondary_count) => {
|
|
let retain_secondaries = if self.intent.attached.is_none()
|
|
&& scheduler.node_preferred(&self.intent.secondary).is_some()
|
|
{
|
|
// If we have no attached, and one of the secondaries is eligible to be promoted, retain
|
|
// one more secondary than we usually would, as one of them will become attached further down this function.
|
|
secondary_count + 1
|
|
} else {
|
|
secondary_count
|
|
};
|
|
|
|
while self.intent.secondary.len() > retain_secondaries {
|
|
// We have no particular preference for one secondary location over another: just
|
|
// arbitrarily drop from the end
|
|
self.intent.pop_secondary(scheduler);
|
|
modified = true;
|
|
}
|
|
|
|
// Should have exactly one attached, and N secondaries
|
|
let (modified_attached, attached_node_id) =
|
|
self.schedule_attached(scheduler, context)?;
|
|
modified |= modified_attached;
|
|
|
|
let mut used_pageservers = vec![attached_node_id];
|
|
while self.intent.secondary.len() < secondary_count {
|
|
let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
|
|
self.intent.push_secondary(scheduler, node_id);
|
|
used_pageservers.push(node_id);
|
|
modified = true;
|
|
}
|
|
}
|
|
Secondary => {
|
|
if let Some(node_id) = self.intent.get_attached() {
|
|
// Populate secondary by demoting the attached node
|
|
self.intent.demote_attached(scheduler, *node_id);
|
|
modified = true;
|
|
} else if self.intent.secondary.is_empty() {
|
|
// Populate secondary by scheduling a fresh node
|
|
let node_id = scheduler.schedule_shard(&[], context)?;
|
|
self.intent.push_secondary(scheduler, node_id);
|
|
modified = true;
|
|
}
|
|
while self.intent.secondary.len() > 1 {
|
|
// We have no particular preference for one secondary location over another: just
|
|
// arbitrarily drop from the end
|
|
self.intent.pop_secondary(scheduler);
|
|
modified = true;
|
|
}
|
|
}
|
|
Detached => {
|
|
// Never add locations in this mode
|
|
if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
|
|
self.intent.clear(scheduler);
|
|
modified = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if modified {
|
|
self.sequence.0 += 1;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
|
|
/// if the swap is not possible and leaves the intent state in its original state.
|
|
///
|
|
/// Arguments:
|
|
/// `attached_to`: the currently attached location matching the intent state (may be None if the
|
|
/// shard is not attached)
|
|
/// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
|
|
/// the scheduler to recommend a node
|
|
pub(crate) fn reschedule_to_secondary(
|
|
&mut self,
|
|
promote_to: Option<NodeId>,
|
|
scheduler: &mut Scheduler,
|
|
) -> Result<(), ScheduleError> {
|
|
let promote_to = match promote_to {
|
|
Some(node) => node,
|
|
None => match scheduler.node_preferred(self.intent.get_secondary()) {
|
|
Some(node) => node,
|
|
None => {
|
|
return Err(ScheduleError::ImpossibleConstraint);
|
|
}
|
|
},
|
|
};
|
|
|
|
assert!(self.intent.get_secondary().contains(&promote_to));
|
|
|
|
if let Some(node) = self.intent.get_attached() {
|
|
let demoted = self.intent.demote_attached(scheduler, *node);
|
|
if !demoted {
|
|
return Err(ScheduleError::ImpossibleConstraint);
|
|
}
|
|
}
|
|
|
|
self.intent.promote_attached(scheduler, promote_to);
|
|
|
|
// Increment the sequence number for the edge case where a
|
|
// reconciler is already running to avoid waiting on the
|
|
// current reconcile instead of spawning a new one.
|
|
self.sequence = self.sequence.next();
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
|
/// its primary location based on soft constraints, switch that secondary location
|
|
/// to be attached.
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn optimize_attachment(
|
|
&self,
|
|
nodes: &HashMap<NodeId, Node>,
|
|
schedule_context: &ScheduleContext,
|
|
) -> Option<ScheduleOptimization> {
|
|
let attached = (*self.intent.get_attached())?;
|
|
if self.intent.secondary.is_empty() {
|
|
// We can only do useful work if we have both attached and secondary locations: this
|
|
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
|
return None;
|
|
}
|
|
|
|
let current_affinity_score = schedule_context.get_node_affinity(attached);
|
|
let current_attachment_count = schedule_context.get_node_attachments(attached);
|
|
|
|
// Generate score for each node, dropping any un-schedulable nodes.
|
|
let all_pageservers = self.intent.all_pageservers();
|
|
let mut scores = all_pageservers
|
|
.iter()
|
|
.flat_map(|node_id| {
|
|
let node = nodes.get(node_id);
|
|
if node.is_none() {
|
|
None
|
|
} else if matches!(
|
|
node.unwrap().get_scheduling(),
|
|
NodeSchedulingPolicy::Filling
|
|
) {
|
|
// If the node is currently filling, don't count it as a candidate, to avoid
|
|
// racing with the background fill.
|
|
None
|
|
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
|
|
None
|
|
} else {
|
|
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
|
let attachment_count = schedule_context.get_node_attachments(*node_id);
|
|
Some((*node_id, affinity_score, attachment_count))
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
// Sort precedence:
|
|
// 1st - prefer nodes with the lowest total affinity score
|
|
// 2nd - prefer nodes with the lowest number of attachments in this context
|
|
// 3rd - if all else is equal, sort by node ID for determinism in tests.
|
|
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
|
|
|
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
|
|
scores.first()
|
|
{
|
|
if attached != *preferred_node {
|
|
// The best alternative must be more than 1 better than us, otherwise we could end
|
|
// up flapping back next time we're called (e.g. there's no point migrating from
|
|
// a location with score 1 to a score of zero, because at the next evaluation the situation
|
|
// would be the same, but in reverse).
|
|
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|
|
|| current_attachment_count > *preferred_attachment_count + 1
|
|
{
|
|
tracing::info!(
|
|
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
|
|
self.intent.get_secondary()
|
|
);
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: attached,
|
|
new_attached_node_id: *preferred_node,
|
|
}),
|
|
});
|
|
}
|
|
} else {
|
|
tracing::debug!(
|
|
"Node {} is already preferred (score {:?})",
|
|
preferred_node,
|
|
preferred_affinity_score
|
|
);
|
|
}
|
|
}
|
|
|
|
// Fall-through: we didn't find an optimization
|
|
None
|
|
}
|
|
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn optimize_secondary(
|
|
&self,
|
|
scheduler: &Scheduler,
|
|
schedule_context: &ScheduleContext,
|
|
) -> Option<ScheduleOptimization> {
|
|
if self.intent.secondary.is_empty() {
|
|
// We can only do useful work if we have both attached and secondary locations: this
|
|
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
|
return None;
|
|
}
|
|
|
|
for secondary in self.intent.get_secondary() {
|
|
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
|
|
// We're already on a node unaffected by any affinity constraints,
|
|
// so we won't change it.
|
|
continue;
|
|
};
|
|
|
|
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
|
|
// This implicitly limits the choice to nodes that are available, and prefers nodes
|
|
// with lower utilization.
|
|
let Ok(candidate_node) =
|
|
scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
|
|
else {
|
|
// A scheduling error means we have no possible candidate replacements
|
|
continue;
|
|
};
|
|
|
|
let candidate_affinity_score = schedule_context
|
|
.nodes
|
|
.get(&candidate_node)
|
|
.unwrap_or(&AffinityScore::FREE);
|
|
|
|
// The best alternative must be more than 1 better than us, otherwise we could end
|
|
// up flapping back next time we're called.
|
|
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
|
|
// If some other node is available and has a lower score than this node, then
|
|
// that other node is a good place to migrate to.
|
|
tracing::info!(
|
|
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
|
|
self.intent.get_secondary()
|
|
);
|
|
return Some(ScheduleOptimization {
|
|
sequence: self.sequence,
|
|
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id: *secondary,
|
|
new_node_id: candidate_node,
|
|
}),
|
|
});
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Return true if the optimization was really applied: it will not be applied if the optimization's
|
|
/// sequence is behind this tenant shard's
|
|
pub(crate) fn apply_optimization(
|
|
&mut self,
|
|
scheduler: &mut Scheduler,
|
|
optimization: ScheduleOptimization,
|
|
) -> bool {
|
|
if optimization.sequence != self.sequence {
|
|
return false;
|
|
}
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_schedule_optimization
|
|
.inc();
|
|
|
|
match optimization.action {
|
|
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id,
|
|
new_attached_node_id,
|
|
}) => {
|
|
self.intent.demote_attached(scheduler, old_attached_node_id);
|
|
self.intent
|
|
.promote_attached(scheduler, new_attached_node_id);
|
|
}
|
|
ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id,
|
|
new_node_id,
|
|
}) => {
|
|
self.intent.remove_secondary(scheduler, old_node_id);
|
|
self.intent.push_secondary(scheduler, new_node_id);
|
|
}
|
|
}
|
|
|
|
true
|
|
}
|
|
|
|
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
|
|
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
|
|
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
|
|
///
|
|
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
|
|
/// function should not be used to decide whether to reconcile.
|
|
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
|
|
if let Some(attach_intent) = self.intent.attached {
|
|
match self.observed.locations.get(&attach_intent) {
|
|
Some(loc) => match &loc.conf {
|
|
Some(conf) => match conf.mode {
|
|
LocationConfigMode::AttachedMulti
|
|
| LocationConfigMode::AttachedSingle
|
|
| LocationConfigMode::AttachedStale => {
|
|
// Our intent and observed state agree that this node is in an attached state.
|
|
Some(attach_intent)
|
|
}
|
|
// Our observed config is not an attached state
|
|
_ => None,
|
|
},
|
|
// Our observed state is None, i.e. in flux
|
|
None => None,
|
|
},
|
|
// We have no observed state for this node
|
|
None => None,
|
|
}
|
|
} else {
|
|
// Our intent is not to attach
|
|
None
|
|
}
|
|
}
|
|
|
|
fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
|
|
let mut dirty_nodes = HashSet::new();
|
|
|
|
if let Some(node_id) = self.intent.attached {
|
|
// Maybe panic: it is a severe bug if we try to attach while generation is null.
|
|
let generation = self
|
|
.generation
|
|
.expect("Attempted to enter attached state without a generation");
|
|
|
|
let wanted_conf = attached_location_conf(
|
|
generation,
|
|
&self.shard,
|
|
&self.config,
|
|
!self.intent.secondary.is_empty(),
|
|
);
|
|
match self.observed.locations.get(&node_id) {
|
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
|
Some(_) | None => {
|
|
dirty_nodes.insert(node_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
for node_id in &self.intent.secondary {
|
|
let wanted_conf = secondary_location_conf(&self.shard, &self.config);
|
|
match self.observed.locations.get(node_id) {
|
|
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
|
|
Some(_) | None => {
|
|
dirty_nodes.insert(*node_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
for node_id in self.observed.locations.keys() {
|
|
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
|
|
// We have observed state that isn't part of our intent: need to clean it up.
|
|
dirty_nodes.insert(*node_id);
|
|
}
|
|
}
|
|
|
|
dirty_nodes.retain(|node_id| {
|
|
nodes
|
|
.get(node_id)
|
|
.map(|n| n.is_available())
|
|
.unwrap_or(false)
|
|
});
|
|
|
|
!dirty_nodes.is_empty()
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn get_reconcile_needed(
|
|
&mut self,
|
|
pageservers: &Arc<HashMap<NodeId, Node>>,
|
|
) -> ReconcileNeeded {
|
|
// If there are any ambiguous observed states, and the nodes they refer to are available,
|
|
// we should reconcile to clean them up.
|
|
let mut dirty_observed = false;
|
|
for (node_id, observed_loc) in &self.observed.locations {
|
|
let node = pageservers
|
|
.get(node_id)
|
|
.expect("Nodes may not be removed while referenced");
|
|
if observed_loc.conf.is_none() && node.is_available() {
|
|
dirty_observed = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
let active_nodes_dirty = self.dirty(pageservers);
|
|
|
|
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
|
|
// wake up a reconciler to send it.
|
|
let do_reconcile =
|
|
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
|
|
|
|
if !do_reconcile {
|
|
tracing::debug!("Not dirty, no reconciliation needed.");
|
|
return ReconcileNeeded::No;
|
|
}
|
|
|
|
// If we are currently splitting, then never start a reconciler task: the splitting logic
|
|
// requires that shards are not interfered with while it runs. Do this check here rather than
|
|
// up top, so that we only log this message if we would otherwise have done a reconciliation.
|
|
if !matches!(self.splitting, SplitState::Idle) {
|
|
tracing::info!("Refusing to reconcile, splitting in progress");
|
|
return ReconcileNeeded::No;
|
|
}
|
|
|
|
// Reconcile already in flight for the current sequence?
|
|
if let Some(handle) = &self.reconciler {
|
|
if handle.sequence == self.sequence {
|
|
tracing::info!(
|
|
"Reconciliation already in progress for sequence {:?}",
|
|
self.sequence,
|
|
);
|
|
return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Pre-checks done: finally check whether we may actually do the work
|
|
match self.scheduling_policy {
|
|
ShardSchedulingPolicy::Active
|
|
| ShardSchedulingPolicy::Essential
|
|
| ShardSchedulingPolicy::Pause => {}
|
|
ShardSchedulingPolicy::Stop => {
|
|
// We only reach this point if there is work to do and we're going to skip
|
|
// doing it: warn to make it obvious why this tenant isn't doing what it ought to.
|
|
tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
|
|
return ReconcileNeeded::No;
|
|
}
|
|
}
|
|
|
|
ReconcileNeeded::Yes
|
|
}
|
|
|
|
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
|
|
/// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
|
|
///
|
|
/// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
|
|
/// that the waiter will not complete until some future Reconciler is constructed and run.
|
|
fn ensure_sequence_ahead(&mut self) {
|
|
// Find the highest sequence for which a Reconciler has previously run or is currently
|
|
// running
|
|
let max_seen = std::cmp::max(
|
|
self.reconciler
|
|
.as_ref()
|
|
.map(|r| r.sequence)
|
|
.unwrap_or(Sequence(0)),
|
|
std::cmp::max(self.waiter.load(), self.error_waiter.load()),
|
|
);
|
|
|
|
if self.sequence <= max_seen {
|
|
self.sequence = max_seen.next();
|
|
}
|
|
}
|
|
|
|
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
|
|
///
|
|
/// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
|
|
/// you would like to wait on the next reconciler that gets spawned in the background.
|
|
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
|
|
self.ensure_sequence_ahead();
|
|
|
|
ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
}
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
|
pub(crate) fn spawn_reconciler(
|
|
&mut self,
|
|
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
|
pageservers: &Arc<HashMap<NodeId, Node>>,
|
|
compute_hook: &Arc<ComputeHook>,
|
|
service_config: &service::Config,
|
|
persistence: &Arc<Persistence>,
|
|
units: ReconcileUnits,
|
|
gate_guard: GateGuard,
|
|
cancel: &CancellationToken,
|
|
) -> Option<ReconcilerWaiter> {
|
|
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
|
|
// doing our sequence's work.
|
|
let old_handle = self.reconciler.take();
|
|
|
|
// Build list of nodes from which the reconciler should detach
|
|
let mut detach = Vec::new();
|
|
for node_id in self.observed.locations.keys() {
|
|
if self.intent.get_attached() != &Some(*node_id)
|
|
&& !self.intent.secondary.contains(node_id)
|
|
{
|
|
detach.push(
|
|
pageservers
|
|
.get(node_id)
|
|
.expect("Intent references non-existent pageserver")
|
|
.clone(),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Advance the sequence before spawning a reconciler, so that sequence waiters
|
|
// can distinguish between before+after the reconcile completes.
|
|
self.ensure_sequence_ahead();
|
|
|
|
let reconciler_cancel = cancel.child_token();
|
|
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
|
|
let mut reconciler = Reconciler {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
shard: self.shard,
|
|
generation: self.generation,
|
|
intent: reconciler_intent,
|
|
detach,
|
|
config: self.config.clone(),
|
|
observed: self.observed.clone(),
|
|
compute_hook: compute_hook.clone(),
|
|
service_config: service_config.clone(),
|
|
_gate_guard: gate_guard,
|
|
_resource_units: units,
|
|
cancel: reconciler_cancel.clone(),
|
|
persistence: persistence.clone(),
|
|
compute_notify_failure: false,
|
|
};
|
|
|
|
let reconcile_seq = self.sequence;
|
|
|
|
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
|
|
let must_notify = self.pending_compute_notification;
|
|
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
|
|
tenant_id=%reconciler.tenant_shard_id.tenant_id,
|
|
shard_id=%reconciler.tenant_shard_id.shard_slug());
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_spawn
|
|
.inc();
|
|
let result_tx = result_tx.clone();
|
|
let join_handle = tokio::task::spawn(
|
|
async move {
|
|
// Wait for any previous reconcile task to complete before we start
|
|
if let Some(old_handle) = old_handle {
|
|
old_handle.cancel.cancel();
|
|
if let Err(e) = old_handle.handle.await {
|
|
// We can't do much with this other than log it: the task is done, so
|
|
// we may proceed with our work.
|
|
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
|
|
}
|
|
}
|
|
|
|
// Early check for cancellation before doing any work
|
|
// TODO: wrap all remote API operations in cancellation check
|
|
// as well.
|
|
if reconciler.cancel.is_cancelled() {
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_complete
|
|
.inc(ReconcileCompleteLabelGroup {
|
|
status: ReconcileOutcome::Cancel,
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Attempt to make observed state match intent state
|
|
let result = reconciler.reconcile().await;
|
|
|
|
// If we know we had a pending compute notification from some previous action, send a notification irrespective
|
|
// of whether the above reconcile() did any work
|
|
if result.is_ok() && must_notify {
|
|
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
|
|
reconciler.compute_notify().await.ok();
|
|
}
|
|
|
|
// Update result counter
|
|
let outcome_label = match &result {
|
|
Ok(_) => ReconcileOutcome::Success,
|
|
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
|
|
Err(_) => ReconcileOutcome::Error,
|
|
};
|
|
|
|
metrics::METRICS_REGISTRY
|
|
.metrics_group
|
|
.storage_controller_reconcile_complete
|
|
.inc(ReconcileCompleteLabelGroup {
|
|
status: outcome_label,
|
|
});
|
|
|
|
// Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
|
|
// try and schedule more work in response to our result.
|
|
let result = ReconcileResult {
|
|
sequence: reconcile_seq,
|
|
result,
|
|
tenant_shard_id: reconciler.tenant_shard_id,
|
|
generation: reconciler.generation,
|
|
observed: reconciler.observed,
|
|
pending_compute_notification: reconciler.compute_notify_failure,
|
|
};
|
|
|
|
result_tx.send(result).ok();
|
|
}
|
|
.instrument(reconciler_span),
|
|
);
|
|
|
|
self.reconciler = Some(ReconcilerHandle {
|
|
sequence: self.sequence,
|
|
handle: join_handle,
|
|
cancel: reconciler_cancel,
|
|
});
|
|
|
|
Some(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
})
|
|
}
|
|
|
|
/// Get a waiter for any reconciliation in flight, but do not start reconciliation
|
|
/// if it is not already running
|
|
pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
|
|
if self.reconciler.is_some() {
|
|
Some(ReconcilerWaiter {
|
|
tenant_shard_id: self.tenant_shard_id,
|
|
seq_wait: self.waiter.clone(),
|
|
error_seq_wait: self.error_waiter.clone(),
|
|
error: self.last_error.clone(),
|
|
seq: self.sequence,
|
|
})
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Called when a ReconcileResult has been emitted and the service is updating
|
|
/// our state: if the result is from a sequence >= my ReconcileHandle, then drop
|
|
/// the handle to indicate there is no longer a reconciliation in progress.
|
|
pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
|
|
if let Some(reconcile_handle) = &self.reconciler {
|
|
if reconcile_handle.sequence <= sequence {
|
|
self.reconciler = None;
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we had any state at all referring to this node ID, drop it. Does not
|
|
// attempt to reschedule.
|
|
pub(crate) fn deref_node(&mut self, node_id: NodeId) {
|
|
if self.intent.attached == Some(node_id) {
|
|
self.intent.attached = None;
|
|
}
|
|
|
|
self.intent.secondary.retain(|n| n != &node_id);
|
|
|
|
self.observed.locations.remove(&node_id);
|
|
|
|
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
|
|
}
|
|
|
|
pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
|
|
self.scheduling_policy = p;
|
|
}
|
|
|
|
pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
|
|
&self.scheduling_policy
|
|
}
|
|
|
|
pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
|
|
// Ordering: always set last_error before advancing sequence, so that sequence
|
|
// waiters are guaranteed to see a Some value when they see an error.
|
|
*(self.last_error.lock().unwrap()) = Some(Arc::new(error));
|
|
self.error_waiter.advance(sequence);
|
|
}
|
|
|
|
pub(crate) fn from_persistent(
|
|
tsp: TenantShardPersistence,
|
|
intent: IntentState,
|
|
) -> anyhow::Result<Self> {
|
|
let tenant_shard_id = tsp.get_tenant_shard_id()?;
|
|
let shard_identity = tsp.get_shard_identity()?;
|
|
|
|
Ok(Self {
|
|
tenant_shard_id,
|
|
shard: shard_identity,
|
|
sequence: Sequence::initial(),
|
|
generation: tsp.generation.map(|g| Generation::new(g as u32)),
|
|
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
|
|
intent,
|
|
observed: ObservedState::new(),
|
|
config: serde_json::from_str(&tsp.config).unwrap(),
|
|
reconciler: None,
|
|
splitting: tsp.splitting,
|
|
waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
|
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
|
last_error: Arc::default(),
|
|
pending_compute_notification: false,
|
|
delayed_reconcile: false,
|
|
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
|
|
})
|
|
}
|
|
|
|
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
|
|
TenantShardPersistence {
|
|
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
|
|
shard_number: self.tenant_shard_id.shard_number.0 as i32,
|
|
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
|
|
shard_stripe_size: self.shard.stripe_size.0 as i32,
|
|
generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
|
|
generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
|
|
placement_policy: serde_json::to_string(&self.policy).unwrap(),
|
|
config: serde_json::to_string(&self.config).unwrap(),
|
|
splitting: SplitState::default(),
|
|
scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub(crate) mod tests {
|
|
use pageserver_api::{
|
|
controller_api::NodeAvailability,
|
|
shard::{ShardCount, ShardNumber},
|
|
};
|
|
use utils::id::TenantId;
|
|
|
|
use crate::scheduler::test_utils::make_test_nodes;
|
|
|
|
use super::*;
|
|
|
|
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
|
|
let tenant_id = TenantId::generate();
|
|
let shard_number = ShardNumber(0);
|
|
let shard_count = ShardCount::new(1);
|
|
|
|
let tenant_shard_id = TenantShardId {
|
|
tenant_id,
|
|
shard_number,
|
|
shard_count,
|
|
};
|
|
TenantShard::new(
|
|
tenant_shard_id,
|
|
ShardIdentity::new(
|
|
shard_number,
|
|
shard_count,
|
|
pageserver_api::shard::ShardStripeSize(32768),
|
|
)
|
|
.unwrap(),
|
|
policy,
|
|
)
|
|
}
|
|
|
|
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
|
|
let tenant_id = TenantId::generate();
|
|
|
|
(0..shard_count.count())
|
|
.map(|i| {
|
|
let shard_number = ShardNumber(i);
|
|
|
|
let tenant_shard_id = TenantShardId {
|
|
tenant_id,
|
|
shard_number,
|
|
shard_count,
|
|
};
|
|
TenantShard::new(
|
|
tenant_shard_id,
|
|
ShardIdentity::new(
|
|
shard_number,
|
|
shard_count,
|
|
pageserver_api::shard::ShardStripeSize(32768),
|
|
)
|
|
.unwrap(),
|
|
policy.clone(),
|
|
)
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Test the scheduling behaviors used when a tenant configured for HA is subject
|
|
/// to nodes being marked offline.
|
|
#[test]
|
|
fn tenant_ha_scheduling() -> anyhow::Result<()> {
|
|
// Start with three nodes. Our tenant will only use two. The third one is
|
|
// expected to remain unused.
|
|
let mut nodes = make_test_nodes(3);
|
|
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
let mut context = ScheduleContext::default();
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("we have enough nodes, scheduling should work");
|
|
|
|
// Expect to initially be scheduled onto different nodes
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 1);
|
|
assert!(tenant_shard.intent.attached.is_some());
|
|
|
|
let attached_node_id = tenant_shard.intent.attached.unwrap();
|
|
let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
|
|
assert_ne!(attached_node_id, secondary_node_id);
|
|
|
|
// Notifying that the attached node is offline should demote it to a secondary
|
|
let changed = tenant_shard
|
|
.intent
|
|
.demote_attached(&mut scheduler, attached_node_id);
|
|
assert!(changed);
|
|
assert!(tenant_shard.intent.attached.is_none());
|
|
assert_eq!(tenant_shard.intent.secondary.len(), 2);
|
|
|
|
// Update the scheduler state to indicate the node is offline
|
|
nodes
|
|
.get_mut(&attached_node_id)
|
|
.unwrap()
|
|
.set_availability(NodeAvailability::Offline);
|
|
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
|
|
|
|
// Scheduling the shard should promote the still-available secondary node to attached
|
|
tenant_shard
|
|
.schedule(&mut scheduler, &mut context)
|
|
.expect("active nodes are available");
|
|
assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);
|
|
|
|
// The original attached node should have been retained as a secondary
|
|
assert_eq!(
|
|
*tenant_shard.intent.secondary.iter().last().unwrap(),
|
|
attached_node_id
|
|
);
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn intent_from_observed() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(3);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
tenant_shard.observed.locations.insert(
|
|
NodeId(3),
|
|
ObservedStateLocation {
|
|
conf: Some(LocationConfig {
|
|
mode: LocationConfigMode::AttachedMulti,
|
|
generation: Some(2),
|
|
secondary_conf: None,
|
|
shard_number: tenant_shard.shard.number.0,
|
|
shard_count: tenant_shard.shard.count.literal(),
|
|
shard_stripe_size: tenant_shard.shard.stripe_size.0,
|
|
tenant_conf: TenantConfig::default(),
|
|
}),
|
|
},
|
|
);
|
|
|
|
tenant_shard.observed.locations.insert(
|
|
NodeId(2),
|
|
ObservedStateLocation {
|
|
conf: Some(LocationConfig {
|
|
mode: LocationConfigMode::AttachedStale,
|
|
generation: Some(1),
|
|
secondary_conf: None,
|
|
shard_number: tenant_shard.shard.number.0,
|
|
shard_count: tenant_shard.shard.count.literal(),
|
|
shard_stripe_size: tenant_shard.shard.stripe_size.0,
|
|
tenant_conf: TenantConfig::default(),
|
|
}),
|
|
},
|
|
);
|
|
|
|
tenant_shard.intent_from_observed(&mut scheduler);
|
|
|
|
// The attached location with the highest generation gets used as attached
|
|
assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
|
|
// Other locations get used as secondary
|
|
assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);
|
|
|
|
scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn scheduling_mode() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(3);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
// In pause mode, schedule() shouldn't do anything
|
|
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
|
|
assert!(tenant_shard
|
|
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
|
.is_ok());
|
|
assert!(tenant_shard.intent.all_pageservers().is_empty());
|
|
|
|
// In active mode, schedule() works
|
|
tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
|
|
assert!(tenant_shard
|
|
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
|
.is_ok());
|
|
assert!(!tenant_shard.intent.all_pageservers().is_empty());
|
|
|
|
tenant_shard.intent.clear(&mut scheduler);
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn optimize_attachment() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(3);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
// Initially: both shards are attached to node 1, and they have secondary locations
|
|
// on different nodes.
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
|
|
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
|
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
|
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
|
|
|
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
|
|
|
|
// Either shard should recognize that it has the option to switch to a secondary location where there
|
|
// would be no other shards from the same tenant, and request to do so.
|
|
assert_eq!(
|
|
optimization_a,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(1),
|
|
new_attached_node_id: NodeId(2)
|
|
})
|
|
})
|
|
);
|
|
|
|
// Note that optimizations for two shards in the same tenant with the same ScheduleContext are
|
|
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
|
|
// of [`Service::optimize_all`] to avoid trying
|
|
// to do optimizations for multiple shards in the same tenant at the same time. Generating
|
|
// both optimizations is just done for test purposes
|
|
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
|
|
assert_eq!(
|
|
optimization_b,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_b.sequence,
|
|
action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
|
|
old_attached_node_id: NodeId(1),
|
|
new_attached_node_id: NodeId(3)
|
|
})
|
|
})
|
|
);
|
|
|
|
// Applying these optimizations should result in the end state proposed
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
|
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
|
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
|
|
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
|
|
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
shard_b.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn optimize_secondary() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(4);
|
|
let mut scheduler = Scheduler::new(nodes.values());
|
|
|
|
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
|
|
|
// Initially: the two shards are attached to different nodes, and both have their secondary location
|
|
// on the same third node.
|
|
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
|
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
|
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
|
|
|
let mut schedule_context = ScheduleContext::default();
|
|
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
|
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
|
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
|
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
|
|
|
let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
|
|
|
|
// Since there is a node with no locations available, the node with two locations for the
|
|
// same tenant should generate an optimization to move one away
|
|
assert_eq!(
|
|
optimization_a,
|
|
Some(ScheduleOptimization {
|
|
sequence: shard_a.sequence,
|
|
action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
|
|
old_node_id: NodeId(3),
|
|
new_node_id: NodeId(4)
|
|
})
|
|
})
|
|
);
|
|
|
|
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
|
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
|
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
|
|
|
|
shard_a.intent.clear(&mut scheduler);
|
|
shard_b.intent.clear(&mut scheduler);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
|
// called repeatedly in the background.
|
|
fn optimize_til_idle(
|
|
nodes: &HashMap<NodeId, Node>,
|
|
scheduler: &mut Scheduler,
|
|
shards: &mut [TenantShard],
|
|
) {
|
|
let mut loop_n = 0;
|
|
loop {
|
|
let mut schedule_context = ScheduleContext::default();
|
|
let mut any_changed = false;
|
|
|
|
for shard in shards.iter() {
|
|
schedule_context.avoid(&shard.intent.all_pageservers());
|
|
if let Some(attached) = shard.intent.get_attached() {
|
|
schedule_context.push_attached(*attached);
|
|
}
|
|
}
|
|
|
|
for shard in shards.iter_mut() {
|
|
let optimization = shard.optimize_attachment(nodes, &schedule_context);
|
|
if let Some(optimization) = optimization {
|
|
shard.apply_optimization(scheduler, optimization);
|
|
any_changed = true;
|
|
break;
|
|
}
|
|
|
|
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
|
|
if let Some(optimization) = optimization {
|
|
shard.apply_optimization(scheduler, optimization);
|
|
any_changed = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if !any_changed {
|
|
break;
|
|
}
|
|
|
|
// Assert no infinite loop
|
|
loop_n += 1;
|
|
assert!(loop_n < 1000);
|
|
}
|
|
}
|
|
|
|
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
|
/// that it converges.
|
|
#[test]
|
|
fn optimize_add_nodes() -> anyhow::Result<()> {
|
|
let nodes = make_test_nodes(4);
|
|
|
|
// Only show the scheduler a couple of nodes
|
|
let mut scheduler = Scheduler::new([].iter());
|
|
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
|
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
|
|
|
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
|
let mut schedule_context = ScheduleContext::default();
|
|
for shard in &mut shards {
|
|
assert!(shard
|
|
.schedule(&mut scheduler, &mut schedule_context)
|
|
.is_ok());
|
|
}
|
|
|
|
// We should see an equal number of locations on the two nodes.
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
|
|
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);
|
|
|
|
// Add another two nodes: we should see the shards spread out when their optimize
|
|
// methods are called
|
|
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
|
|
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
|
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
|
|
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);
|
|
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
|
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 1);
|
|
|
|
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
|
|
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 1);
|
|
|
|
for shard in shards.iter_mut() {
|
|
shard.intent.clear(&mut scheduler);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|