# storage controller: use proper ScheduleContext when evacuating a node (#9908)
## Problem
When picking locations for a shard, we should use a `ScheduleContext` that
includes all the other shards in the same tenant, so that proper
anti-affinity is applied between them. If we don't, scheduling becomes
unstable: we place a shard somewhere that the optimizer will then
immediately move it away from.
We didn't always do this, because accumulating the context per tenant was
more awkward than simply walking all tenant shards.
This was a TODO in `handle_node_availability_transition`:
```
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
```
This is a precursor to https://github.com/neondatabase/neon/issues/8264,
where the current imperfect scheduling during node evacuation hampers
testing.
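
For intuition, here is a minimal, self-contained sketch of that anti-affinity idea, using simplified stand-in types rather than the storage controller's real `ScheduleContext` and scheduler: a context shared across a tenant's shards makes already-used nodes progressively less attractive, so sibling shards spread out instead of each independently picking the same "best" node.

```rust
use std::collections::HashMap;

type NodeId = u64;

/// Simplified stand-in for a schedule context: counts how often each node is
/// already used by sibling shards of the same tenant.
#[derive(Default)]
struct Context {
    affinity: HashMap<NodeId, usize>,
}

impl Context {
    /// Record that these nodes are now (more) occupied by this tenant.
    fn avoid(&mut self, nodes: &[NodeId]) {
        for n in nodes {
            *self.affinity.entry(*n).or_default() += 1;
        }
    }

    /// Pick the candidate node least used by this tenant's other shards.
    fn pick(&self, candidates: &[NodeId]) -> NodeId {
        *candidates
            .iter()
            .min_by_key(|n| self.affinity.get(*n).copied().unwrap_or(0))
            .expect("at least one candidate node")
    }
}

fn main() {
    let nodes = [1, 2, 3];
    let mut ctx = Context::default();

    // Because the context is shared across the whole tenant, four shards of one
    // tenant land on different nodes rather than piling onto a single node.
    for shard in 0..4 {
        let node = ctx.pick(&nodes);
        println!("shard {shard} -> node {node}");
        ctx.avoid(&[node]);
    }
}
```

If each shard were scheduled with its own fresh context, as in the old evacuation path, every shard in this loop would pick the same node, which is exactly the placement the optimizer would then try to undo.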
## Summary of changes
- Add an iterator type that yields each shard along with a
`ScheduleContext` that includes all the other shards from the same tenant
(see the sketch after this list)
- Use the iterator to replace hand-crafted logic in `optimize_all_plan`
(functionally identical)
- Use the iterator in `handle_node_availability_transition` to apply
proper anti-affinity during node evacuation.
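
To make the first bullet concrete, here is a rough, self-contained sketch of the grouping pattern, with hypothetical stand-in types; the real `TenantShardContextIterator` (added in `context_iterator.rs` below) walks the shard map the same way but also accumulates a `ScheduleContext` for each group:

```rust
use std::collections::BTreeMap;

type TenantId = u32;
type ShardNumber = u8;

/// Group values from a map ordered by (tenant, shard number) into one entry per
/// tenant, mirroring how shards are grouped before any of them is scheduled.
fn group_by_tenant<V>(
    shards: &BTreeMap<(TenantId, ShardNumber), V>,
) -> Vec<(TenantId, Vec<&V>)> {
    let mut out: Vec<(TenantId, Vec<&V>)> = Vec::new();
    for ((tenant, _shard), value) in shards {
        match out.last_mut() {
            Some((t, group)) if *t == *tenant => group.push(value),
            _ => out.push((*tenant, vec![value])),
        }
    }
    out
}

fn main() {
    let mut shards = BTreeMap::new();
    shards.insert((1, 0), "tenant-1 shard 0");
    shards.insert((2, 0), "tenant-2 shard 0");
    shards.insert((2, 1), "tenant-2 shard 1");

    for (tenant, group) in group_by_tenant(&shards) {
        // In the real iterator, a ScheduleContext is built here from every shard
        // in `group`, and then yielded alongside the shards themselves.
        println!("tenant {tenant}: {} shard(s)", group.len());
    }
}
```

The point of yielding a whole tenant's shards at once is that the per-tenant context can be fully accumulated before any shard in the group is (re)scheduled.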
```diff
@@ -305,7 +305,7 @@ impl std::ops::Add for AffinityScore {
 
 /// Hint for whether this is a sincere attempt to schedule, or a speculative
 /// check for where we _would_ schedule (done during optimization)
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub(crate) enum ScheduleMode {
     Normal,
     Speculative,
@@ -319,7 +319,7 @@ impl Default for ScheduleMode {
 
 // For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
 // it for many shards in the same tenant.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub(crate) struct ScheduleContext {
     /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
     pub(crate) nodes: HashMap<NodeId, AffinityScore>,
@@ -331,6 +331,14 @@ pub(crate) struct ScheduleContext {
 }
 
 impl ScheduleContext {
+    pub(crate) fn new(mode: ScheduleMode) -> Self {
+        Self {
+            nodes: HashMap::new(),
+            attached_nodes: HashMap::new(),
+            mode,
+        }
+    }
+
     /// Input is a list of nodes we would like to avoid using again within this context. The more
     /// times a node is passed into this call, the less inclined we are to use it.
     pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
@@ -355,6 +363,11 @@ impl ScheduleContext {
     pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
         self.attached_nodes.get(&node_id).copied().unwrap_or(0)
     }
+
+    #[cfg(test)]
+    pub(crate) fn attach_count(&self) -> usize {
+        self.attached_nodes.values().sum()
+    }
 }
 
 pub(crate) enum RefCountUpdate {
```
```diff
@@ -1,3 +1,6 @@
+pub mod chaos_injector;
+mod context_iterator;
+
 use hyper::Uri;
 use std::{
     borrow::Cow,
@@ -95,7 +98,7 @@ use crate::{
     },
 };
 
-pub mod chaos_injector;
+use context_iterator::TenantShardContextIterator;
 
 // For operations that should be quick, like attaching a new tenant
 const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
```
```diff
@@ -5498,49 +5501,51 @@ impl Service {
 
         let mut tenants_affected: usize = 0;
 
-        for (tenant_shard_id, tenant_shard) in tenants {
-            if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
-                // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
-                // not assume our knowledge of the node's configuration is accurate until it comes back online
-                observed_loc.conf = None;
-            }
-
-            if nodes.len() == 1 {
-                // Special case for single-node cluster: there is no point trying to reschedule
-                // any tenant shards: avoid doing so, in order to avoid spewing warnings about
-                // failures to schedule them.
-                continue;
-            }
-
-            if !nodes
-                .values()
-                .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
-            {
-                // Special case for when all nodes are unavailable and/or unschedulable: there is no point
-                // trying to reschedule since there's nowhere else to go. Without this
-                // branch we incorrectly detach tenants in response to node unavailability.
-                continue;
-            }
-
-            if tenant_shard.intent.demote_attached(scheduler, node_id) {
-                tenant_shard.sequence = tenant_shard.sequence.next();
-
-                // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
-                // for tenants without secondary locations: if they have a secondary location, then this
-                // schedule() call is just promoting an existing secondary)
-                let mut schedule_context = ScheduleContext::default();
-
-                match tenant_shard.schedule(scheduler, &mut schedule_context) {
-                    Err(e) => {
-                        // It is possible that some tenants will become unschedulable when too many pageservers
-                        // go offline: in this case there isn't much we can do other than make the issue observable.
-                        // TODO: give TenantShard a scheduling error attribute to be queried later.
-                        tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
-                    }
-                    Ok(()) => {
-                        if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
-                            tenants_affected += 1;
-                        };
+        for (_tenant_id, mut schedule_context, shards) in
+            TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+        {
+            for tenant_shard in shards {
+                let tenant_shard_id = tenant_shard.tenant_shard_id;
+                if let Some(observed_loc) =
+                    tenant_shard.observed.locations.get_mut(&node_id)
+                {
+                    // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
+                    // not assume our knowledge of the node's configuration is accurate until it comes back online
+                    observed_loc.conf = None;
+                }
+
+                if nodes.len() == 1 {
+                    // Special case for single-node cluster: there is no point trying to reschedule
+                    // any tenant shards: avoid doing so, in order to avoid spewing warnings about
+                    // failures to schedule them.
+                    continue;
+                }
+
+                if !nodes
+                    .values()
+                    .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                {
+                    // Special case for when all nodes are unavailable and/or unschedulable: there is no point
+                    // trying to reschedule since there's nowhere else to go. Without this
+                    // branch we incorrectly detach tenants in response to node unavailability.
+                    continue;
+                }
+
+                if tenant_shard.intent.demote_attached(scheduler, node_id) {
+                    tenant_shard.sequence = tenant_shard.sequence.next();
+
+                    match tenant_shard.schedule(scheduler, &mut schedule_context) {
+                        Err(e) => {
+                            // It is possible that some tenants will become unschedulable when too many pageservers
+                            // go offline: in this case there isn't much we can do other than make the issue observable.
+                            // TODO: give TenantShard a scheduling error attribute to be queried later.
+                            tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
+                        }
+                        Ok(()) => {
+                            if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+                                tenants_affected += 1;
+                            };
+                        }
                     }
                 }
             }
         }
```
```diff
@@ -6011,14 +6016,8 @@ impl Service {
         let (nodes, tenants, _scheduler) = locked.parts_mut();
         let pageservers = nodes.clone();
 
-        let mut schedule_context = ScheduleContext::default();
-
         let mut reconciles_spawned = 0;
-        for (tenant_shard_id, shard) in tenants.iter_mut() {
-            if tenant_shard_id.is_shard_zero() {
-                schedule_context = ScheduleContext::default();
-            }
-
+        for shard in tenants.values_mut() {
             // Skip checking if this shard is already enqueued for reconciliation
             if shard.delayed_reconcile && self.reconciler_concurrency.available_permits() == 0 {
                 // If there is something delayed, then return a nonzero count so that
@@ -6033,8 +6032,6 @@
             if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
                 reconciles_spawned += 1;
             }
-
-            schedule_context.avoid(&shard.intent.all_pageservers());
         }
 
         reconciles_spawned
```
```diff
@@ -6103,95 +6100,62 @@
     }
 
     fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
-        let mut schedule_context = ScheduleContext::default();
-
-        let mut tenant_shards: Vec<&TenantShard> = Vec::new();
-
         // How many candidate optimizations we will generate, before evaluating them for readniess: setting
         // this higher than the execution limit gives us a chance to execute some work even if the first
         // few optimizations we find are not ready.
         const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
 
         let mut work = Vec::new();
 
         let mut locked = self.inner.write().unwrap();
         let (nodes, tenants, scheduler) = locked.parts_mut();
-        for (tenant_shard_id, shard) in tenants.iter() {
-            if tenant_shard_id.is_shard_zero() {
-                // Reset accumulators on the first shard in a tenant
-                schedule_context = ScheduleContext::default();
-                schedule_context.mode = ScheduleMode::Speculative;
-                tenant_shards.clear();
-            }
-
-            if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
-                break;
-            }
-
-            match shard.get_scheduling_policy() {
-                ShardSchedulingPolicy::Active => {
-                    // Ok to do optimization
-                }
-                ShardSchedulingPolicy::Essential
-                | ShardSchedulingPolicy::Pause
-                | ShardSchedulingPolicy::Stop => {
-                    // Policy prevents optimizing this shard.
-                    continue;
-                }
-            }
-
-            // Accumulate the schedule context for all the shards in a tenant: we must have
-            // the total view of all shards before we can try to optimize any of them.
-            schedule_context.avoid(&shard.intent.all_pageservers());
-            if let Some(attached) = shard.intent.get_attached() {
-                schedule_context.push_attached(*attached);
-            }
-            tenant_shards.push(shard);
-
-            // Once we have seen the last shard in the tenant, proceed to search across all shards
-            // in the tenant for optimizations
-            if shard.shard.number.0 == shard.shard.count.count() - 1 {
-                if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
-                    // Do not start any optimizations while another change to the tenant is ongoing: this
-                    // is not necessary for correctness, but simplifies operations and implicitly throttles
-                    // optimization changes to happen in a "trickle" over time.
-                    continue;
-                }
-
-                if tenant_shards.iter().any(|s| {
-                    !matches!(s.splitting, SplitState::Idle)
-                        || matches!(s.policy, PlacementPolicy::Detached)
-                }) {
-                    // Never attempt to optimize a tenant that is currently being split, or
-                    // a tenant that is meant to be detached
-                    continue;
-                }
-
-                // TODO: optimization calculations are relatively expensive: create some fast-path for
-                // the common idle case (avoiding the search on tenants that we have recently checked)
-
-                for shard in &tenant_shards {
-                    if let Some(optimization) =
-                        // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
-                        // its primary location based on soft constraints, cut it over.
-                        shard.optimize_attachment(nodes, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    } else if let Some(optimization) =
-                        // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
-                        // better placed on another node, based on ScheduleContext, then adjust it. This
-                        // covers cases like after a shard split, where we might have too many shards
-                        // in the same tenant with secondary locations on the node where they originally split.
-                        shard.optimize_secondary(scheduler, &schedule_context)
-                    {
-                        work.push((shard.tenant_shard_id, optimization));
-                        break;
-                    }
-
-                    // TODO: extend this mechanism to prefer attaching on nodes with fewer attached
-                    // tenants (i.e. extend schedule state to distinguish attached from secondary counts),
-                    // for the total number of attachments on a node (not just within a tenant.)
-                }
-            }
-        }
+
+        for (_tenant_id, schedule_context, shards) in
+            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
+        {
+            for shard in shards {
+                if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
+                    break;
+                }
+
+                match shard.get_scheduling_policy() {
+                    ShardSchedulingPolicy::Active => {
+                        // Ok to do optimization
+                    }
+                    ShardSchedulingPolicy::Essential
+                    | ShardSchedulingPolicy::Pause
+                    | ShardSchedulingPolicy::Stop => {
+                        // Policy prevents optimizing this shard.
+                        continue;
+                    }
+                }
+
+                if !matches!(shard.splitting, SplitState::Idle)
+                    || matches!(shard.policy, PlacementPolicy::Detached)
+                    || shard.reconciler.is_some()
+                {
+                    // Do not start any optimizations while another change to the tenant is ongoing: this
+                    // is not necessary for correctness, but simplifies operations and implicitly throttles
+                    // optimization changes to happen in a "trickle" over time.
+                    continue;
+                }
+
+                // TODO: optimization calculations are relatively expensive: create some fast-path for
+                // the common idle case (avoiding the search on tenants that we have recently checked)
+
+                if let Some(optimization) =
+                    // If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
+                    // its primary location based on soft constraints, cut it over.
+                    shard.optimize_attachment(nodes, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
+                } else if let Some(optimization) =
+                    // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
+                    // better placed on another node, based on ScheduleContext, then adjust it. This
+                    // covers cases like after a shard split, where we might have too many shards
+                    // in the same tenant with secondary locations on the node where they originally split.
+                    shard.optimize_secondary(scheduler, &schedule_context)
+                {
+                    work.push((shard.tenant_shard_id, optimization));
+                    break;
+                }
+
+                // TODO: extend this mechanism to prefer attaching on nodes with fewer attached
+                // tenants (i.e. extend schedule state to distinguish attached from secondary counts),
+                // for the total number of attachments on a node (not just within a tenant.)
+            }
+        }
```
`storage_controller/src/service/context_iterator.rs` (new file, 139 lines):

```diff
@@ -0,0 +1,139 @@
+use std::collections::BTreeMap;
+
+use utils::id::TenantId;
+use utils::shard::TenantShardId;
+
+use crate::scheduler::{ScheduleContext, ScheduleMode};
+use crate::tenant_shard::TenantShard;
+
+/// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
+/// tenant while considering the individual shards within it. This iterator is a helper
+/// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
+/// for the tenant.
+pub(super) struct TenantShardContextIterator<'a> {
+    schedule_mode: ScheduleMode,
+    inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
+}
+
+impl<'a> TenantShardContextIterator<'a> {
+    pub(super) fn new(
+        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
+        schedule_mode: ScheduleMode,
+    ) -> Self {
+        Self {
+            schedule_mode,
+            inner: tenants.iter_mut(),
+        }
+    }
+}
+
+impl<'a> Iterator for TenantShardContextIterator<'a> {
+    type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut tenant_shards = Vec::new();
+        let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
+        loop {
+            let (tenant_shard_id, shard) = self.inner.next()?;
+
+            if tenant_shard_id.is_shard_zero() {
+                // Cleared on last shard of previous tenant
+                assert!(tenant_shards.is_empty());
+            }
+
+            // Accumulate the schedule context for all the shards in a tenant
+            schedule_context.avoid(&shard.intent.all_pageservers());
+            if let Some(attached) = shard.intent.get_attached() {
+                schedule_context.push_attached(*attached);
+            }
+            tenant_shards.push(shard);
+
+            if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 {
+                return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, str::FromStr};
+
+    use pageserver_api::controller_api::PlacementPolicy;
+    use utils::shard::{ShardCount, ShardNumber};
+
+    use crate::{
+        scheduler::test_utils::make_test_nodes, service::Scheduler,
+        tenant_shard::tests::make_test_tenant_with_id,
+    };
+
+    use super::*;
+
+    #[test]
+    fn test_context_iterator() {
+        // Hand-crafted tenant IDs to ensure they appear in the expected order when put into
+        // a btreemap & iterated
+        let mut t_1_shards = make_test_tenant_with_id(
+            TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(1),
+            None,
+        );
+        let t_2_shards = make_test_tenant_with_id(
+            TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(4),
+            None,
+        );
+        let mut t_3_shards = make_test_tenant_with_id(
+            TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
+            PlacementPolicy::Attached(1),
+            ShardCount(1),
+            None,
+        );
+
+        let t1_id = t_1_shards[0].tenant_shard_id.tenant_id;
+        let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
+        let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;
+
+        let mut tenants = BTreeMap::new();
+        tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap());
+        for shard in t_2_shards {
+            tenants.insert(shard.tenant_shard_id, shard);
+        }
+        tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap());
+
+        let nodes = make_test_nodes(3, &[]);
+        let mut scheduler = Scheduler::new(nodes.values());
+        let mut context = ScheduleContext::default();
+        for shard in tenants.values_mut() {
+            shard.schedule(&mut scheduler, &mut context).unwrap();
+        }
+
+        let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t1_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards.len(), 1);
+        assert_eq!(context.attach_count(), 1);
+
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t2_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
+        assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2));
+        assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3));
+        assert_eq!(shards.len(), 4);
+        assert_eq!(context.attach_count(), 4);
+
+        let (tenant_id, context, shards) = iter.next().unwrap();
+        assert_eq!(tenant_id, t3_id);
+        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
+        assert_eq!(shards.len(), 1);
+        assert_eq!(context.attach_count(), 1);
+
+        for shard in tenants.values_mut() {
+            shard.intent.clear(&mut scheduler);
+        }
+    }
+}
```
```diff
@@ -1574,13 +1574,20 @@ pub(crate) mod tests {
         )
     }
 
-    fn make_test_tenant(
+    pub(crate) fn make_test_tenant(
         policy: PlacementPolicy,
         shard_count: ShardCount,
         preferred_az: Option<AvailabilityZone>,
     ) -> Vec<TenantShard> {
-        let tenant_id = TenantId::generate();
+        make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
+    }
 
+    pub(crate) fn make_test_tenant_with_id(
+        tenant_id: TenantId,
+        policy: PlacementPolicy,
+        shard_count: ShardCount,
+        preferred_az: Option<AvailabilityZone>,
+    ) -> Vec<TenantShard> {
         (0..shard_count.count())
             .map(|i| {
                 let shard_number = ShardNumber(i);
```