adjust shutdown

This commit is contained in:
Christian Schwarz
2025-01-13 14:34:15 +01:00
parent 9591d8789c
commit dda31b9cb6
2 changed files with 17 additions and 20 deletions

View File

@@ -1826,7 +1826,7 @@ impl Timeline {
self.cancel.cancel();
// Ensure Prevent new page service requests from starting.
self.handles.shutdown();
self.handles.shutdown().await;
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)

View File

@@ -189,7 +189,7 @@ enum HandleInner<T: Types> {
/// See module-level comment for details.
pub struct PerTimelineState<T: Types> {
// None = shutting down
handles: Mutex<Option<HashMap<CacheId, Arc<HandleInner<T>>>>>,
handles: Mutex<Option<HashMap<CacheId, Arc<tokio::sync::Mutex<HandleInner<T>>>>>>,
}
impl<T: Types> Default for PerTimelineState<T> {
@@ -430,14 +430,12 @@ impl<T: Types> Handle<T> {
}
impl<T: Types> PerTimelineState<T> {
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
/// to the [`Types::Timeline`] that embeds this per-timeline state.
/// Even if [`TenantManager::resolve`] would still resolve to it.
/// Invalidate all handles to this timeline in all [`Cache`]s.
///
/// Already-alive [`Handle`]s for will remain open, usable, and keeping the [`ArcTimeline`] alive.
/// That's ok because they're short-lived. See module-level comment for details.
/// After this method returns, all subsequent [`Handle::upgrade`] will fail
/// and they will not be holding the [`ArcTimeline`]'s gate open.
#[instrument(level = "trace", skip_all)]
pub(super) fn shutdown(&self) {
pub(super) async fn shutdown(&self) {
let handles = self
.handles
.lock()
@@ -451,20 +449,19 @@ impl<T: Types> PerTimelineState<T> {
return;
};
for handle in handles.values() {
// Make hits fail.
handle.shut_down.store(true, Ordering::Relaxed);
// Make further cache hits
let mut lock_guard = handle.lock().await;
match std::mem::replace(&mut *lock_guard, HandleInner::ShutDown) {
HandleInner::KeepingTimelineGateOpen { gate_guard, .. } => {
drop(gate_guard);
}
HandleInner::ShutDown => unreachable!(),
}
}
drop(handles);
}
}
#[cfg(test)]
impl<T: Types> Drop for HandleInner<T> {
fn drop(&mut self) {
trace!("HandleInner dropped");
}
}
// When dropping a [`Cache`], prune its handles in the [`PerTimelineState`] to break the reference cycle.
impl<T: Types> Drop for Cache<T> {
fn drop(&mut self) {
@@ -651,7 +648,7 @@ mod tests {
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
// SHUTDOWN
shard0.per_timeline_state.shutdown(); // keeping handle alive across shutdown
shard0.per_timeline_state.shutdown().await; // keeping handle alive across shutdown
assert_eq!(
1,
@@ -764,7 +761,7 @@ mod tests {
assert_eq!(cache.map.len(), 2);
// delete timeline A
timeline_a.per_timeline_state.shutdown();
timeline_a.per_timeline_state.shutdown().await;
mgr.shards.retain(|t| t.id != timeline_a.id);
assert!(
mgr.resolve(timeline_a.id, ShardSelector::Page(key))
@@ -892,7 +889,7 @@ mod tests {
assert!(Weak::ptr_eq(&parent_handle.myself, &parent.myself));
// invalidate the cache
parent.per_timeline_state.shutdown();
parent.per_timeline_state.shutdown().await;
// the cache will now return the child, even though the parent handle still exists
for i in 0..2 {