mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
WeakHandle should store weak ref to the GateGuard
This commit is contained in:
@@ -110,6 +110,7 @@ use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::Weak;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
@@ -175,7 +176,7 @@ pub(crate) struct Handle<T: Types> {
|
||||
}
|
||||
pub(crate) struct WeakHandle<T: Types> {
|
||||
timeline: T::Timeline,
|
||||
inner: Arc<tokio::sync::Mutex<HandleInner>>,
|
||||
inner: Weak<tokio::sync::Mutex<HandleInner>>,
|
||||
}
|
||||
enum HandleInner {
|
||||
KeepingTimelineGateOpen {
|
||||
@@ -384,7 +385,7 @@ impl<T: Types> Cache<T> {
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
v.insert(WeakHandle {
|
||||
timeline: timeline.clone(),
|
||||
inner: Arc::clone(&handle_inner_arc),
|
||||
inner: Arc::downgrade(&handle_inner_arc),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -410,7 +411,10 @@ pub(crate) enum HandleUpgradeError {
|
||||
|
||||
impl<T: Types> WeakHandle<T> {
|
||||
pub(crate) async fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
|
||||
let lock_guard = self.inner.clone().lock_owned().await;
|
||||
let Some(inner) = self.inner.upgrade() else {
|
||||
return Err(HandleUpgradeError::ShutDown);
|
||||
};
|
||||
let lock_guard = inner.clone().lock_owned().await;
|
||||
match &*lock_guard {
|
||||
HandleInner::KeepingTimelineGateOpen { .. } => Ok(Handle {
|
||||
timeline: self.timeline.clone(),
|
||||
@@ -435,8 +439,8 @@ impl<T: Types> Deref for Handle<T> {
|
||||
impl<T: Types> Handle<T> {
|
||||
pub(crate) fn downgrade(&self) -> WeakHandle<T> {
|
||||
WeakHandle {
|
||||
timeline: self.timeline.clone(),
|
||||
inner: Arc::clone(OwnedMutexGuard::mutex(&self.inner)),
|
||||
timeline: self.timeline.clone(), // FIXME: avoid this clone, it hits shared cache line
|
||||
inner: Arc::downgrade(OwnedMutexGuard::mutex(&self.inner)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -486,13 +490,13 @@ impl<T: Types> Drop for Cache<T> {
|
||||
for (
|
||||
_,
|
||||
WeakHandle {
|
||||
timeline,
|
||||
inner: handle_inner_arc,
|
||||
timeline: handle_timeline,
|
||||
inner: handle_inner_weak,
|
||||
},
|
||||
) in self.map.drain()
|
||||
{
|
||||
// handle is still being kept alive in PerTimelineState
|
||||
let per_timeline_state = timeline.per_timeline_state();
|
||||
let per_timeline_state = handle_timeline.per_timeline_state();
|
||||
let mut handles = per_timeline_state.handles.lock().expect("mutex poisoned");
|
||||
let Some(handles) = &mut *handles else {
|
||||
continue;
|
||||
@@ -503,7 +507,10 @@ impl<T: Types> Drop for Cache<T> {
|
||||
};
|
||||
let (_removed_timeline, removed_handle_inner_arc) = removed;
|
||||
// TODO (extend ArcTimeline trait): assert!(Arc::ptr_eq(&removed_timeline, &timeline));
|
||||
assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
|
||||
assert!(Weak::ptr_eq(
|
||||
&Arc::downgrade(&removed_handle_inner_arc),
|
||||
&handle_inner_weak
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user