From cc599e23c1d06c8311f8257ef9f7745a807dc3f1 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Wed, 9 Oct 2024 11:53:29 +0100
Subject: [PATCH] storcon: make observed state updates more granular (#9276)

## Problem

Previously, observed state updates from the reconciler could clobber inline
changes made to the observed state by other code paths.

## Summary of changes

Model observed state changes from reconcilers as deltas, so that only what
actually changed gets updated. Handling for a node going offline concurrently
with the reconcile is also added: set the observed state to None in such cases
to respect the convention. (An illustrative sketch of the delta model follows
the patch.)

Closes https://github.com/neondatabase/neon/issues/9124
---
 storage_controller/src/reconciler.rs   | 42 +++++++++++++-
 storage_controller/src/service.rs      | 44 ++++++++-------
 storage_controller/src/tenant_shard.rs | 77 +++++++++++++++++++++++++-
 3 files changed, 141 insertions(+), 22 deletions(-)

diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 4864a021fe..9d2182d44c 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
 
 use crate::compute_hook::{ComputeHook, NotifyError};
 use crate::node::Node;
-use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
+use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
 
 const DEFAULT_HEATMAP_PERIOD: &str = "60s";
 
@@ -45,8 +45,15 @@ pub(super) struct Reconciler {
     pub(crate) reconciler_config: ReconcilerConfig,
 
     pub(crate) config: TenantConfig,
+
+    /// Observed state from the point of view of the reconciler.
+    /// This gets updated as the reconciliation makes progress.
     pub(crate) observed: ObservedState,
 
+    /// Snapshot of the observed state at the point when the reconciler
+    /// was spawned.
+    pub(crate) original_observed: ObservedState,
+
     pub(crate) service_config: service::Config,
 
     /// A hook to notify the running postgres instances when we change the location
@@ -846,6 +853,39 @@ impl Reconciler {
         }
     }
 
+    /// Compare the observed state snapshot from when the reconcile was created
+    /// with the final observed state in order to generate observed state deltas.
+    pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
+        let mut deltas = Vec::default();
+
+        for (node_id, location) in &self.observed.locations {
+            let previous_location = self.original_observed.locations.get(node_id);
+            let do_upsert = match previous_location {
+                // Location config changed for node
+                Some(prev) if location.conf != prev.conf => true,
+                // New location config for node
+                None => true,
+                // Location config has not changed for node
+                _ => false,
+            };
+
+            if do_upsert {
+                deltas.push(ObservedStateDelta::Upsert(Box::new((
+                    *node_id,
+                    location.clone(),
+                ))));
+            }
+        }
+
+        for node_id in self.original_observed.locations.keys() {
+            if !self.observed.locations.contains_key(node_id) {
+                deltas.push(ObservedStateDelta::Delete(*node_id));
+            }
+        }
+
+        deltas
+    }
+
     /// Keep trying to notify the compute indefinitely, only dropping out if:
     /// - the node `origin` becomes unavailable -> Ok(())
     /// - the node `origin` no longer has our tenant shard attached -> Ok(())
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index c349e2b9bf..cc735dc27e 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -28,8 +28,8 @@ use crate::{
     reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
     tenant_shard::{
-        MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
-        ScheduleOptimizationAction,
+        MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
+        ScheduleOptimization, ScheduleOptimizationAction,
     },
 };
 use anyhow::Context;
@@ -1072,7 +1072,7 @@ impl Service {
         tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(), sequence=%result.sequence
     ))]
-    fn process_result(&self, mut result: ReconcileResult) {
+    fn process_result(&self, result: ReconcileResult) {
         let mut locked = self.inner.write().unwrap();
         let (nodes, tenants, _scheduler) = locked.parts_mut();
         let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
@@ -1094,22 +1094,27 @@ impl Service {
 
         // In case a node was deleted while this reconcile is in flight, filter it out of the update we will
         // make to the tenant
-        result
-            .observed
-            .locations
-            .retain(|node_id, _loc| nodes.contains_key(node_id));
+        let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
+            // In case a node was deleted while this reconcile is in flight, filter it out of the update we will
+            // make to the tenant
+            let node = nodes.get(delta.node_id())?;
+
+            if node.is_available() {
+                return Some(delta);
+            }
+
+            // In case a node became unavailable concurrently with the reconcile, observed
+            // locations on it are now uncertain. By convention, set them to None in order
+            // for them to get refreshed when the node comes back online.
+            Some(ObservedStateDelta::Upsert(Box::new((
+                node.get_id(),
+                ObservedStateLocation { conf: None },
+            ))))
+        });
 
         match result.result {
             Ok(()) => {
-                for (node_id, loc) in &result.observed.locations {
-                    if let Some(conf) = &loc.conf {
-                        tracing::info!("Updating observed location {}: {:?}", node_id, conf);
-                    } else {
-                        tracing::info!("Setting observed location {} to None", node_id,)
-                    }
-                }
-
-                tenant.observed = result.observed;
+                tenant.apply_observed_deltas(deltas);
                 tenant.waiter.advance(result.sequence);
             }
             Err(e) => {
@@ -1131,9 +1136,10 @@ impl Service {
                 // so that waiters will see the correct error after waiting.
                 tenant.set_last_error(result.sequence, e);
 
-                for (node_id, o) in result.observed.locations {
-                    tenant.observed.locations.insert(node_id, o);
-                }
+                // Skip deletions on reconcile failures
+                let upsert_deltas =
+                    deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
+                tenant.apply_observed_deltas(upsert_deltas);
             }
         }
 
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 2e85580e08..8a7ff866e6 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -425,6 +425,22 @@ pub(crate) enum ReconcileNeeded {
     Yes,
 }
 
+/// Pending modification to the observed state of a tenant shard.
+/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
+pub(crate) enum ObservedStateDelta {
+    Upsert(Box<(NodeId, ObservedStateLocation)>),
+    Delete(NodeId),
+}
+
+impl ObservedStateDelta {
+    pub(crate) fn node_id(&self) -> &NodeId {
+        match self {
+            Self::Upsert(up) => &up.0,
+            Self::Delete(nid) => nid,
+        }
+    }
+}
+
 /// When a reconcile task completes, it sends this result object
 /// to be applied to the primary TenantShard.
 pub(crate) struct ReconcileResult {
@@ -437,7 +453,7 @@ pub(crate) struct ReconcileResult {
     pub(crate) tenant_shard_id: TenantShardId,
     pub(crate) generation: Option<Generation>,
 
-    pub(crate) observed: ObservedState,
+    pub(crate) observed_deltas: Vec<ObservedStateDelta>,
 
     /// Set [`TenantShard::pending_compute_notification`] from this flag
     pub(crate) pending_compute_notification: bool,
@@ -1123,7 +1139,7 @@ impl TenantShard {
             result,
             tenant_shard_id: reconciler.tenant_shard_id,
             generation: reconciler.generation,
-            observed: reconciler.observed,
+            observed_deltas: reconciler.observed_deltas(),
             pending_compute_notification: reconciler.compute_notify_failure,
         }
     }
@@ -1177,6 +1193,7 @@ impl TenantShard {
             reconciler_config,
             config: self.config.clone(),
             observed: self.observed.clone(),
+            original_observed: self.observed.clone(),
             compute_hook: compute_hook.clone(),
             service_config: service_config.clone(),
             _gate_guard: gate_guard,
@@ -1437,6 +1454,62 @@ impl TenantShard {
             .map(|(node_id, gen)| (node_id, Generation::new(gen)))
             .collect()
     }
+
+    /// Update the observed state of the tenant by applying incremental deltas
+    ///
+    /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
+    /// They are then filtered in [`crate::service::Service::process_result`].
+    pub(crate) fn apply_observed_deltas(
+        &mut self,
+        deltas: impl Iterator<Item = ObservedStateDelta>,
+    ) {
+        for delta in deltas {
+            match delta {
+                ObservedStateDelta::Upsert(ups) => {
+                    let (node_id, loc) = *ups;
+
+                    // If the generation of the observed location in the delta is lagging
+                    // behind the current one, then we have a race condition and cannot
+                    // be certain about the true observed state. Set the observed state
+                    // to None in order to reflect this.
+                    let crnt_gen = self
+                        .observed
+                        .locations
+                        .get(&node_id)
+                        .and_then(|loc| loc.conf.as_ref())
+                        .and_then(|conf| conf.generation);
+                    let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
+                    match (crnt_gen, new_gen) {
+                        (Some(crnt), Some(new)) if crnt_gen > new_gen => {
+                            tracing::warn!(
+                                "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
+                                node_id, loc, crnt, new
+                            );
+
+                            self.observed
+                                .locations
+                                .insert(node_id, ObservedStateLocation { conf: None });
+
+                            continue;
+                        }
+                        _ => {}
+                    }
+
+                    if let Some(conf) = &loc.conf {
+                        tracing::info!("Updating observed location {}: {:?}", node_id, conf);
+                    } else {
+                        tracing::info!("Setting observed location {} to None", node_id,)
+                    }
+
+                    self.observed.locations.insert(node_id, loc);
+                }
+                ObservedStateDelta::Delete(node_id) => {
+                    tracing::info!("Deleting observed location {}", node_id);
+                    self.observed.locations.remove(&node_id);
+                }
+            }
+        }
+    }
 }
 
 #[cfg(test)]
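To make the delta model concrete, here is a minimal, standalone sketch of the diff-and-apply flow. It is not part of the patch: `NodeId`, `ObservedStateLocation`, and the plain `HashMap`s are simplified stand-ins for the storage controller's real types, and the two functions only mirror the shape of `Reconciler::observed_deltas` and `TenantShard::apply_observed_deltas`.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the storage controller types (assumptions for this
// sketch); the real NodeId, ObservedStateLocation and LocationConfig live in
// the storage_controller crate.
type NodeId = u64;

#[derive(Clone, PartialEq)]
struct ObservedStateLocation {
    // The real field is an Option<LocationConfig>; a String stands in here.
    conf: Option<String>,
}

enum ObservedStateDelta {
    Upsert(Box<(NodeId, ObservedStateLocation)>),
    Delete(NodeId),
}

/// Diff the reconciler's final view against the snapshot taken when it was
/// spawned, mirroring the shape of `Reconciler::observed_deltas` in the patch.
fn observed_deltas(
    original: &HashMap<NodeId, ObservedStateLocation>,
    current: &HashMap<NodeId, ObservedStateLocation>,
) -> Vec<ObservedStateDelta> {
    let mut deltas = Vec::new();

    for (node_id, location) in current {
        // Upsert locations that are new or whose config changed.
        if original.get(node_id) != Some(location) {
            deltas.push(ObservedStateDelta::Upsert(Box::new((*node_id, location.clone()))));
        }
    }

    for node_id in original.keys() {
        // Locations the reconciler dropped become explicit deletions.
        if !current.contains_key(node_id) {
            deltas.push(ObservedStateDelta::Delete(*node_id));
        }
    }

    deltas
}

/// Apply deltas to the authoritative observed state, touching only the entries
/// the reconciler actually changed (cf. `TenantShard::apply_observed_deltas`).
fn apply_observed_deltas(
    observed: &mut HashMap<NodeId, ObservedStateLocation>,
    deltas: impl Iterator<Item = ObservedStateDelta>,
) {
    for delta in deltas {
        match delta {
            ObservedStateDelta::Upsert(up) => {
                let (node_id, loc) = *up;
                observed.insert(node_id, loc);
            }
            ObservedStateDelta::Delete(node_id) => {
                observed.remove(&node_id);
            }
        }
    }
}

fn main() {
    let original = HashMap::from([(1, ObservedStateLocation { conf: Some("attached".into()) })]);

    // The reconciler's final view: node 2 gained a location, node 1 is unchanged.
    let mut current = original.clone();
    current.insert(2, ObservedStateLocation { conf: Some("secondary".into()) });

    // Authoritative state, including an inline change made by another code path
    // (node 3) while the reconcile was running.
    let mut authoritative = original.clone();
    authoritative.insert(3, ObservedStateLocation { conf: None });

    let deltas = observed_deltas(&original, &current);
    apply_observed_deltas(&mut authoritative, deltas.into_iter());

    assert!(authoritative.contains_key(&2)); // new location upserted
    assert!(authoritative.contains_key(&3)); // inline change preserved, not clobbered
}
```

Because only these deltas are applied, an observed-state entry written by another code path while the reconcile was in flight (node 3 in the sketch) survives instead of being overwritten wholesale, which is exactly the clobbering described in the Problem section.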