it compiles

Author: Christian Schwarz
Date: 2025-01-14 11:29:30 +01:00
parent 9e03dda0c3
commit 6007a94f91
2 changed files with 55 additions and 87 deletions

pageserver/src/page_service.rs

@@ -907,9 +907,7 @@ impl PageServerHandler {
                 assert_eq!(accum_pages.len(), max_batch_size.get());
                 return false;
             }
-            if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
-                != (this_shard.tenant_shard_id, this_shard.timeline_id)
-            {
+            if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
                 trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
                 // TODO: we _could_ batch & execute each shard separately (and in parallel).
                 // But the current logic for keeping responses in order does not support that.

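The page_service change above replaces an ID comparison with a pointer-identity comparison: two requests stay in the same batch only if they resolved to the very same Timeline object, not merely the same (tenant shard, timeline) IDs. A minimal sketch of the distinction, using a stand-in Timeline type in place of the real pageserver one:

use std::sync::Arc;

struct Timeline {
    timeline_id: u64,
}

fn main() {
    let a = Arc::new(Timeline { timeline_id: 1 });
    let b = Arc::clone(&a); // same object, same allocation
    let c = Arc::new(Timeline { timeline_id: 1 }); // equal id, distinct object

    assert!(Arc::ptr_eq(&a, &b)); // batching continues
    assert!(!Arc::ptr_eq(&a, &c)); // batching stops, even though the ids match
    assert_eq!(a.timeline_id, c.timeline_id);
}

Pointer identity is cheaper than the tuple comparison and stricter: two incarnations of the same timeline id would still compare unequal.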
pageserver/src/tenant/timeline/handle.rs

@@ -113,8 +113,8 @@ use std::sync::Arc;
 use std::sync::Mutex;
-use std::sync::Weak;
-use futures::lock::OwnedMutexGuard;
+use pageserver_api::shard::ShardIdentity;
+use tokio::sync::OwnedMutexGuard;
 use tracing::instrument;
 use tracing::trace;
 use utils::id::TimelineId;
@@ -172,11 +172,10 @@ pub(crate) struct ShardTimelineId {
 }

 /// See module-level comment.
-pub(crate) struct Handle<T: Types>(tokio::sync::OwnedMutexGuard<HandleInner<T>>);
-pub(crate) struct WeakHandle<T: Types>(Arc<tokio::sync::Mutex<HandleInner<T>>>);
-enum HandleInner<T: Types> {
+pub(crate) struct Handle<T: Types>(T::Timeline, tokio::sync::OwnedMutexGuard<HandleInner>);
+pub(crate) struct WeakHandle<T: Types>(T::Timeline, Arc<tokio::sync::Mutex<HandleInner>>);
+enum HandleInner {
     KeepingTimelineGateOpen {
-        timeline: T::Timeline,
         gate_guard: utils::sync::gate::GateGuard,
     },
     ShutDown,
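This hunk is the core of the refactor: the T::Timeline reference moves out of the mutex-protected HandleInner into the first tuple field of Handle and WeakHandle, which is why HandleInner loses its type parameter. A reduced, std-only model of the resulting layout (stand-in types; tokio's owned guard replaced by a plain Arc to keep the sketch self-contained):

use std::ops::Deref;
use std::sync::{Arc, Mutex};

struct GateGuard; // stand-in for utils::sync::gate::GateGuard

enum HandleInner {
    KeepingTimelineGateOpen { gate_guard: GateGuard },
    ShutDown,
}

// The timeline sits outside the mutex: reading it never takes the lock.
struct Handle<T>(Arc<T>, Arc<Mutex<HandleInner>>);

impl<T> Deref for Handle<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0 // lock-free: the mutex only guards open/shut-down state
    }
}

fn main() {
    let inner = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
        gate_guard: GateGuard,
    }));
    let handle = Handle(Arc::new(String::from("timeline")), inner);
    println!("{}", *handle); // no mutex access on this path
}

With the timeline out from under the mutex, the Deref impl further down in this diff becomes a plain field access, and HandleInner shrinks to just the gate guard or the ShutDown marker.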
@@ -187,7 +186,7 @@ enum HandleInner<T: Types> {
 /// See module-level comment for details.
 pub struct PerTimelineState<T: Types> {
     // None = shutting down
-    handles: Mutex<Option<HashMap<CacheId, Arc<tokio::sync::Mutex<HandleInner<T>>>>>>,
+    handles: Mutex<Option<HashMap<CacheId, (T::Timeline, Arc<tokio::sync::Mutex<HandleInner>>)>>>,
 }

 impl<T: Types> Default for PerTimelineState<T> {
@@ -247,24 +246,8 @@ impl<T: Types> Cache<T> {
         shard_selector: ShardSelector,
         tenant_manager: &T::TenantManager,
     ) -> Result<Handle<T>, GetError<T>> {
-        // terminates because each iteration removes an element from the map
-        loop {
-            let handle = self
-                .get_impl(timeline_id, shard_selector, tenant_manager)
-                .await?;
-            if handle.0.check_shut_down().is_none() {
-                let removed = self
-                    .map
-                    .remove(&handle.0.timeline.shard_timeline_id())
-                    .expect("invariant of get_impl is that the returned handle is in the map");
-                assert!(
-                    Weak::ptr_eq(&removed, &Arc::downgrade(&handle.0)),
-                    "shard_timeline_id() incorrect?"
-                );
-            } else {
-                return Ok(handle);
-            }
-        }
+        self.get_impl(timeline_id, shard_selector, tenant_manager)
+            .await
     }

     #[instrument(level = "trace", skip_all)]
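get() shrinks from a prune-and-retry loop to a single call because the shutdown check moved into WeakHandle::upgrade(), which now returns Result<_, HandleUpgradeError>, and get_impl() prunes the stale map entry itself. A small std-only model of that contract (all names here are stand-ins):

use std::collections::HashMap;

enum HandleUpgradeError {
    ShutDown,
}

struct WeakHandle {
    alive: bool,
}

impl WeakHandle {
    fn upgrade(&self) -> Result<&'static str, HandleUpgradeError> {
        if self.alive {
            Ok("handle")
        } else {
            Err(HandleUpgradeError::ShutDown)
        }
    }
}

// Mirrors get_impl(): on ShutDown, drop the stale entry and report a miss,
// so the caller re-routes via the tenant manager instead of looping itself.
fn lookup(map: &mut HashMap<u32, WeakHandle>, key: u32) -> Option<&'static str> {
    match map.get(&key)?.upgrade() {
        Ok(h) => Some(h),
        Err(HandleUpgradeError::ShutDown) => {
            map.remove(&key);
            None
        }
    }
}

fn main() {
    let mut map = HashMap::from([(1, WeakHandle { alive: false })]);
    assert!(lookup(&mut map, 1).is_none());
    assert!(!map.contains_key(&1)); // the stale entry was pruned on the way
}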
@@ -275,13 +258,14 @@ impl<T: Types> Cache<T> {
         tenant_manager: &T::TenantManager,
     ) -> Result<Handle<T>, GetError<T>> {
         let miss: ShardSelector = {
-            let routing_state = self.shard_routing(timeline_id, shard_selector);
+            let routing_state = self.shard_routing(timeline_id, shard_selector).await;
             match routing_state {
                 RoutingResult::FastPath(handle) => return Ok(handle),
                 RoutingResult::SlowPath(key) => match self.map.get(&key) {
-                    Some(cached) => match cached.upgrade() {
-                        Some(upgraded) => return Ok(Handle(upgraded)),
-                        None => {
+                    Some(cached) => match cached.upgrade().await {
+                        Ok(upgraded) => return Ok(upgraded),
+                        Err(HandleUpgradeError::ShutDown) => {
+                            // TODO: dedup with shard_routing()
                             trace!("handle cache stale");
                             self.map.remove(&key).unwrap();
                             ShardSelector::Known(key.shard_index)
@@ -296,7 +280,7 @@ impl<T: Types> Cache<T> {
     }

     #[inline(always)]
-    fn shard_routing(
+    async fn shard_routing(
         &mut self,
         timeline_id: TimelineId,
         shard_selector: ShardSelector,
@@ -306,15 +290,15 @@ impl<T: Types> Cache<T> {
             let Some((first_key, first_handle)) = self.map.iter().next() else {
                 return RoutingResult::NeedConsultTenantManager;
             };
-            let Some(first_handle) = first_handle.upgrade() else {
-                // TODO: dedup with get()
+            let Ok(first_handle) = first_handle.upgrade().await else {
+                // TODO: dedup with get_impl()
                 trace!("handle cache stale");
                 let first_key_owned = *first_key;
                 self.map.remove(&first_key_owned).unwrap();
                 continue;
             };

-            let first_handle_shard_identity = first_handle.timeline.get_shard_identity();
+            let first_handle_shard_identity = first_handle.get_shard_identity();
             let make_shard_index = |shard_num: ShardNumber| ShardIndex {
                 shard_number: shard_num,
                 shard_count: first_handle_shard_identity.count,
@@ -333,11 +317,11 @@ impl<T: Types> Cache<T> {
             };

             let first_handle_shard_timeline_id = ShardTimelineId {
                 shard_index: first_handle_shard_identity.shard_index(),
-                timeline_id: first_handle.timeline.shard_timeline_id().timeline_id,
+                timeline_id: first_handle.shard_timeline_id().timeline_id,
             };
             if need_shard_timeline_id == first_handle_shard_timeline_id {
-                return RoutingResult::FastPath(Handle(first_handle));
+                return RoutingResult::FastPath(first_handle);
             } else {
                 return RoutingResult::SlowPath(need_shard_timeline_id);
             }
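shard_routing() has to become async because validating the first cached entry now goes through WeakHandle::upgrade().await, which locks the async mutex. Its three outcomes, inferred from the match arms in get_impl() above (the enum definition itself is outside the hunks shown here), look roughly like:

enum RoutingResult<H, K> {
    FastPath(H),              // first cached entry matched; after this commit it
                              // carries the already-upgraded Handle
    SlowPath(K),              // a concrete ShardTimelineId to look up in the map
    NeedConsultTenantManager, // nothing cached; resolve via the tenant manager
}

fn main() {
    let r: RoutingResult<&str, u32> = RoutingResult::SlowPath(7);
    match r {
        RoutingResult::FastPath(h) => println!("hit: {h}"),
        RoutingResult::SlowPath(k) => println!("map lookup for key {k}"),
        RoutingResult::NeedConsultTenantManager => println!("full miss"),
    }
}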
@@ -368,15 +352,13 @@ impl<T: Types> Cache<T> {
             }
         };
         trace!("creating new HandleInner");
-        let handle = Arc::new(
+        let handle_inner_arc = Arc::new(tokio::sync::Mutex::new(
             // TODO: global metric that keeps track of the number of live HandlerTimeline instances
             // so we can identify reference cycle bugs.
-            HandleInner::KeepingTimelineGateOpen {
-                timeline: timeline.clone(),
-                gate_guard,
-            },
-        );
-        let handle = {
+            HandleInner::KeepingTimelineGateOpen { gate_guard },
+        ));
+        let handle_locked = handle_inner_arc.clone().try_lock_owned().unwrap();
+        {
             let mut lock_guard = timeline
                 .per_timeline_state()
                 .handles
@@ -384,7 +366,8 @@ impl<T: Types> Cache<T> {
                 .expect("mutex poisoned");
             match &mut *lock_guard {
                 Some(per_timeline_state) => {
-                    let replaced = per_timeline_state.insert(self.id, Arc::clone(&handle));
+                    let replaced = per_timeline_state
+                        .insert(self.id, (timeline.clone(), Arc::clone(&handle_inner_arc)));
                     assert!(replaced.is_none(), "some earlier code left a stale handle");
                     match self.map.entry(key) {
                         hash_map::Entry::Occupied(_o) => {
@@ -395,8 +378,10 @@ impl<T: Types> Cache<T> {
                             unreachable!()
                         }
                         hash_map::Entry::Vacant(v) => {
-                            v.insert(Arc::downgrade(&handle));
-                            handle
+                            v.insert(WeakHandle(
+                                timeline.clone(),
+                                Arc::clone(&handle_inner_arc),
+                            ));
                         }
                     }
                 }
@@ -404,8 +389,8 @@ impl<T: Types> Cache<T> {
                     return Err(GetError::PerTimelineStateShutDown);
                 }
             }
-        };
-        Ok(Handle(handle))
+        }
+        Ok(Handle(timeline.clone(), handle_locked))
     }
     Err(e) => Err(GetError::TenantManager(e)),
 }
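Worth noting in the construction sequence assembled across the hunks above: the fresh HandleInner is locked with try_lock_owned() before any other clone of the Arc exists, so the unwrap() cannot fail, and the same (timeline, inner) pair is then cloned into the PerTimelineState, the per-connection map, and the returned Handle. A sketch of just the locking step (stand-in inner type):

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let inner = Arc::new(tokio::sync::Mutex::new(0u32)); // stand-in for HandleInner
    // No other clone of `inner` exists yet, so this cannot contend:
    let guard = inner.clone().try_lock_owned().unwrap();
    // `guard` plays the role of Handle's OwnedMutexGuard; clones of `inner`
    // are what PerTimelineState and the map hold on to.
    assert_eq!(*guard, 0);
    assert!(inner.try_lock().is_err()); // held until the Handle is dropped
}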
@@ -418,31 +403,41 @@ pub(crate) enum HandleUpgradeError {
 impl<T: Types> WeakHandle<T> {
     pub(crate) async fn upgrade(&self) -> Result<Handle<T>, HandleUpgradeError> {
-        let lock_guard = self.0.lock_owned().await;
+        let lock_guard = self.1.clone().lock_owned().await;
         match &*lock_guard {
-            HandleInner::KeepingTimelineGateOpen {
-                timeline,
-                gate_guard,
-            } => Ok(Handle(lock_guard)),
+            HandleInner::KeepingTimelineGateOpen { gate_guard } => {
+                Ok(Handle(self.0.clone(), lock_guard))
+            }
             HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
         }
     }
+
+    pub(crate) fn timeline(&self) -> &T::Timeline {
+        &self.0
+    }
 }

 impl<T: Types> Deref for Handle<T> {
     type Target = T::Timeline;
     fn deref(&self) -> &Self::Target {
-        match &*self.0 {
-            HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
-            HandleInner::ShutDown => unreachable!(),
-        }
+        &self.0
     }
 }

 impl<T: Types> Handle<T> {
     pub(crate) fn downgrade(&self) -> WeakHandle<T> {
-        WeakHandle(Arc::clone(&self.0.mutex()))
+        WeakHandle(self.0.clone(), Arc::clone(OwnedMutexGuard::mutex(&self.1)))
     }
 }
+
+impl HandleInner {
+    pub(crate) fn shutdown(&mut self) {
+        match self {
+            HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown,
+            HandleInner::ShutDown => {
+                unreachable!("handles are only shut down once in their lifetime");
+            }
+        }
+    }
+}
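The round trip implied by these impls: a Handle holds the OwnedMutexGuard while in use, downgrade() recovers the Arc<Mutex<HandleInner>> from the held guard via OwnedMutexGuard::mutex(), and a later upgrade() re-locks it and checks whether shutdown() ran in the meantime. A compressed, runnable model (stand-in types; the real code is generic over Types):

use std::sync::Arc;
use tokio::sync::OwnedMutexGuard;

enum HandleInner {
    Open, // stands in for KeepingTimelineGateOpen { gate_guard }
    ShutDown,
}

struct Handle(Arc<String>, OwnedMutexGuard<HandleInner>);
struct WeakHandle(Arc<String>, Arc<tokio::sync::Mutex<HandleInner>>);

impl Handle {
    fn downgrade(&self) -> WeakHandle {
        // Recover the Arc<Mutex<..>> from the guard, as in the diff.
        WeakHandle(self.0.clone(), Arc::clone(OwnedMutexGuard::mutex(&self.1)))
    }
}

impl WeakHandle {
    async fn upgrade(&self) -> Result<Handle, &'static str> {
        let guard = self.1.clone().lock_owned().await;
        match &*guard {
            HandleInner::Open => Ok(Handle(self.0.clone(), guard)),
            HandleInner::ShutDown => Err("shut down"),
        }
    }
}

#[tokio::main]
async fn main() {
    let inner = Arc::new(tokio::sync::Mutex::new(HandleInner::Open));
    let handle = Handle(
        Arc::new("timeline".into()),
        inner.clone().try_lock_owned().unwrap(),
    );
    let weak = handle.downgrade();
    drop(handle); // releases the guard; otherwise upgrade() would wait here
    assert!(weak.upgrade().await.is_ok());
}

One consequence of this design: holding a Handle across an await point delays PerTimelineState::shutdown(), since shutdown() must acquire the same mutex the guard holds.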
@@ -465,40 +460,15 @@ impl<T: Types> PerTimelineState<T> {
             trace!("already shut down");
             return;
         };
-        for handle in handles.values() {
+        for (_, state) in handles.values().map(|h| &*h) {
             // Make further cache hits fail.
-            let mut lock_guard = handle.lock().await;
-            match std::mem::replace(&mut *lock_guard, HandleInner::ShutDown) {
-                HandleInner::KeepingTimelineGateOpen { gate_guard, .. } => {
-                    drop(gate_guard);
-                }
-                HandleInner::ShutDown => unreachable!(),
-            }
+            let mut lock_guard = state.lock().await;
+            lock_guard.shutdown();
         }
         drop(handles);
     }
 }
-
-// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
-impl<T: Types> Drop for Cache<T> {
-    fn drop(&mut self) {
-        for (_, weak) in self.map.drain() {
-            if let Some(strong) = weak.upgrade() {
-                // handle is still being kept alive in PerTimelineState
-                let timeline = strong.timeline.per_timeline_state();
-                let mut handles = timeline.handles.lock().expect("mutex poisoned");
-                if let Some(handles) = &mut *handles {
-                    let Some(removed) = handles.remove(&self.id) else {
-                        // There could have been a shutdown in between us upgrading the weak and locking the mutex.
-                        continue;
-                    };
-                    assert!(Arc::ptr_eq(&removed, &strong));
-                }
-            }
-        }
-    }
-}

 #[cfg(test)]
 mod tests {
     use pageserver_api::{