From 3d5fab127ad4bd64034d7f9f8a5e94a30818013d Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 1 Feb 2024 00:15:58 +0200 Subject: [PATCH] rewrite Gate impl for better observability (#6542) changes: - two messages instead of message every second when gate was closing - replace the gate name string by using a pointer - slow GateGuards are likely to log who they were (see example) example found in regress tests: --- libs/utils/src/sync/gate.rs | 227 ++++++++++++++++++---------- pageserver/src/tenant.rs | 9 +- pageserver/src/tenant/mgr.rs | 1 + pageserver/src/tenant/secondary.rs | 2 +- pageserver/src/tenant/timeline.rs | 5 +- test_runner/regress/test_tenants.py | 5 - 6 files changed, 162 insertions(+), 87 deletions(-) diff --git a/libs/utils/src/sync/gate.rs b/libs/utils/src/sync/gate.rs index abc3842da8..c34176af57 100644 --- a/libs/utils/src/sync/gate.rs +++ b/libs/utils/src/sync/gate.rs @@ -1,4 +1,10 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; /// Gates are a concurrency helper, primarily used for implementing safe shutdown. /// @@ -6,62 +12,70 @@ use std::{sync::Arc, time::Duration}; /// the resource calls `close()` when they want to ensure that all holders of guards /// have released them, and that no future guards will be issued. pub struct Gate { - /// Each caller of enter() takes one unit from the semaphore. In close(), we - /// take all the units to ensure all GateGuards are destroyed. - sem: Arc, - - /// For observability only: a name that will be used to log warnings if a particular - /// gate is holding up shutdown - name: String, + inner: Arc, } impl std::fmt::Debug for Gate { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Gate<{}>", self.name) + f.debug_struct("Gate") + // use this for identification + .field("ptr", &Arc::as_ptr(&self.inner)) + .field("inner", &self.inner) + .finish() + } +} + +struct GateInner { + sem: tokio::sync::Semaphore, + closing: std::sync::atomic::AtomicBool, +} + +impl std::fmt::Debug for GateInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let avail = self.sem.available_permits(); + + let guards = u32::try_from(avail) + .ok() + // the sem only supports 32-bit ish amount, but lets play it safe + .and_then(|x| Gate::MAX_UNITS.checked_sub(x)); + + let closing = self.closing.load(Ordering::Relaxed); + + if let Some(guards) = guards { + f.debug_struct("Gate") + .field("remaining_guards", &guards) + .field("closing", &closing) + .finish() + } else { + f.debug_struct("Gate") + .field("avail_permits", &avail) + .field("closing", &closing) + .finish() + } } } /// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will /// not complete. #[derive(Debug)] -pub struct GateGuard(tokio::sync::OwnedSemaphorePermit); +pub struct GateGuard { + // Record the span where the gate was entered, so that we can identify who was blocking Gate::close + span_at_enter: tracing::Span, + gate: Arc, +} -/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate -async fn warn_if_stuck( - fut: Fut, - name: &str, - warn_period: std::time::Duration, -) -> ::Output { - let started = std::time::Instant::now(); - - let mut fut = std::pin::pin!(fut); - - let mut warned = false; - let ret = loop { - match tokio::time::timeout(warn_period, &mut fut).await { - Ok(ret) => break ret, - Err(_) => { - tracing::warn!( - gate = name, - elapsed_ms = started.elapsed().as_millis(), - "still waiting, taking longer than expected..." - ); - warned = true; - } +impl Drop for GateGuard { + fn drop(&mut self) { + if self.gate.closing.load(Ordering::Relaxed) { + self.span_at_enter.in_scope( + || tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"), + ); } - }; - // If we emitted a warning for slowness, also emit a message when we complete, so that - // someone debugging a shutdown can know for sure whether we have moved past this operation. - if warned { - tracing::info!( - gate = name, - elapsed_ms = started.elapsed().as_millis(), - "completed, after taking longer than expected" - ) + // when the permit was acquired, it was forgotten to allow us to manage it's lifecycle + // manually, so "return" the permit now. + self.gate.sem.add_permits(1); } - - ret } #[derive(Debug)] @@ -69,15 +83,19 @@ pub enum GateError { GateClosed, } -impl Gate { - const MAX_UNITS: u32 = u32::MAX; - - pub fn new(name: String) -> Self { +impl Default for Gate { + fn default() -> Self { Self { - sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)), - name, + inner: Arc::new(GateInner { + sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize), + closing: AtomicBool::new(false), + }), } } +} + +impl Gate { + const MAX_UNITS: u32 = u32::MAX; /// Acquire a guard that will prevent close() calls from completing. If close() /// was already called, this will return an error which should be interpreted @@ -88,11 +106,23 @@ impl Gate { /// to avoid blocking close() indefinitely: typically types that contain a Gate will /// also contain a CancellationToken. pub fn enter(&self) -> Result { - self.sem - .clone() - .try_acquire_owned() - .map(GateGuard) - .map_err(|_| GateError::GateClosed) + let permit = self + .inner + .sem + .try_acquire() + .map_err(|_| GateError::GateClosed)?; + + // we now have the permit, let's disable the normal raii functionality and leave + // "returning" the permit to our GateGuard::drop. + // + // this is done to avoid the need for multiple Arcs (one for semaphore, next for other + // fields). + permit.forget(); + + Ok(GateGuard { + span_at_enter: tracing::Span::current(), + gate: self.inner.clone(), + }) } /// Types with a shutdown() method and a gate should call this method at the @@ -102,48 +132,88 @@ impl Gate { /// important that the holders of such guards are respecting a CancellationToken which has /// been cancelled before entering this function. pub async fn close(&self) { - warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await + let started_at = std::time::Instant::now(); + let mut do_close = std::pin::pin!(self.do_close()); + + let nag_after = Duration::from_secs(1); + + let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else { + return; + }; + + tracing::info!( + gate = ?self.as_ptr(), + elapsed_ms = started_at.elapsed().as_millis(), + "closing is taking longer than expected" + ); + + // close operation is not trying to be cancellation safe as pageserver does not need it. + // + // note: "closing" is not checked in Gate::enter -- it exists just for observability, + // dropping of GateGuard after this will log who they were. + self.inner.closing.store(true, Ordering::Relaxed); + + do_close.await; + + tracing::info!( + gate = ?self.as_ptr(), + elapsed_ms = started_at.elapsed().as_millis(), + "close completed" + ); + } + + /// Used as an identity of a gate. This identity will be resolved to something useful when + /// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even + /// more. + /// + /// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate + /// open for too long. + fn as_ptr(&self) -> *const GateInner { + Arc::as_ptr(&self.inner) } /// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This /// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking /// the CancellationToken on such types is analogous to "Did shutdown start?" pub fn close_complete(&self) -> bool { - self.sem.is_closed() + self.inner.sem.is_closed() } + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))] async fn do_close(&self) { - tracing::debug!(gate = self.name, "Closing Gate..."); - match self.sem.acquire_many(Self::MAX_UNITS).await { - Ok(_units) => { + tracing::debug!("Closing Gate..."); + + match self.inner.sem.acquire_many(Self::MAX_UNITS).await { + Ok(_permit) => { // While holding all units, close the semaphore. All subsequent calls to enter() will fail. - self.sem.close(); + self.inner.sem.close(); } - Err(_) => { + Err(_closed) => { // Semaphore closed: we are the only function that can do this, so it indicates a double-call. // This is legal. Timeline::shutdown for example is not protected from being called more than // once. - tracing::debug!(gate = self.name, "Double close") + tracing::debug!("Double close") } } - tracing::debug!(gate = self.name, "Closed Gate.") + tracing::debug!("Closed Gate.") } } #[cfg(test)] mod tests { - use futures::FutureExt; - use super::*; #[tokio::test] - async fn test_idle_gate() { - // Having taken no gates, we should not be blocked in close - let gate = Gate::new("test".to_string()); + async fn close_unused() { + // Having taken no guards, we should not be blocked in close + let gate = Gate::default(); gate.close().await; + } + #[tokio::test] + async fn close_idle() { // If a guard is dropped before entering, close should not be blocked - let gate = Gate::new("test".to_string()); + let gate = Gate::default(); let guard = gate.enter().unwrap(); drop(guard); gate.close().await; @@ -152,25 +222,30 @@ mod tests { gate.enter().expect_err("enter should fail after close"); } - #[tokio::test] - async fn test_busy_gate() { - let gate = Gate::new("test".to_string()); + #[tokio::test(start_paused = true)] + async fn close_busy_gate() { + let gate = Gate::default(); + let forever = Duration::from_secs(24 * 7 * 365); - let guard = gate.enter().unwrap(); + let guard = + tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap()); let mut close_fut = std::pin::pin!(gate.close()); - // Close should be blocked - assert!(close_fut.as_mut().now_or_never().is_none()); + // Close should be waiting for guards to drop + tokio::time::timeout(forever, &mut close_fut) + .await + .unwrap_err(); // Attempting to enter() should fail, even though close isn't done yet. gate.enter() .expect_err("enter should fail after entering close"); + // this will now log, which we cannot verify except manually drop(guard); // Guard is gone, close should finish - assert!(close_fut.as_mut().now_or_never().is_some()); + close_fut.await; // Attempting to enter() is still forbidden gate.enter().expect_err("enter should fail finishing close"); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0543de931f..ebf6eb56b1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2094,7 +2094,10 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); timelines.values().for_each(|timeline| { let timeline = Arc::clone(timeline); - let span = Span::current(); + let timeline_id = timeline.timeline_id; + + let span = + tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush); js.spawn(async move { if freeze_and_flush { timeline.flush_and_shutdown().instrument(span).await @@ -2694,7 +2697,7 @@ impl Tenant { activate_now_sem: tokio::sync::Semaphore::new(0), delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())), cancel: CancellationToken::default(), - gate: Gate::new(format!("Tenant<{tenant_shard_id}>")), + gate: Gate::default(), } } @@ -5227,7 +5230,7 @@ mod tests { let raw_tline = tline.raw_timeline().unwrap(); raw_tline .shutdown() - .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id)) + .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, timeline_id=%TIMELINE_ID)) .await; std::mem::forget(tline); } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 32535e0134..949db3c543 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1311,6 +1311,7 @@ impl TenantManager { tenant_shard_id: TenantShardId, activation_timeout: Duration, ) -> Result<(), DeleteTenantError> { + super::span::debug_assert_current_span_has_tenant_id(); // We acquire a SlotGuard during this function to protect against concurrent // changes while the ::prepare phase of DeleteTenantFlow executes, but then // have to return the Tenant to the map while the background deletion runs. diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index d00d901be6..4269e1dec1 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -112,7 +112,7 @@ impl SecondaryTenant { // on shutdown we walk the tenants and fire their // individual cancellations? cancel: CancellationToken::new(), - gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")), + gate: Gate::default(), shard_identity, tenant_conf: std::sync::Mutex::new(tenant_conf), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 874603b81b..db739f1033 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1060,7 +1060,6 @@ impl Timeline { /// also to remote storage. This method can easily take multiple seconds for a busy timeline. /// /// While we are flushing, we continue to accept read I/O. - #[instrument(skip_all, fields(timeline_id=%self.timeline_id))] pub(crate) async fn flush_and_shutdown(&self) { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -1109,6 +1108,8 @@ impl Timeline { /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of /// the graceful [`Timeline::flush_and_shutdown`] function. pub(crate) async fn shutdown(&self) { + span::debug_assert_current_span_has_tenant_and_timeline_id(); + // Signal any subscribers to our cancellation token to drop out tracing::debug!("Cancelling CancellationToken"); self.cancel.cancel(); @@ -1502,7 +1503,7 @@ impl Timeline { delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())), cancel, - gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")), + gate: Gate::default(), compaction_lock: tokio::sync::Mutex::default(), gc_lock: tokio::sync::Mutex::default(), diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 5164bda470..ba391a69d8 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -376,11 +376,6 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder): # so we allow it to log at WARN, even if it is occasionally a false positive. env.pageserver.allowed_errors.append(".*failed to freeze and flush.*") - # When we shut down a tenant during a timeline creation, initdb is not cancelled, we wait - # for it to complete (since https://github.com/neondatabase/neon/pull/6451). This means - # that shutdown can be delayed by >=1s on debug builds where initdb takes a long time to run. - env.pageserver.allowed_errors.append(".*still waiting, taking longer than expected... gate.*") - def create_bg(delay_ms): time.sleep(delay_ms / 1000.0) try: