rewrite Gate impl for better observability (#6542)

changes:
- two messages instead of message every second when gate was closing
- replace the gate name string by using a pointer
- slow GateGuards are likely to log who they were (see example)

example found in regress tests: <https://github.com/neondatabase/neon/pull/6542#issuecomment-1919009256>
This commit is contained in:
Joonas Koivunen
2024-02-01 00:15:58 +02:00
committed by GitHub
parent 66719d7eaf
commit 3d5fab127a
6 changed files with 162 additions and 87 deletions

View File

@@ -1,4 +1,10 @@
use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///
@@ -6,62 +12,70 @@ use std::{sync::Arc, time::Duration};
/// the resource calls `close()` when they want to ensure that all holders of guards
/// have released them, and that no future guards will be issued.
pub struct Gate {
/// Each caller of enter() takes one unit from the semaphore. In close(), we
/// take all the units to ensure all GateGuards are destroyed.
sem: Arc<tokio::sync::Semaphore>,
/// For observability only: a name that will be used to log warnings if a particular
/// gate is holding up shutdown
name: String,
inner: Arc<GateInner>,
}
impl std::fmt::Debug for Gate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Gate<{}>", self.name)
f.debug_struct("Gate")
// use this for identification
.field("ptr", &Arc::as_ptr(&self.inner))
.field("inner", &self.inner)
.finish()
}
}
struct GateInner {
sem: tokio::sync::Semaphore,
closing: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for GateInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let avail = self.sem.available_permits();
let guards = u32::try_from(avail)
.ok()
// the sem only supports 32-bit ish amount, but lets play it safe
.and_then(|x| Gate::MAX_UNITS.checked_sub(x));
let closing = self.closing.load(Ordering::Relaxed);
if let Some(guards) = guards {
f.debug_struct("Gate")
.field("remaining_guards", &guards)
.field("closing", &closing)
.finish()
} else {
f.debug_struct("Gate")
.field("avail_permits", &avail)
.field("closing", &closing)
.finish()
}
}
}
/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
/// not complete.
#[derive(Debug)]
pub struct GateGuard(tokio::sync::OwnedSemaphorePermit);
pub struct GateGuard {
// Record the span where the gate was entered, so that we can identify who was blocking Gate::close
span_at_enter: tracing::Span,
gate: Arc<GateInner>,
}
/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate
async fn warn_if_stuck<Fut: std::future::Future>(
fut: Fut,
name: &str,
warn_period: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
let started = std::time::Instant::now();
let mut fut = std::pin::pin!(fut);
let mut warned = false;
let ret = loop {
match tokio::time::timeout(warn_period, &mut fut).await {
Ok(ret) => break ret,
Err(_) => {
tracing::warn!(
gate = name,
elapsed_ms = started.elapsed().as_millis(),
"still waiting, taking longer than expected..."
);
warned = true;
}
impl Drop for GateGuard {
fn drop(&mut self) {
if self.gate.closing.load(Ordering::Relaxed) {
self.span_at_enter.in_scope(
|| tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
);
}
};
// If we emitted a warning for slowness, also emit a message when we complete, so that
// someone debugging a shutdown can know for sure whether we have moved past this operation.
if warned {
tracing::info!(
gate = name,
elapsed_ms = started.elapsed().as_millis(),
"completed, after taking longer than expected"
)
// when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
// manually, so "return" the permit now.
self.gate.sem.add_permits(1);
}
ret
}
#[derive(Debug)]
@@ -69,15 +83,19 @@ pub enum GateError {
GateClosed,
}
impl Gate {
const MAX_UNITS: u32 = u32::MAX;
pub fn new(name: String) -> Self {
impl Default for Gate {
fn default() -> Self {
Self {
sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)),
name,
inner: Arc::new(GateInner {
sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
closing: AtomicBool::new(false),
}),
}
}
}
impl Gate {
const MAX_UNITS: u32 = u32::MAX;
/// Acquire a guard that will prevent close() calls from completing. If close()
/// was already called, this will return an error which should be interpreted
@@ -88,11 +106,23 @@ impl Gate {
/// to avoid blocking close() indefinitely: typically types that contain a Gate will
/// also contain a CancellationToken.
pub fn enter(&self) -> Result<GateGuard, GateError> {
self.sem
.clone()
.try_acquire_owned()
.map(GateGuard)
.map_err(|_| GateError::GateClosed)
let permit = self
.inner
.sem
.try_acquire()
.map_err(|_| GateError::GateClosed)?;
// we now have the permit, let's disable the normal raii functionality and leave
// "returning" the permit to our GateGuard::drop.
//
// this is done to avoid the need for multiple Arcs (one for semaphore, next for other
// fields).
permit.forget();
Ok(GateGuard {
span_at_enter: tracing::Span::current(),
gate: self.inner.clone(),
})
}
/// Types with a shutdown() method and a gate should call this method at the
@@ -102,48 +132,88 @@ impl Gate {
/// important that the holders of such guards are respecting a CancellationToken which has
/// been cancelled before entering this function.
pub async fn close(&self) {
warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await
let started_at = std::time::Instant::now();
let mut do_close = std::pin::pin!(self.do_close());
let nag_after = Duration::from_secs(1);
let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
return;
};
tracing::info!(
gate = ?self.as_ptr(),
elapsed_ms = started_at.elapsed().as_millis(),
"closing is taking longer than expected"
);
// close operation is not trying to be cancellation safe as pageserver does not need it.
//
// note: "closing" is not checked in Gate::enter -- it exists just for observability,
// dropping of GateGuard after this will log who they were.
self.inner.closing.store(true, Ordering::Relaxed);
do_close.await;
tracing::info!(
gate = ?self.as_ptr(),
elapsed_ms = started_at.elapsed().as_millis(),
"close completed"
);
}
/// Used as an identity of a gate. This identity will be resolved to something useful when
/// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
/// more.
///
/// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
/// open for too long.
fn as_ptr(&self) -> *const GateInner {
Arc::as_ptr(&self.inner)
}
/// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
/// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
/// the CancellationToken on such types is analogous to "Did shutdown start?"
pub fn close_complete(&self) -> bool {
self.sem.is_closed()
self.inner.sem.is_closed()
}
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
async fn do_close(&self) {
tracing::debug!(gate = self.name, "Closing Gate...");
match self.sem.acquire_many(Self::MAX_UNITS).await {
Ok(_units) => {
tracing::debug!("Closing Gate...");
match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
Ok(_permit) => {
// While holding all units, close the semaphore. All subsequent calls to enter() will fail.
self.sem.close();
self.inner.sem.close();
}
Err(_) => {
Err(_closed) => {
// Semaphore closed: we are the only function that can do this, so it indicates a double-call.
// This is legal. Timeline::shutdown for example is not protected from being called more than
// once.
tracing::debug!(gate = self.name, "Double close")
tracing::debug!("Double close")
}
}
tracing::debug!(gate = self.name, "Closed Gate.")
tracing::debug!("Closed Gate.")
}
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use super::*;
#[tokio::test]
async fn test_idle_gate() {
// Having taken no gates, we should not be blocked in close
let gate = Gate::new("test".to_string());
async fn close_unused() {
// Having taken no guards, we should not be blocked in close
let gate = Gate::default();
gate.close().await;
}
#[tokio::test]
async fn close_idle() {
// If a guard is dropped before entering, close should not be blocked
let gate = Gate::new("test".to_string());
let gate = Gate::default();
let guard = gate.enter().unwrap();
drop(guard);
gate.close().await;
@@ -152,25 +222,30 @@ mod tests {
gate.enter().expect_err("enter should fail after close");
}
#[tokio::test]
async fn test_busy_gate() {
let gate = Gate::new("test".to_string());
#[tokio::test(start_paused = true)]
async fn close_busy_gate() {
let gate = Gate::default();
let forever = Duration::from_secs(24 * 7 * 365);
let guard = gate.enter().unwrap();
let guard =
tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());
let mut close_fut = std::pin::pin!(gate.close());
// Close should be blocked
assert!(close_fut.as_mut().now_or_never().is_none());
// Close should be waiting for guards to drop
tokio::time::timeout(forever, &mut close_fut)
.await
.unwrap_err();
// Attempting to enter() should fail, even though close isn't done yet.
gate.enter()
.expect_err("enter should fail after entering close");
// this will now log, which we cannot verify except manually
drop(guard);
// Guard is gone, close should finish
assert!(close_fut.as_mut().now_or_never().is_some());
close_fut.await;
// Attempting to enter() is still forbidden
gate.enter().expect_err("enter should fail finishing close");

View File

@@ -2094,7 +2094,10 @@ impl Tenant {
let timelines = self.timelines.lock().unwrap();
timelines.values().for_each(|timeline| {
let timeline = Arc::clone(timeline);
let span = Span::current();
let timeline_id = timeline.timeline_id;
let span =
tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
js.spawn(async move {
if freeze_and_flush {
timeline.flush_and_shutdown().instrument(span).await
@@ -2694,7 +2697,7 @@ impl Tenant {
activate_now_sem: tokio::sync::Semaphore::new(0),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
gate: Gate::default(),
}
}
@@ -5227,7 +5230,7 @@ mod tests {
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id))
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, timeline_id=%TIMELINE_ID))
.await;
std::mem::forget(tline);
}

View File

@@ -1311,6 +1311,7 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
activation_timeout: Duration,
) -> Result<(), DeleteTenantError> {
super::span::debug_assert_current_span_has_tenant_id();
// We acquire a SlotGuard during this function to protect against concurrent
// changes while the ::prepare phase of DeleteTenantFlow executes, but then
// have to return the Tenant to the map while the background deletion runs.

View File

@@ -112,7 +112,7 @@ impl SecondaryTenant {
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
gate: Gate::default(),
shard_identity,
tenant_conf: std::sync::Mutex::new(tenant_conf),

View File

@@ -1060,7 +1060,6 @@ impl Timeline {
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O.
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
pub(crate) async fn flush_and_shutdown(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1109,6 +1108,8 @@ impl Timeline {
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
/// the graceful [`Timeline::flush_and_shutdown`] function.
pub(crate) async fn shutdown(&self) {
span::debug_assert_current_span_has_tenant_and_timeline_id();
// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
@@ -1502,7 +1503,7 @@ impl Timeline {
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
cancel,
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
gate: Gate::default(),
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),

View File

@@ -376,11 +376,6 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
# so we allow it to log at WARN, even if it is occasionally a false positive.
env.pageserver.allowed_errors.append(".*failed to freeze and flush.*")
# When we shut down a tenant during a timeline creation, initdb is not cancelled, we wait
# for it to complete (since https://github.com/neondatabase/neon/pull/6451). This means
# that shutdown can be delayed by >=1s on debug builds where initdb takes a long time to run.
env.pageserver.allowed_errors.append(".*still waiting, taking longer than expected... gate.*")
def create_bg(delay_ms):
time.sleep(delay_ms / 1000.0)
try: