diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cf2276f65e..714410baf4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -907,9 +907,7 @@ impl PageServerHandler { assert_eq!(accum_pages.len(), max_batch_size.get()); return false; } - if (accum_shard.tenant_shard_id, accum_shard.timeline_id) - != (this_shard.tenant_shard_id, this_shard.timeline_id) - { + if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) { trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); // TODO: we _could_ batch & execute each shard seperately (and in parallel). // But the current logic for keeping responses in order does not support that. diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 983eae89ca..2d48890aa9 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -113,8 +113,8 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::Weak; -use futures::lock::OwnedMutexGuard; use pageserver_api::shard::ShardIdentity; +use tokio::sync::OwnedMutexGuard; use tracing::instrument; use tracing::trace; use utils::id::TimelineId; @@ -172,11 +172,10 @@ pub(crate) struct ShardTimelineId { } /// See module-level comment. -pub(crate) struct Handle(tokio::sync::OwnedMutexGuard>); -pub(crate) struct WeakHandle(Arc>>); -enum HandleInner { +pub(crate) struct Handle(T::Timeline, tokio::sync::OwnedMutexGuard); +pub(crate) struct WeakHandle(T::Timeline, Arc>); +enum HandleInner { KeepingTimelineGateOpen { - timeline: T::Timeline, gate_guard: utils::sync::gate::GateGuard, }, ShutDown, @@ -187,7 +186,7 @@ enum HandleInner { /// See module-level comment for details. pub struct PerTimelineState { // None = shutting down - handles: Mutex>>>>>, + handles: Mutex>)>>>, } impl Default for PerTimelineState { @@ -247,24 +246,8 @@ impl Cache { shard_selector: ShardSelector, tenant_manager: &T::TenantManager, ) -> Result, GetError> { - // terminates because each iteration removes an element from the map - loop { - let handle = self - .get_impl(timeline_id, shard_selector, tenant_manager) - .await?; - if handle.0.check_shut_down().is_none() { - let removed = self - .map - .remove(&handle.0.timeline.shard_timeline_id()) - .expect("invariant of get_impl is that the returned handle is in the map"); - assert!( - Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)), - "shard_timeline_id() incorrect?" - ); - } else { - return Ok(handle); - } - } + self.get_impl(timeline_id, shard_selector, tenant_manager) + .await } #[instrument(level = "trace", skip_all)] @@ -275,13 +258,14 @@ impl Cache { tenant_manager: &T::TenantManager, ) -> Result, GetError> { let miss: ShardSelector = { - let routing_state = self.shard_routing(timeline_id, shard_selector); + let routing_state = self.shard_routing(timeline_id, shard_selector).await; match routing_state { RoutingResult::FastPath(handle) => return Ok(handle), RoutingResult::SlowPath(key) => match self.map.get(&key) { - Some(cached) => match cached.upgrade() { - Some(upgraded) => return Ok(Handle(upgraded)), - None => { + Some(cached) => match cached.upgrade().await { + Ok(upgraded) => return Ok(upgraded), + Err(HandleUpgradeError::ShutDown) => { + // TODO: dedup with shard_routing() trace!("handle cache stale"); self.map.remove(&key).unwrap(); ShardSelector::Known(key.shard_index) @@ -296,7 +280,7 @@ impl Cache { } #[inline(always)] - fn shard_routing( + async fn shard_routing( &mut self, timeline_id: TimelineId, shard_selector: ShardSelector, @@ -306,15 +290,15 @@ impl Cache { let Some((first_key, first_handle)) = self.map.iter().next() else { return RoutingResult::NeedConsultTenantManager; }; - let Some(first_handle) = first_handle.upgrade() else { - // TODO: dedup with get() + let Ok(first_handle) = first_handle.upgrade().await else { + // TODO: dedup with get_impl() trace!("handle cache stale"); let first_key_owned = *first_key; self.map.remove(&first_key_owned).unwrap(); continue; }; - let first_handle_shard_identity = first_handle.timeline.get_shard_identity(); + let first_handle_shard_identity = first_handle.get_shard_identity(); let make_shard_index = |shard_num: ShardNumber| ShardIndex { shard_number: shard_num, shard_count: first_handle_shard_identity.count, @@ -333,11 +317,11 @@ impl Cache { }; let first_handle_shard_timeline_id = ShardTimelineId { shard_index: first_handle_shard_identity.shard_index(), - timeline_id: first_handle.timeline.shard_timeline_id().timeline_id, + timeline_id: first_handle.shard_timeline_id().timeline_id, }; if need_shard_timeline_id == first_handle_shard_timeline_id { - return RoutingResult::FastPath(Handle(first_handle)); + return RoutingResult::FastPath(first_handle); } else { return RoutingResult::SlowPath(need_shard_timeline_id); } @@ -368,15 +352,13 @@ impl Cache { } }; trace!("creating new HandleInner"); - let handle = Arc::new( + let handle_inner_arc = Arc::new(tokio::sync::Mutex::new( // TODO: global metric that keeps track of the number of live HandlerTimeline instances // so we can identify reference cycle bugs. - HandleInner::KeepingTimelineGateOpen { - timeline: timeline.clone(), - gate_guard, - }, - ); - let handle = { + HandleInner::KeepingTimelineGateOpen { gate_guard }, + )); + let handle_locked = handle_inner_arc.clone().try_lock_owned().unwrap(); + { let mut lock_guard = timeline .per_timeline_state() .handles @@ -384,7 +366,8 @@ impl Cache { .expect("mutex poisoned"); match &mut *lock_guard { Some(per_timeline_state) => { - let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle)); + let replaced = per_timeline_state + .insert(self.id, (timeline.clone(), Arc::clone(&handle_inner_arc))); assert!(replaced.is_none(), "some earlier code left a stale handle"); match self.map.entry(key) { hash_map::Entry::Occupied(_o) => { @@ -395,8 +378,10 @@ impl Cache { unreachable!() } hash_map::Entry::Vacant(v) => { - v.insert(Arc::downgrade(&handle)); - handle + v.insert(WeakHandle( + timeline.clone(), + Arc::clone(&handle_inner_arc), + )); } } } @@ -404,8 +389,8 @@ impl Cache { return Err(GetError::PerTimelineStateShutDown); } } - }; - Ok(Handle(handle)) + } + Ok(Handle(timeline.clone(), handle_locked)) } Err(e) => Err(GetError::TenantManager(e)), } @@ -418,31 +403,41 @@ pub(crate) enum HandleUpgradeError { impl WeakHandle { pub(crate) async fn upgrade(&self) -> Result, HandleUpgradeError> { - let lock_guard = self.0.lock_owned().await; + let lock_guard = self.1.clone().lock_owned().await; match &*lock_guard { - HandleInner::KeepingTimelineGateOpen { - timeline, - gate_guard, - } => Ok(Handle(lock_guard)), + HandleInner::KeepingTimelineGateOpen { gate_guard } => { + Ok(Handle(self.0.clone(), lock_guard)) + } HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown), } } + pub(crate) fn timeline(&self) -> &T::Timeline { + &self.0 + } } impl Deref for Handle { type Target = T::Timeline; fn deref(&self) -> &Self::Target { - match &*self.0 { - HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline, - HandleInner::ShutDown => unreachable!(), - } + &self.0 } } impl Handle { pub(crate) fn downgrade(&self) -> WeakHandle { - WeakHandle(Arc::clone(&self.0.mutex())) + WeakHandle(self.0.clone(), Arc::clone(OwnedMutexGuard::mutex(&self.1))) + } +} + +impl HandleInner { + pub(crate) fn shutdown(&mut self) { + match self { + HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown, + HandleInner::ShutDown => { + unreachable!("handles are only shut down once in their lifetime"); + } + } } } @@ -465,40 +460,15 @@ impl PerTimelineState { trace!("already shut down"); return; }; - for handle in handles.values() { + for (_, state) in handles.values().map(|h| &*h) { // Make further cache hits - let mut lock_guard = handle.lock().await; - match std::mem::replace(&mut *lock_guard, HandleInner::ShutDown) { - HandleInner::KeepingTimelineGateOpen { gate_guard, .. } => { - drop(gate_guard); - } - HandleInner::ShutDown => unreachable!(), - } + let mut lock_guard = state.lock().await; + lock_guard.shutdown(); } drop(handles); } } -// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle. -impl Drop for Cache { - fn drop(&mut self) { - for (_, weak) in self.map.drain() { - if let Some(strong) = weak.upgrade() { - // handle is still being kept alive in PerTimelineState - let timeline = strong.timeline.per_timeline_state(); - let mut handles = timeline.handles.lock().expect("mutex poisoned"); - if let Some(handles) = &mut *handles { - let Some(removed) = handles.remove(&self.id) else { - // There could have been a shutdown inbetween us upgrading the weak and locking the mutex. - continue; - }; - assert!(Arc::ptr_eq(&removed, &strong)); - } - } - } - } -} - #[cfg(test)] mod tests { use pageserver_api::{