mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
fix: WeakHandle was holding on to the Timeline allocation
This made test_timeline_deletion_with_files_stuck_in_upload_queue fail because the RemoteTimelineClient was being kept alive. The fix is to stop keeping the timeline alive from WeakHandle.
This commit is contained in:
@@ -403,7 +403,7 @@ impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
|
||||
Timeline::shard_timeline_id(self)
|
||||
}
|
||||
|
||||
fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState {
|
||||
fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
|
||||
&self.handles
|
||||
}
|
||||
|
||||
@@ -941,7 +941,7 @@ impl PageServerHandler {
|
||||
assert_eq!(accum_pages.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
@@ -979,7 +979,7 @@ impl PageServerHandler {
|
||||
assert_eq!(accum_requests.len(), max_batch_size.get());
|
||||
return false;
|
||||
}
|
||||
if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
|
||||
if !accum_shard.is_same_handle_as(&this_shard) {
|
||||
trace!("stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
|
||||
@@ -382,6 +382,12 @@ pub(crate) struct RemoteTimelineClient {
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClient {
|
||||
fn drop(&mut self) {
|
||||
debug!("dropping RemoteTimelineClient");
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTimelineClient {
|
||||
///
|
||||
/// Create a remote storage client for given timeline
|
||||
|
||||
@@ -72,6 +72,7 @@ use std::{pin::pin, sync::OnceLock};
|
||||
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
page_service::TenantManagerTypes,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -427,7 +428,7 @@ pub struct Timeline {
|
||||
|
||||
pub(crate) l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) handles: handle::PerTimelineState,
|
||||
pub(crate) handles: handle::PerTimelineState<TenantManagerTypes>,
|
||||
|
||||
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
|
||||
|
||||
@@ -4618,6 +4619,10 @@ impl Drop for Timeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Timeline {} for tenant {} is being dropped",
|
||||
self.timeline_id, self.tenant_shard_id.tenant_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -104,11 +104,8 @@
|
||||
//! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
|
||||
//! Again, this is cheap because the `Arc` is private to the connection.
|
||||
//!
|
||||
//! In addition to the GateGuard, we need to provide `Deref<Target=Timeline>` impls
|
||||
//! for `Handle` or `WeakHandle`.
|
||||
//! For this, both `Handle` and `WeakHandle` need access to the `Arc<Timeline>`.
|
||||
//! Upgrading a `WeakHandle` does not consume the `Handle`, so we can't move the
|
||||
//! `Arc<Timeline>` out of `Handle` into `WeakHandle`.
|
||||
//! In addition to the GateGuard, we need to provide `Deref<Target=Timeline>` impl.
|
||||
//! For this, both `Handle` need infallible access to an `Arc<Timeline>`.
|
||||
//! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
|
||||
//! on the shared memory location that trakcs the refcount of the `Arc<Timeline>`.
|
||||
//! Instead, we wrap the `Arc<Timeline>` into another `Arc`.
|
||||
@@ -119,7 +116,7 @@
|
||||
//! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
|
||||
//!
|
||||
//! ```text
|
||||
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> WeakHandle --strong--> Timeline
|
||||
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
|
||||
//! ```
|
||||
//!
|
||||
//! Further, there is this cycle:
|
||||
@@ -155,15 +152,25 @@
|
||||
//! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
|
||||
//!
|
||||
//! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
|
||||
//! while they will succeed in upgradingtheir weak arc ref to a strong ref,
|
||||
//! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
|
||||
//! they will find the inner in state `HandleInner::ShutDown` state where the
|
||||
//! `Arc<GateGuard>` has already been dropped.
|
||||
//! `Arc<GateGuard>` and Timeline has already been dropped.
|
||||
//!
|
||||
//! Dropping the `Cache` undoes the registration of this `Cache`'s
|
||||
//! `HandleInner`s from all the `PerTimelineState`s, i.e., it
|
||||
//! removes the strong ref to each of its `HandleInner`s
|
||||
//! from all the `PerTimelineState`.
|
||||
//!
|
||||
//! # Locking Rules
|
||||
//!
|
||||
//! To prevent deadlocks we:
|
||||
//!
|
||||
//! 1. Only ever hold one of the locks at a time.
|
||||
//! 2. Don't add more than one Drop impl that locks on the
|
||||
//! cycles above.
|
||||
//!
|
||||
//! As per (2), that impl is in `Drop for Cache`.
|
||||
//!
|
||||
//! # Fast Path for Shard Routing
|
||||
//!
|
||||
//! The `Cache` has a fast path for shard routing to avoid calling into
|
||||
@@ -262,16 +269,16 @@ pub(crate) struct Handle<T: Types> {
|
||||
timeline: Arc<T::Timeline>,
|
||||
#[allow(dead_code)] // the field exists to keep the gate open
|
||||
gate_guard: Arc<utils::sync::gate::GateGuard>,
|
||||
inner: Arc<Mutex<HandleInner>>,
|
||||
inner: Arc<Mutex<HandleInner<T>>>,
|
||||
}
|
||||
pub(crate) struct WeakHandle<T: Types> {
|
||||
timeline: Arc<T::Timeline>,
|
||||
inner: Weak<Mutex<HandleInner>>,
|
||||
inner: Weak<Mutex<HandleInner<T>>>,
|
||||
}
|
||||
enum HandleInner {
|
||||
enum HandleInner<T: Types> {
|
||||
KeepingTimelineGateOpen {
|
||||
#[allow(dead_code)]
|
||||
gate_guard: Arc<utils::sync::gate::GateGuard>,
|
||||
timeline: Arc<T::Timeline>,
|
||||
},
|
||||
ShutDown,
|
||||
}
|
||||
@@ -279,12 +286,13 @@ enum HandleInner {
|
||||
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
|
||||
///
|
||||
/// See module-level comment for details.
|
||||
pub struct PerTimelineState {
|
||||
pub struct PerTimelineState<T: Types> {
|
||||
// None = shutting down
|
||||
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner>>>>>,
|
||||
#[allow(clippy::type_complexity)]
|
||||
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
|
||||
}
|
||||
|
||||
impl Default for PerTimelineState {
|
||||
impl<T: Types> Default for PerTimelineState<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
handles: Mutex::new(Some(Default::default())),
|
||||
@@ -308,7 +316,7 @@ pub(crate) trait ArcTimeline<T: Types>: Clone {
|
||||
fn gate(&self) -> &utils::sync::gate::Gate;
|
||||
fn shard_timeline_id(&self) -> ShardTimelineId;
|
||||
fn get_shard_identity(&self) -> &ShardIdentity;
|
||||
fn per_timeline_state(&self) -> &PerTimelineState;
|
||||
fn per_timeline_state(&self) -> &PerTimelineState<T>;
|
||||
}
|
||||
|
||||
/// Errors returned by [`Cache::get`].
|
||||
@@ -441,22 +449,30 @@ impl<T: Types> Cache<T> {
|
||||
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
|
||||
}
|
||||
|
||||
let gate_guard = match timeline.gate().enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
return Err(GetError::TimelineGateClosed);
|
||||
}
|
||||
};
|
||||
let gate_guard = Arc::new(gate_guard);
|
||||
trace!("creating new HandleInner");
|
||||
let handle_inner_arc = Arc::new(Mutex::new(
|
||||
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
|
||||
// so we can identify reference cycle bugs.
|
||||
HandleInner::KeepingTimelineGateOpen {
|
||||
gate_guard: Arc::clone(&gate_guard),
|
||||
},
|
||||
));
|
||||
let timeline = {
|
||||
let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
|
||||
gate_guard: Arc::new(
|
||||
// this enter() is expensive in production code because
|
||||
// it hits the global Arc<Timeline>::gate refcounts
|
||||
match timeline.gate().enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
return Err(GetError::TimelineGateClosed);
|
||||
}
|
||||
},
|
||||
),
|
||||
// this clone is expensive in production code because
|
||||
// it hits the global Arc<Timeline>::clone refcounts
|
||||
timeline: Arc::new(timeline.clone()),
|
||||
}));
|
||||
let handle_weak = WeakHandle {
|
||||
inner: Arc::downgrade(&handle_inner_arc),
|
||||
};
|
||||
let handle = handle_weak
|
||||
.upgrade()
|
||||
.ok()
|
||||
.expect("we just created it and it's not linked anywhere yet");
|
||||
{
|
||||
let mut lock_guard = timeline
|
||||
.per_timeline_state()
|
||||
.handles
|
||||
@@ -476,12 +492,7 @@ impl<T: Types> Cache<T> {
|
||||
unreachable!()
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let timeline = Arc::new(timeline.clone());
|
||||
v.insert(WeakHandle {
|
||||
timeline: Arc::clone(&timeline),
|
||||
inner: Arc::downgrade(&handle_inner_arc),
|
||||
});
|
||||
timeline
|
||||
v.insert(handle_weak);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -489,12 +500,8 @@ impl<T: Types> Cache<T> {
|
||||
return Err(GetError::PerTimelineStateShutDown);
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(Handle {
|
||||
timeline,
|
||||
inner: handle_inner_arc,
|
||||
gate_guard,
|
||||
})
|
||||
}
|
||||
Ok(handle)
|
||||
}
|
||||
Err(e) => Err(GetError::TenantManager(e)),
|
||||
}
|
||||
@@ -512,11 +519,15 @@ impl<T: Types> WeakHandle<T> {
|
||||
};
|
||||
let lock_guard = inner.lock().expect("poisoned");
|
||||
match &*lock_guard {
|
||||
HandleInner::KeepingTimelineGateOpen { gate_guard } => {
|
||||
HandleInner::KeepingTimelineGateOpen {
|
||||
timeline,
|
||||
gate_guard,
|
||||
} => {
|
||||
let gate_guard = Arc::clone(gate_guard);
|
||||
let timeline = Arc::clone(timeline);
|
||||
drop(lock_guard);
|
||||
Ok(Handle {
|
||||
timeline: Arc::clone(&self.timeline),
|
||||
timeline,
|
||||
gate_guard,
|
||||
inner,
|
||||
})
|
||||
@@ -524,8 +535,9 @@ impl<T: Types> WeakHandle<T> {
|
||||
HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
|
||||
}
|
||||
}
|
||||
pub(crate) fn timeline(&self) -> &T::Timeline {
|
||||
&self.timeline
|
||||
|
||||
pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
|
||||
Weak::ptr_eq(&self.inner, &other.inner)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,13 +551,12 @@ impl<T: Types> std::ops::Deref for Handle<T> {
|
||||
impl<T: Types> Handle<T> {
|
||||
pub(crate) fn downgrade(&self) -> WeakHandle<T> {
|
||||
WeakHandle {
|
||||
timeline: Arc::clone(&self.timeline),
|
||||
inner: Arc::downgrade(&self.inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PerTimelineState {
|
||||
impl<T: Types> PerTimelineState<T> {
|
||||
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
|
||||
/// to the [`Types::Timeline`] that embeds this per-timeline state.
|
||||
/// Even if [`TenantManager::resolve`] would still resolve to it.
|
||||
@@ -581,35 +592,37 @@ impl<T: Types> Drop for Cache<T> {
|
||||
for (
|
||||
_,
|
||||
WeakHandle {
|
||||
timeline: handle_timeline,
|
||||
inner: handle_inner_weak,
|
||||
},
|
||||
) in self.map.drain()
|
||||
{
|
||||
// handle is still being kept alive in PerTimelineState
|
||||
let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
|
||||
continue;
|
||||
};
|
||||
let handle_timeline = handle_inner_arc
|
||||
// locking rules: drop lock before acquiring other lock below
|
||||
.lock()
|
||||
.expect("poisoned")
|
||||
.shutdown();
|
||||
let per_timeline_state = handle_timeline.per_timeline_state();
|
||||
let mut handles = per_timeline_state.handles.lock().expect("mutex poisoned");
|
||||
let Some(handles) = &mut *handles else {
|
||||
let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
|
||||
let Some(handles) = &mut *handles_lock_guard else {
|
||||
continue;
|
||||
};
|
||||
let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
|
||||
// There could have been a shutdown inbetween us upgrading the weak and locking the mutex.
|
||||
continue;
|
||||
};
|
||||
assert!(Weak::ptr_eq(
|
||||
&Arc::downgrade(&removed_handle_inner_arc),
|
||||
&handle_inner_weak
|
||||
));
|
||||
let mut lock_guard = removed_handle_inner_arc.lock().expect("poisoned");
|
||||
lock_guard.shutdown();
|
||||
drop(handles_lock_guard); // locking rules: remember them when!
|
||||
assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc,));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HandleInner {
|
||||
fn shutdown(&mut self) {
|
||||
match self {
|
||||
HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown,
|
||||
impl<T: Types> HandleInner<T> {
|
||||
fn shutdown(&mut self) -> Arc<T::Timeline> {
|
||||
match std::mem::replace(self, HandleInner::ShutDown) {
|
||||
HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
|
||||
HandleInner::ShutDown => {
|
||||
unreachable!("handles are only shut down once in their lifetime");
|
||||
}
|
||||
@@ -649,7 +662,7 @@ mod tests {
|
||||
gate: utils::sync::gate::Gate,
|
||||
id: TimelineId,
|
||||
shard: ShardIdentity,
|
||||
per_timeline_state: PerTimelineState,
|
||||
per_timeline_state: PerTimelineState<TestTypes>,
|
||||
myself: Weak<StubTimeline>,
|
||||
}
|
||||
|
||||
@@ -675,7 +688,7 @@ mod tests {
|
||||
&self.shard
|
||||
}
|
||||
|
||||
fn per_timeline_state(&self) -> &PerTimelineState {
|
||||
fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
|
||||
&self.per_timeline_state
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user