storcon: make observed state updates more granular (#9276)

## Problem

Previously, observed state updates coming out of the reconciler could clobber
inline changes made to the observed state by other code paths.

## Summary of changes

Model observed state changes from reconcilers as deltas: only the locations that
actually changed are updated on the authoritative state. Handling is also added for
a node going offline concurrently with the reconcile: the observed state for such a
node is set to `None`, per the existing convention.
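
Below is a minimal, self-contained sketch of the delta approach. It is not the actual storage controller code: `Delta`, `diff`, and `apply` are illustrative stand-ins for `ObservedStateDelta`, `Reconciler::observed_deltas`, and `TenantShard::apply_observed_deltas`, and the map of `NodeId` to an optional config string stands in for `ObservedState`. It shows how applying per-node deltas preserves an inline change made concurrently to another node, which wholesale replacement of the observed state would have clobbered.

```rust
use std::collections::HashMap;

type NodeId = u64;
// Stand-in for the real per-node location config; `None` means "uncertain".
type LocationConf = Option<String>;

#[derive(Debug)]
enum Delta {
    Upsert(NodeId, LocationConf),
    Delete(NodeId),
}

/// Diff the snapshot taken when the reconciler was spawned against the state
/// it finished with: only nodes whose config actually changed produce deltas.
fn diff(
    original: &HashMap<NodeId, LocationConf>,
    latest: &HashMap<NodeId, LocationConf>,
) -> Vec<Delta> {
    let mut deltas = Vec::new();
    for (node, conf) in latest {
        if original.get(node) != Some(conf) {
            deltas.push(Delta::Upsert(*node, conf.clone()));
        }
    }
    for node in original.keys() {
        if !latest.contains_key(node) {
            deltas.push(Delta::Delete(*node));
        }
    }
    deltas
}

/// Apply deltas to the authoritative state, touching only the nodes they name.
fn apply(state: &mut HashMap<NodeId, LocationConf>, deltas: Vec<Delta>) {
    for delta in deltas {
        match delta {
            Delta::Upsert(node, conf) => {
                state.insert(node, conf);
            }
            Delta::Delete(node) => {
                state.remove(&node);
            }
        }
    }
}

fn main() {
    // Observed state snapshot when the reconciler was spawned.
    let original = HashMap::from([
        (1, Some("attached".to_string())),
        (2, Some("secondary".to_string())),
    ]);
    // State the reconciler finished with: it only touched node 2.
    let latest = HashMap::from([
        (1, Some("attached".to_string())),
        (2, Some("attached".to_string())),
    ]);

    // Meanwhile, another code path marked node 1 as uncertain on the
    // authoritative copy (e.g. because the node went offline).
    let mut authoritative = original.clone();
    authoritative.insert(1, None);

    // Applying deltas only touches node 2, so the inline change to node 1
    // survives; replacing the whole map with `latest` would have clobbered it.
    apply(&mut authoritative, diff(&original, &latest));
    assert_eq!(authoritative.get(&1), Some(&None));
    assert_eq!(authoritative[&2], Some("attached".to_string()));
    println!("{authoritative:?}");
}
```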

Closes https://github.com/neondatabase/neon/issues/9124
Author: Vlad Lazar
Date: 2024-10-09 11:53:29 +01:00
Committed by: GitHub
Parent: 54d1185789
Commit: cc599e23c1
3 changed files with 141 additions and 22 deletions

View File

@@ -22,7 +22,7 @@ use utils::sync::gate::GateGuard;
use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
@@ -45,8 +45,15 @@ pub(super) struct Reconciler {
pub(crate) reconciler_config: ReconcilerConfig,
pub(crate) config: TenantConfig,
/// Observed state from the point of view of the reconciler.
/// This gets updated as the reconciliation makes progress.
pub(crate) observed: ObservedState,
/// Snapshot of the observed state at the point when the reconciler
/// was spawned.
pub(crate) original_observed: ObservedState,
pub(crate) service_config: service::Config,
/// A hook to notify the running postgres instances when we change the location
@@ -846,6 +853,39 @@ impl Reconciler {
}
}
/// Compare the observed state snapshot from when the reconcile was created
/// with the final observed state in order to generate observed state deltas.
pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
let mut deltas = Vec::default();
for (node_id, location) in &self.observed.locations {
let previous_location = self.original_observed.locations.get(node_id);
let do_upsert = match previous_location {
// Location config changed for node
Some(prev) if location.conf != prev.conf => true,
// New location config for node
None => true,
// Location config has not changed for node
_ => false,
};
if do_upsert {
deltas.push(ObservedStateDelta::Upsert(Box::new((
*node_id,
location.clone(),
))));
}
}
for node_id in self.original_observed.locations.keys() {
if !self.observed.locations.contains_key(node_id) {
deltas.push(ObservedStateDelta::Delete(*node_id));
}
}
deltas
}
/// Keep trying to notify the compute indefinitely, only dropping out if:
/// - the node `origin` becomes unavailable -> Ok(())
/// - the node `origin` no longer has our tenant shard attached -> Ok(())

View File

@@ -28,8 +28,8 @@ use crate::{
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
@@ -1072,7 +1072,7 @@ impl Service {
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
))]
fn process_result(&self, mut result: ReconcileResult) {
fn process_result(&self, result: ReconcileResult) {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else {
@@ -1094,22 +1094,27 @@ impl Service {
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
// make to the tenant
result
.observed
.locations
.retain(|node_id, _loc| nodes.contains_key(node_id));
let deltas = result.observed_deltas.into_iter().flat_map(|delta| {
// In case a node was deleted while this reconcile is in flight, filter it out of the update we will
// make to the tenant
let node = nodes.get(delta.node_id())?;
if node.is_available() {
return Some(delta);
}
// In case a node became unavailable concurrently with the reconcile, observed
// locations on it are now uncertain. By convention, set them to None in order
// for them to get refreshed when the node comes back online.
Some(ObservedStateDelta::Upsert(Box::new((
node.get_id(),
ObservedStateLocation { conf: None },
))))
});
match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
}
tenant.observed = result.observed;
tenant.apply_observed_deltas(deltas);
tenant.waiter.advance(result.sequence);
}
Err(e) => {
@@ -1131,9 +1136,10 @@ impl Service {
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
}
// Skip deletions on reconcile failures
let upsert_deltas =
deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
tenant.apply_observed_deltas(upsert_deltas);
}
}

View File

@@ -425,6 +425,22 @@ pub(crate) enum ReconcileNeeded {
Yes,
}
/// Pending modification to the observed state of a tenant shard.
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
pub(crate) enum ObservedStateDelta {
Upsert(Box<(NodeId, ObservedStateLocation)>),
Delete(NodeId),
}
impl ObservedStateDelta {
pub(crate) fn node_id(&self) -> &NodeId {
match self {
Self::Upsert(up) => &up.0,
Self::Delete(nid) => nid,
}
}
}
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
@@ -437,7 +453,7 @@ pub(crate) struct ReconcileResult {
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Option<Generation>,
pub(crate) observed: ObservedState,
pub(crate) observed_deltas: Vec<ObservedStateDelta>,
/// Set [`TenantShard::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
@@ -1123,7 +1139,7 @@ impl TenantShard {
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
observed_deltas: reconciler.observed_deltas(),
pending_compute_notification: reconciler.compute_notify_failure,
}
}
@@ -1177,6 +1193,7 @@ impl TenantShard {
reconciler_config,
config: self.config.clone(),
observed: self.observed.clone(),
original_observed: self.observed.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
@@ -1437,6 +1454,62 @@ impl TenantShard {
.map(|(node_id, gen)| (node_id, Generation::new(gen)))
.collect()
}
/// Update the observed state of the tenant by applying incremental deltas
///
/// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
/// They are then filtered in [`crate::service::Service::process_result`].
pub(crate) fn apply_observed_deltas(
&mut self,
deltas: impl Iterator<Item = ObservedStateDelta>,
) {
for delta in deltas {
match delta {
ObservedStateDelta::Upsert(ups) => {
let (node_id, loc) = *ups;
// If the generation of the observed location in the delta is lagging
// behind the current one, then we have a race condition and cannot
// be certain about the true observed state. Set the observed state
// to None in order to reflect this.
let crnt_gen = self
.observed
.locations
.get(&node_id)
.and_then(|loc| loc.conf.as_ref())
.and_then(|conf| conf.generation);
let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
match (crnt_gen, new_gen) {
(Some(crnt), Some(new)) if crnt_gen > new_gen => {
tracing::warn!(
"Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
node_id, loc, crnt, new
);
self.observed
.locations
.insert(node_id, ObservedStateLocation { conf: None });
continue;
}
_ => {}
}
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
self.observed.locations.insert(node_id, loc);
}
ObservedStateDelta::Delete(node_id) => {
tracing::info!("Deleting observed location {}", node_id);
self.observed.locations.remove(&node_id);
}
}
}
}
}
#[cfg(test)]