storage controller: log hygiene & better error type (#7508)
These are testability/logging improvements spun off from #7475:

- Don't log warnings for shutdown errors in the compute hook.
- Revise logging around heartbeats and reconcile_all so that we aren't emitting such a large volume of INFO messages under normal quiet conditions.
- Clean up the `last_error` of TenantShard to hold a ReconcileError instead of a String, and use that properly typed error to suppress reconciler cancel errors during reconcile_all_now. This is important for tests that call that endpoint iteratively: otherwise they would get 500 errors whenever an in-flight reconciler was cancelled (for example because a state change on the tenant shard started a new reconciler). A minimal sketch of this typed-error pattern follows below.
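For illustration, here is a minimal, self-contained Rust sketch of that typed-error pattern. The names (`ReconcileError`, `ReconcileWaitError`, `wait_outcome`) are simplified stand-ins rather than the storage controller's actual definitions; the point is only that a dedicated `Cancel` variant carried inside the waiter error can be matched and swallowed, which a stringly-typed `last_error` could not support reliably.

```rust
use std::sync::Arc;

// Stand-in for the controller's reconcile error: the important part is the Cancel variant.
#[derive(Debug)]
enum ReconcileError {
    Cancel,
    Other(String),
}

// Stand-in for a waiter error that carries Arc<ReconcileError> instead of a String.
#[derive(Debug)]
enum ReconcileWaitError {
    Failed(Arc<ReconcileError>),
}

// A reconcile_all_now-style caller: cancellations are expected when a newer reconciler
// supersedes an in-flight one, so they should not surface as a hard failure.
fn wait_outcome(result: Result<(), ReconcileWaitError>) -> Result<(), ReconcileWaitError> {
    match result {
        Ok(()) => Ok(()),
        Err(ReconcileWaitError::Failed(e)) if matches!(*e, ReconcileError::Cancel) => {
            // Ignore: the shard will be reconciled again by whatever superseded this run.
            Ok(())
        }
        Err(e) => Err(e),
    }
}

fn main() {
    let cancelled = Err(ReconcileWaitError::Failed(Arc::new(ReconcileError::Cancel)));
    let failed = Err(ReconcileWaitError::Failed(Arc::new(ReconcileError::Other(
        "node unavailable".to_string(),
    ))));
    assert!(wait_outcome(cancelled).is_ok());
    assert!(wait_outcome(failed).is_err());
    println!("cancel suppressed, real errors propagated");
}
```

Carrying the error as `Arc<ReconcileError>` keeps it cheaply shareable across many waiters even though the underlying error type is not `Clone`, which mirrors the doc comment added to `TenantShard::last_error` in the diff below.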
@@ -184,6 +184,19 @@ impl HeartbeaterTask {
                 }
             }
         }
+        tracing::info!(
+            "Heartbeat round complete for {} nodes, {} offline",
+            new_state.len(),
+            new_state
+                .values()
+                .filter(|s| match s {
+                    PageserverState::Available { .. } => {
+                        false
+                    }
+                    PageserverState::Offline => true,
+                })
+                .count()
+        );

         let mut deltas = Vec::new();
         let now = Instant::now();
@@ -767,7 +767,10 @@ impl Reconciler {
             // It is up to the caller whether they want to drop out on this error, but they don't have to:
             // in general we should avoid letting unavailability of the cloud control plane stop us from
             // making progress.
-            tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
+            if !matches!(e, NotifyError::ShuttingDown) {
+                tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
+            }
+
             // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
             // needs to retry at some point.
             self.compute_notify_failure = true;
@@ -824,8 +824,7 @@ impl Service {

                 // Ordering: populate last_error before advancing error_seq,
                 // so that waiters will see the correct error after waiting.
-                *(tenant.last_error.lock().unwrap()) = format!("{e}");
-                tenant.error_waiter.advance(result.sequence);
+                tenant.set_last_error(result.sequence, e);

                 for (node_id, o) in result.observed.locations {
                     tenant.observed.locations.insert(node_id, o);
@@ -2805,7 +2804,14 @@ impl Service {
                 tenant_shard_id: shard.tenant_shard_id,
                 node_attached: *shard.intent.get_attached(),
                 node_secondary: shard.intent.get_secondary().to_vec(),
-                last_error: shard.last_error.lock().unwrap().clone(),
+                last_error: shard
+                    .last_error
+                    .lock()
+                    .unwrap()
+                    .as_ref()
+                    .map(|e| format!("{e}"))
+                    .unwrap_or("".to_string())
+                    .clone(),
                 is_reconciling: shard.reconciler.is_some(),
                 is_pending_compute_notification: shard.pending_compute_notification,
                 is_splitting: matches!(shard.splitting, SplitState::Splitting),
@@ -4031,7 +4037,7 @@ impl Service {
                 // TODO: in the background, we should balance work back onto this pageserver
             }
             AvailabilityTransition::Unchanged => {
-                tracing::info!("Node {} no change during config", node_id);
+                tracing::debug!("Node {} no change during config", node_id);
             }
         }

@@ -4351,7 +4357,26 @@ impl Service {
         };

         let waiter_count = waiters.len();
-        self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
+        match self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
+            Ok(()) => {}
+            Err(ReconcileWaitError::Failed(_, reconcile_error))
+                if matches!(*reconcile_error, ReconcileError::Cancel) =>
+            {
+                // Ignore reconciler cancel errors: this reconciler might have shut down
+                // because some other change superceded it. We will return a nonzero number,
+                // so the caller knows they might have to call again to quiesce the system.
+            }
+            Err(e) => {
+                return Err(e);
+            }
+        };
+
+        tracing::info!(
+            "{} reconciles in reconcile_all, {} waiters",
+            reconciles_spawned,
+            waiter_count
+        );

         Ok(waiter_count)
     }
@@ -38,12 +38,18 @@ use crate::{
 };

 /// Serialization helper
-fn read_mutex_content<S, T>(v: &std::sync::Mutex<T>, serializer: S) -> Result<S::Ok, S::Error>
+fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
 where
     S: serde::ser::Serializer,
-    T: Clone + std::fmt::Display,
+    T: std::fmt::Display,
 {
-    serializer.collect_str(&v.lock().unwrap())
+    serializer.collect_str(
+        &v.lock()
+            .unwrap()
+            .as_ref()
+            .map(|e| format!("{e}"))
+            .unwrap_or("".to_string()),
+    )
 }

 /// In-memory state for a particular tenant shard.
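For context on the serializer change above, here is a small standalone sketch of how a `serialize_with` helper of this shape behaves. It assumes serde (with the `derive` feature) and serde_json are available, and the `Shard` struct is purely a hypothetical stand-in for demonstration: the `Mutex<Option<T>>` field is rendered as a plain string, empty when no error has been recorded.

```rust
use std::sync::Mutex;

// Same shape of helper as in the diff: lock the mutex, format the error if present,
// and serialize the result as a string.
fn read_last_error<S, T>(v: &Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
    T: std::fmt::Display,
{
    serializer.collect_str(
        &v.lock()
            .unwrap()
            .as_ref()
            .map(|e| format!("{e}"))
            .unwrap_or_default(),
    )
}

// Hypothetical struct just to show the attribute in use; not the controller's type.
#[derive(serde::Serialize)]
struct Shard {
    #[serde(serialize_with = "read_last_error")]
    last_error: Mutex<Option<String>>,
}

fn main() {
    let shard = Shard {
        last_error: Mutex::new(Some("reconcile failed".to_string())),
    };
    // Prints: {"last_error":"reconcile failed"}
    println!("{}", serde_json::to_string(&shard).unwrap());
}
```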
@@ -111,11 +117,15 @@ pub(crate) struct TenantShard {
     #[serde(skip)]
     pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

-    /// The most recent error from a reconcile on this tenant
+    /// The most recent error from a reconcile on this tenant. This is a nested Arc
+    /// because:
+    ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
+    ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
+    ///    many waiters for one shard, and the underlying error types are not Clone.
     /// TODO: generalize to an array of recent events
     /// TOOD: use a ArcSwap instead of mutex for faster reads?
-    #[serde(serialize_with = "read_mutex_content")]
-    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
+    #[serde(serialize_with = "read_last_error")]
+    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,

     /// If we have a pending compute notification that for some reason we weren't able to send,
     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
@@ -293,18 +303,18 @@ pub(crate) struct ReconcilerWaiter {

     seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
     error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
-    error: std::sync::Arc<std::sync::Mutex<String>>,
+    error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
     seq: Sequence,
 }

 #[derive(thiserror::Error, Debug)]
-pub enum ReconcileWaitError {
+pub(crate) enum ReconcileWaitError {
     #[error("Timeout waiting for shard {0}")]
     Timeout(TenantShardId),
     #[error("shutting down")]
     Shutdown,
     #[error("Reconcile error on shard {0}: {1}")]
-    Failed(TenantShardId, String),
+    Failed(TenantShardId, Arc<ReconcileError>),
 }

 #[derive(Eq, PartialEq, Debug)]
@@ -342,7 +352,8 @@ impl ReconcilerWaiter {
                 SeqWaitError::Timeout => unreachable!()
             })?;

-            return Err(ReconcileWaitError::Failed(self.tenant_shard_id, self.error.lock().unwrap().clone()))
+            return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
+                self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
         }
     }

@@ -873,7 +884,7 @@ impl TenantShard {
             active_nodes_dirty || dirty_observed || self.pending_compute_notification;

         if !do_reconcile {
-            tracing::info!("Not dirty, no reconciliation needed.");
+            tracing::debug!("Not dirty, no reconciliation needed.");
             return ReconcileNeeded::No;
         }

@@ -1151,6 +1162,13 @@ impl TenantShard {
         &self.scheduling_policy
     }

+    pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
+        // Ordering: always set last_error before advancing sequence, so that sequence
+        // waiters are guaranteed to see a Some value when they see an error.
+        *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
+        self.error_waiter.advance(sequence);
+    }
+
     pub(crate) fn from_persistent(
         tsp: TenantShardPersistence,
         intent: IntentState,