From 7fb4595c7efd3d1ec1c74fc74943537f1b99cdf2 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 15 Jan 2025 20:44:20 +0100
Subject: [PATCH] fix: WeakHandle was holding on to the Timeline allocation

This made test_timeline_deletion_with_files_stuck_in_upload_queue fail
because the RemoteTimelineClient was being kept alive.

The fix is to stop keeping the Timeline alive from the WeakHandle.
---
 pageserver/src/page_service.rs              |   6 +-
 .../src/tenant/remote_timeline_client.rs    |   6 +
 pageserver/src/tenant/timeline.rs           |   7 +-
 pageserver/src/tenant/timeline/handle.rs    | 143 ++++++++++--------
 4 files changed, 93 insertions(+), 69 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 685eea8541..623972140a 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -403,7 +403,7 @@ impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
         Timeline::shard_timeline_id(self)
     }
 
-    fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState {
+    fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
         &self.handles
     }
 
@@ -941,7 +941,7 @@ impl PageServerHandler {
                     assert_eq!(accum_pages.len(), max_batch_size.get());
                     return false;
                 }
-                if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
+                if !accum_shard.is_same_handle_as(&this_shard) {
                     trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
                     // But the current logic for keeping responses in order does not support that.
@@ -979,7 +979,7 @@ impl PageServerHandler {
                     assert_eq!(accum_requests.len(), max_batch_size.get());
                     return false;
                 }
-                if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
+                if !accum_shard.is_same_handle_as(&this_shard) {
                     trace!("stopping batching because timeline object mismatch");
                     // TODO: we _could_ batch & execute each shard separately (and in parallel).
                     // But the current logic for keeping responses in order does not support that.
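The batching check above no longer needs a strong `Arc<Timeline>`: handle identity is compared on the shared `HandleInner` allocation via `Weak::ptr_eq`. A minimal, self-contained sketch of that property; the `&str` payload here is only an illustration, not the pageserver's `HandleInner`:

```rust
use std::sync::{Arc, Mutex, Weak};

fn main() {
    // Stand-in for the shared per-handle state; the pageserver's real type is
    // `HandleInner`, this is only an illustration.
    let inner: Arc<Mutex<&str>> = Arc::new(Mutex::new("timeline A"));
    let w1: Weak<Mutex<&str>> = Arc::downgrade(&inner);
    let w2: Weak<Mutex<&str>> = Arc::downgrade(&inner);
    let other = Arc::new(Mutex::new("timeline B"));
    let w3 = Arc::downgrade(&other);

    // Identity can be compared on the weak refs alone: no upgrade(), no new
    // strong reference, nothing is kept alive just to run the comparison.
    assert!(Weak::ptr_eq(&w1, &w2));
    assert!(!Weak::ptr_eq(&w1, &w3));

    // Even after the last strong ref is gone the comparison still works,
    // whereas an upgrade() would now fail.
    drop(inner);
    assert!(Weak::ptr_eq(&w1, &w2));
    assert!(w1.upgrade().is_none());
}
```

Because the comparison never upgrades, the batching path cannot keep a `Timeline` alive on its own, which is the property the commit message relies on.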
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 47c4a8637d..a006647785 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -382,6 +382,12 @@ pub(crate) struct RemoteTimelineClient {
     cancel: CancellationToken,
 }
 
+impl Drop for RemoteTimelineClient {
+    fn drop(&mut self) {
+        debug!("dropping RemoteTimelineClient");
+    }
+}
+
 impl RemoteTimelineClient {
     ///
     /// Create a remote storage client for given timeline
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index faece1416d..41e48726cf 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -72,6 +72,7 @@ use std::{pin::pin, sync::OnceLock};
 
 use crate::{
     aux_file::AuxFileSizeEstimator,
+    page_service::TenantManagerTypes,
     tenant::{
         config::AttachmentMode,
         layer_map::{LayerMap, SearchResult},
@@ -427,7 +428,7 @@ pub struct Timeline {
 
     pub(crate) l0_flush_global_state: L0FlushGlobalState,
 
-    pub(crate) handles: handle::PerTimelineState,
+    pub(crate) handles: handle::PerTimelineState<TenantManagerTypes>,
 
     pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
 
@@ -4618,6 +4619,10 @@ impl Drop for Timeline {
                 }
             }
         }
+        info!(
+            "Timeline {} for tenant {} is being dropped",
+            self.timeline_id, self.tenant_shard_id.tenant_id
+        );
     }
 }
diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs
index 8f6bf69cfb..09e8582e64 100644
--- a/pageserver/src/tenant/timeline/handle.rs
+++ b/pageserver/src/tenant/timeline/handle.rs
@@ -104,11 +104,8 @@
 //! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc`.
 //! Again, this is cheap because the `Arc` is private to the connection.
 //!
-//! In addition to the GateGuard, we need to provide `Deref` impls
-//! for `Handle` or `WeakHandle`.
-//! For this, both `Handle` and `WeakHandle` need access to the `Arc`.
-//! Upgrading a `WeakHandle` does not consume the `Handle`, so we can't move the
-//! `Arc` out of `Handle` into `WeakHandle`.
+//! In addition to the GateGuard, we need to provide a `Deref` impl for `Handle`.
+//! For this, `Handle` needs infallible access to an `Arc<Timeline>`.
 //! We could clone the `Arc` when upgrading a `WeakHandle`, but that would cause contention
 //! on the shared memory location that tracks the refcount of the `Arc`.
 //! Instead, we wrap the `Arc` into another `Arc`.
@@ -119,7 +116,7 @@
 //! The attentive reader may have noticed the following reference cycle around the `Arc`:
 //!
 //! ```text
-//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> WeakHandle --strong--> Timeline
+//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
 //! ```
 //!
 //! Further, there is this cycle:
@@ -155,15 +152,25 @@
 //! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
 //!
 //! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
-//! while they will succeed in upgradingtheir weak arc ref to a strong ref,
+//! while they will succeed in upgrading the `Weak<Mutex<HandleInner>>`,
 //! they will find the inner in the `HandleInner::ShutDown` state where the
-//! `Arc` has already been dropped.
+//! `Arc` and `Timeline` have already been dropped.
 //!
 //! Dropping the `Cache` undoes the registration of this `Cache`'s
 //! `HandleInner`s from all the `PerTimelineState`s, i.e., it
 //! removes the strong ref to each of its `HandleInner`s
 //! from all the `PerTimelineState`.
 //!
+//! # Locking Rules
+//!
+//! To prevent deadlocks, we:
+//!
+//! 1. Only ever hold one of the locks at a time.
+//! 2. Don't add more than one `Drop` impl that acquires the locks on the
+//!    cycles above.
+//!
+//! As per (2), that single impl is `Drop for Cache`.
+//!
 //! # Fast Path for Shard Routing
 //!
 //! The `Cache` has a fast path for shard routing to avoid calling into
@@ -262,16 +269,16 @@ pub(crate) struct Handle<T: Types> {
     timeline: Arc<T::Timeline>,
     #[allow(dead_code)] // the field exists to keep the gate open
     gate_guard: Arc<utils::sync::gate::GateGuard>,
-    inner: Arc<Mutex<HandleInner>>,
+    inner: Arc<Mutex<HandleInner<T>>>,
 }
 pub(crate) struct WeakHandle<T: Types> {
-    timeline: Arc<T::Timeline>,
-    inner: Weak<Mutex<HandleInner>>,
+    inner: Weak<Mutex<HandleInner<T>>>,
 }
-enum HandleInner {
+enum HandleInner<T: Types> {
     KeepingTimelineGateOpen {
         #[allow(dead_code)]
         gate_guard: Arc<utils::sync::gate::GateGuard>,
+        timeline: Arc<T::Timeline>,
     },
     ShutDown,
 }
@@ -279,12 +286,13 @@
 /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
 ///
 /// See module-level comment for details.
-pub struct PerTimelineState {
+pub struct PerTimelineState<T: Types> {
     // None = shutting down
-    handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner>>>>>,
+    #[allow(clippy::type_complexity)]
+    handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
 }
 
-impl Default for PerTimelineState {
+impl<T: Types> Default for PerTimelineState<T> {
     fn default() -> Self {
         Self {
             handles: Mutex::new(Some(Default::default())),
@@ -308,7 +316,7 @@ pub(crate) trait ArcTimeline<T: Types>: Clone {
     fn gate(&self) -> &utils::sync::gate::Gate;
     fn shard_timeline_id(&self) -> ShardTimelineId;
     fn get_shard_identity(&self) -> &ShardIdentity;
-    fn per_timeline_state(&self) -> &PerTimelineState;
+    fn per_timeline_state(&self) -> &PerTimelineState<T>;
 }
 
 /// Errors returned by [`Cache::get`].
@@ -441,22 +449,30 @@ impl<T: Types> Cache<T> {
                     ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
                 }
 
-                let gate_guard = match timeline.gate().enter() {
-                    Ok(guard) => guard,
-                    Err(_) => {
-                        return Err(GetError::TimelineGateClosed);
-                    }
-                };
-                let gate_guard = Arc::new(gate_guard);
                 trace!("creating new HandleInner");
-                let handle_inner_arc = Arc::new(Mutex::new(
-                    // TODO: global metric that keeps track of the number of live HandlerTimeline instances
-                    // so we can identify reference cycle bugs.
-                    HandleInner::KeepingTimelineGateOpen {
-                        gate_guard: Arc::clone(&gate_guard),
-                    },
-                ));
-                let timeline = {
+                let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
+                    gate_guard: Arc::new(
+                        // this enter() is expensive in production code because
+                        // it hits the globally shared `Timeline::gate` refcounts
+                        match timeline.gate().enter() {
+                            Ok(guard) => guard,
+                            Err(_) => {
+                                return Err(GetError::TimelineGateClosed);
+                            }
+                        },
+                    ),
+                    // this clone is expensive in production code because
+                    // it hits the globally shared `Arc<Timeline>` refcount
+                    timeline: Arc::new(timeline.clone()),
+                }));
+                let handle_weak = WeakHandle {
+                    inner: Arc::downgrade(&handle_inner_arc),
+                };
+                let handle = handle_weak
+                    .upgrade()
+                    .ok()
+                    .expect("we just created it and it's not linked anywhere yet");
+                {
                     let mut lock_guard = timeline
                         .per_timeline_state()
                         .handles
@@ -476,12 +492,7 @@ impl<T: Types> Cache<T> {
                             unreachable!()
                         }
                         hash_map::Entry::Vacant(v) => {
-                            let timeline = Arc::new(timeline.clone());
-                            v.insert(WeakHandle {
-                                timeline: Arc::clone(&timeline),
-                                inner: Arc::downgrade(&handle_inner_arc),
-                            });
-                            timeline
+                            v.insert(handle_weak);
                         }
                     }
                 }
@@ -489,12 +500,8 @@ impl<T: Types> Cache<T> {
                         return Err(GetError::PerTimelineStateShutDown);
                     }
                 }
-                };
-                Ok(Handle {
-                    timeline,
-                    inner: handle_inner_arc,
-                    gate_guard,
-                })
+                }
+                Ok(handle)
             }
             Err(e) => Err(GetError::TenantManager(e)),
         }
@@ -512,11 +519,15 @@ impl<T: Types> WeakHandle<T> {
         };
         let lock_guard = inner.lock().expect("poisoned");
         match &*lock_guard {
-            HandleInner::KeepingTimelineGateOpen { gate_guard } => {
+            HandleInner::KeepingTimelineGateOpen {
+                timeline,
+                gate_guard,
+            } => {
                 let gate_guard = Arc::clone(gate_guard);
+                let timeline = Arc::clone(timeline);
                 drop(lock_guard);
                 Ok(Handle {
-                    timeline: Arc::clone(&self.timeline),
+                    timeline,
                     gate_guard,
                     inner,
                 })
@@ -524,8 +535,9 @@ impl<T: Types> WeakHandle<T> {
             HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
         }
     }
-    pub(crate) fn timeline(&self) -> &T::Timeline {
-        &self.timeline
+
+    pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
+        Weak::ptr_eq(&self.inner, &other.inner)
     }
 }
 
@@ -539,13 +551,12 @@ impl<T: Types> std::ops::Deref for Handle<T> {
 impl<T: Types> Handle<T> {
     pub(crate) fn downgrade(&self) -> WeakHandle<T> {
         WeakHandle {
-            timeline: Arc::clone(&self.timeline),
             inner: Arc::downgrade(&self.inner),
         }
     }
 }
 
-impl PerTimelineState {
+impl<T: Types> PerTimelineState<T> {
     /// After this method returns, [`Cache::get`] will never again return a [`Handle`]
     /// to the [`Types::Timeline`] that embeds this per-timeline state.
     /// Even if [`TenantManager::resolve`] would still resolve to it.
@@ -581,35 +592,37 @@ impl<T: Types> Drop for Cache<T> {
         for (
             _,
             WeakHandle {
-                timeline: handle_timeline,
                 inner: handle_inner_weak,
             },
         ) in self.map.drain()
        {
-            // handle is still being kept alive in PerTimelineState
+            let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
+                continue;
+            };
+            let handle_timeline = handle_inner_arc
+                // locking rules: drop lock before acquiring other lock below
+                .lock()
+                .expect("poisoned")
+                .shutdown();
             let per_timeline_state = handle_timeline.per_timeline_state();
-            let mut handles = per_timeline_state.handles.lock().expect("mutex poisoned");
-            let Some(handles) = &mut *handles else {
+            let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
+            let Some(handles) = &mut *handles_lock_guard else {
                 continue;
            };
            let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
                // There could have been a shutdown in between us upgrading the weak and locking the mutex.
                continue;
            };
-            assert!(Weak::ptr_eq(
-                &Arc::downgrade(&removed_handle_inner_arc),
-                &handle_inner_weak
-            ));
-            let mut lock_guard = removed_handle_inner_arc.lock().expect("poisoned");
-            lock_guard.shutdown();
+            drop(handles_lock_guard); // locking rules: remember them!
+            assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
        }
    }
 }
 
-impl HandleInner {
-    fn shutdown(&mut self) {
-        match self {
-            HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown,
+impl<T: Types> HandleInner<T> {
+    fn shutdown(&mut self) -> Arc<T::Timeline> {
+        match std::mem::replace(self, HandleInner::ShutDown) {
+            HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
             HandleInner::ShutDown => {
                 unreachable!("handles are only shut down once in their lifetime");
             }
@@ -649,7 +662,7 @@ mod tests {
         gate: utils::sync::gate::Gate,
         id: TimelineId,
         shard: ShardIdentity,
-        per_timeline_state: PerTimelineState,
+        per_timeline_state: PerTimelineState<TestTypes>,
         myself: Weak<StubTimeline>,
     }
 
@@ -675,7 +688,7 @@ mod tests {
             &self.shard
         }
 
-        fn per_timeline_state(&self) -> &PerTimelineState {
+        fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
             &self.per_timeline_state
         }
     }
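The handle.rs hunks above move the strong `Arc<Timeline>` out of `WeakHandle` and into `HandleInner`, so that `shutdown` can return and drop the last handle-held strong reference while cached `WeakHandle`s keep only a weak pointer. A hedged, self-contained sketch of that ownership shape; `Resource`, `Inner`, `Handle`, `WeakHandle`, and `shutdown` here are illustrative stand-ins with simplified signatures, not the pageserver's exact API:

```rust
use std::sync::{Arc, Mutex, Weak};

struct Resource; // plays the role of Timeline in this sketch

enum Inner {
    Open { resource: Arc<Resource> },
    ShutDown,
}

struct Handle {
    resource: Arc<Resource>,
    // Kept to mirror the real Handle, which holds its HandleInner for downgrade().
    #[allow(dead_code)]
    inner: Arc<Mutex<Inner>>,
}

impl std::ops::Deref for Handle {
    type Target = Resource;
    fn deref(&self) -> &Resource {
        &self.resource
    }
}

struct WeakHandle {
    // Only a weak ref: a cached WeakHandle no longer pins the resource.
    inner: Weak<Mutex<Inner>>,
}

impl WeakHandle {
    fn upgrade(&self) -> Option<Handle> {
        let inner = self.inner.upgrade()?;
        let resource = match &*inner.lock().unwrap() {
            Inner::Open { resource } => Arc::clone(resource),
            Inner::ShutDown => return None,
        };
        Some(Handle { resource, inner })
    }
}

fn shutdown(inner: &Mutex<Inner>) -> Option<Arc<Resource>> {
    // Dropping the returned Arc releases the last handle-held strong ref.
    match std::mem::replace(&mut *inner.lock().unwrap(), Inner::ShutDown) {
        Inner::Open { resource } => Some(resource),
        Inner::ShutDown => None,
    }
}

fn main() {
    let resource = Arc::new(Resource);
    let inner = Arc::new(Mutex::new(Inner::Open {
        resource: Arc::clone(&resource),
    }));
    let weak = WeakHandle {
        inner: Arc::downgrade(&inner),
    };

    assert!(weak.upgrade().is_some());

    // After shutdown, upgrading fails and the handle-held strong ref is gone:
    // only the caller's own `resource` Arc keeps it alive now.
    drop(shutdown(&inner));
    assert!(weak.upgrade().is_none());
    assert_eq!(Arc::strong_count(&resource), 1);
}
```

After `shutdown`, the only strong reference left is the caller's own, so dropping it releases the resource; in the patch this is what lets the Timeline, and with it the RemoteTimelineClient, get dropped in test_timeline_deletion_with_files_stuck_in_upload_queue.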