Merge branch 'problame/hung-shutdown/fix' into vlad/read-path-concurrent-io

This commit is contained in:
Christian Schwarz
2025-01-16 18:16:36 +01:00
6 changed files with 275 additions and 140 deletions

View File

@@ -1417,6 +1417,21 @@ pub enum PagestreamBeMessage {
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
Error = 3,
DbSize = 4,
GetSlruSegment = 5,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
@@ -1426,10 +1441,29 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/// Test message discriminant is unstable
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 106,
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::Error),
4 => Ok(PagestreamFeMessageTag::DbSize),
5 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
@@ -1441,7 +1475,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
106 => Ok(PagestreamBeMessageTag::Test),
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
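
Aside (illustrative sketch, not part of the diff): with explicit `#[repr(u8)]` discriminants, serializing via `as u8` and parsing via `try_from` round-trip exactly, and unknown bytes surface as `Err(value)` for the caller to turn into a protocol error. A minimal sketch, assuming the enums above (`roundtrip_demo` is a made-up name):

// Sketch: tag round-trip through the wire representation.
fn roundtrip_demo() {
    let wire: u8 = PagestreamFeMessageTag::GetPage as u8; // serializes as 2
    assert!(matches!(
        PagestreamFeMessageTag::try_from(wire),
        Ok(PagestreamFeMessageTag::GetPage)
    ));
    // A byte that matches no discriminant is handed back unchanged.
    assert!(matches!(PagestreamFeMessageTag::try_from(42), Err(42)));
}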
@@ -1592,7 +1626,7 @@ impl PagestreamFeMessage {
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1603,7 +1637,7 @@ impl PagestreamFeMessage {
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1614,7 +1648,7 @@ impl PagestreamFeMessage {
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1626,7 +1660,7 @@ impl PagestreamFeMessage {
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1634,7 +1668,7 @@ impl PagestreamFeMessage {
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1643,7 +1677,7 @@ impl PagestreamFeMessage {
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(5);
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1679,56 +1713,66 @@ impl PagestreamFeMessage {
),
};
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
@@ -1740,7 +1784,7 @@ impl PagestreamFeMessage {
},
)),
#[cfg(feature = "testing")]
5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,

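Aside (illustrative sketch, not part of the diff): the payoff of parsing into the tag enum and matching on it, rather than on raw `u8`s, is exhaustiveness checking — adding a variant to `PagestreamFeMessageTag` becomes a compile error at every match until it is handled. A hedged sketch (the `name_of` function is made up):

// Sketch: the compiler now forces every consumer to handle new tags.
fn name_of(tag: PagestreamFeMessageTag) -> &'static str {
    match tag {
        PagestreamFeMessageTag::Exists => "exists",
        PagestreamFeMessageTag::Nblocks => "nblocks",
        PagestreamFeMessageTag::GetPage => "get_page",
        PagestreamFeMessageTag::Error => "error",
        PagestreamFeMessageTag::DbSize => "db_size",
        PagestreamFeMessageTag::GetSlruSegment => "get_slru_segment",
        #[cfg(feature = "testing")]
        PagestreamFeMessageTag::Test => "test",
        // No `_` arm: a new variant fails compilation here, unlike the old
        // integer match whose `_ => Err(...)` arm swallowed it silently.
    }
}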
View File

@@ -424,7 +424,7 @@ impl timeline::handle::ArcTimeline<TenantManagerTypes> for Arc<Timeline> {
Timeline::shard_timeline_id(self)
}
fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState {
fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState<TenantManagerTypes> {
&self.handles
}
@@ -581,6 +581,9 @@ struct BatchedTestRequest {
timer: SmgrOpTimer,
}
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
enum BatchedFeMessage {
Exists {
span: Span,
@@ -964,7 +967,7 @@ impl PageServerHandler {
assert_eq!(accum_pages.len(), max_batch_size.get());
return false;
}
if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
if !accum_shard.is_same_handle_as(&this_shard) {
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
@@ -1002,7 +1005,7 @@ impl PageServerHandler {
assert_eq!(accum_requests.len(), max_batch_size.get());
return false;
}
if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) {
if !accum_shard.is_same_handle_as(&this_shard) {
trace!("stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
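
Aside (illustrative sketch, not part of the diff): `is_same_handle_as` compares handle identity via `Weak::ptr_eq` (see the `timeline::handle` changes below), which works without upgrading the weak reference and never touches the `Timeline`'s shared refcount. A standalone sketch of the primitive:

use std::sync::{Arc, Mutex, Weak};

fn main() {
    let inner = Arc::new(Mutex::new(0u8)); // stands in for the handle's inner state
    let w1 = Arc::downgrade(&inner);
    let w2 = Arc::downgrade(&inner);
    // Same allocation => same handle, even if the strong count later drops to 0.
    assert!(Weak::ptr_eq(&w1, &w2));
    let other = Arc::new(Mutex::new(0u8));
    assert!(!Weak::ptr_eq(&w1, &Arc::downgrade(&other)));
}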

View File

@@ -382,6 +382,12 @@ pub(crate) struct RemoteTimelineClient {
cancel: CancellationToken,
}
impl Drop for RemoteTimelineClient {
fn drop(&mut self) {
debug!("dropping RemoteTimelineClient");
}
}
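
Aside: this `Drop` impl appears to serve the hung-shutdown investigation from the merged branch — if shutdown hangs and this debug line never appears, some strong reference (e.g. a reference cycle) is keeping the client alive. A minimal sketch of the pattern with a stand-in type:

struct Client; // stand-in for RemoteTimelineClient

impl Drop for Client {
    fn drop(&mut self) {
        // tracing::debug! in the real code; println! keeps the sketch standalone.
        println!("dropping Client");
    }
}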
impl RemoteTimelineClient {
///
/// Create a remote storage client for the given timeline

View File

@@ -73,6 +73,7 @@ use std::{pin::pin, sync::OnceLock};
use crate::{
aux_file::AuxFileSizeEstimator,
page_service::TenantManagerTypes,
tenant::{
config::AttachmentMode,
layer_map::{LayerMap, SearchResult},
@@ -431,7 +432,7 @@ pub struct Timeline {
pub(crate) l0_flush_global_state: L0FlushGlobalState,
pub(crate) handles: handle::PerTimelineState,
pub(crate) handles: handle::PerTimelineState<TenantManagerTypes>,
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
@@ -4638,6 +4639,10 @@ impl Drop for Timeline {
}
}
}
info!(
"Timeline {} for tenant {} is being dropped",
self.timeline_id, self.tenant_shard_id.tenant_id
);
}
}

View File

@@ -104,11 +104,8 @@
//! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc<GateGuard>`.
//! Again, this is cheap because the `Arc` is private to the connection.
//!
//! In addition to the GateGuard, we need to provide `Deref<Target=Timeline>` impls
//! for `Handle` or `WeakHandle`.
//! For this, both `Handle` and `WeakHandle` need access to the `Arc<Timeline>`.
//! Upgrading a `WeakHandle` does not consume the `Handle`, so we can't move the
//! `Arc<Timeline>` out of `Handle` into `WeakHandle`.
//! In addition to the GateGuard, we need to provide a `Deref<Target=Timeline>` impl.
//! For this, `Handle` needs infallible access to an `Arc<Timeline>`.
//! We could clone the `Arc<Timeline>` when upgrading a `WeakHandle`, but that would cause contention
//! on the shared memory location that tracks the refcount of the `Arc<Timeline>`.
//! Instead, we wrap the `Arc<Timeline>` into another `Arc`.
@@ -119,7 +116,7 @@
//! The attentive reader may have noticed the following reference cycle around the `Arc<Timeline>`:
//!
//! ```text
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> WeakHandle --strong--> Timeline
//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline
//! ```
//!
//! Further, there is this cycle:
@@ -155,15 +152,25 @@
//! `PerTimelineState::shutdown`, that extension of the cycle is bounded.
//!
//! Concurrently existing `WeakHandle`s will fail to `upgrade()`:
//! while they will succeed in upgrading their weak arc ref to a strong ref,
//! while they will succeed in upgrading `Weak<Mutex<HandleInner>>`,
//! they will find the inner in the `HandleInner::ShutDown` state, where the
//! `Arc<GateGuard>` has already been dropped.
//! `Arc<GateGuard>` and `Timeline` have already been dropped.
//!
//! Dropping the `Cache` undoes the registration of this `Cache`'s
//! `HandleInner`s from all the `PerTimelineState`s, i.e., it
//! removes the strong ref to each of its `HandleInner`s
//! from all the `PerTimelineState`.
//!
//! # Locking Rules
//!
//! To prevent deadlocks we:
//!
//! 1. Only ever hold one of the locks at a time.
//! 2. Don't add more than one Drop impl that locks on the
//! cycles above.
//!
//! As per (2), that impl is in `Drop for Cache`.
//!
//! # Fast Path for Shard Routing
//!
//! The `Cache` has a fast path for shard routing to avoid calling into
@@ -262,16 +269,16 @@ pub(crate) struct Handle<T: Types> {
timeline: Arc<T::Timeline>,
#[allow(dead_code)] // the field exists to keep the gate open
gate_guard: Arc<utils::sync::gate::GateGuard>,
inner: Arc<Mutex<HandleInner>>,
inner: Arc<Mutex<HandleInner<T>>>,
}
pub(crate) struct WeakHandle<T: Types> {
timeline: Arc<T::Timeline>,
inner: Weak<Mutex<HandleInner>>,
inner: Weak<Mutex<HandleInner<T>>>,
}
enum HandleInner {
enum HandleInner<T: Types> {
KeepingTimelineGateOpen {
#[allow(dead_code)]
gate_guard: Arc<utils::sync::gate::GateGuard>,
timeline: Arc<T::Timeline>,
},
ShutDown,
}
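
Aside (illustrative sketch, not part of the diff): storing `Arc<T::Timeline>` inside `HandleInner` realizes the double-`Arc` idea from the module docs — the inner `Arc<Timeline>` is cloned once per handle, and hot-path upgrades clone only the outer `Arc`, whose refcount is private to the connection. A sketch with stand-in types:

use std::sync::Arc;

struct Timeline; // stand-in

fn main() {
    let global: Arc<Timeline> = Arc::new(Timeline);
    // One expensive inner clone when the handle is created ...
    let local: Arc<Arc<Timeline>> = Arc::new(Arc::clone(&global));
    // ... after which upgrades clone only the outer, connection-private Arc:
    let upgraded = Arc::clone(&local);
    // The globally shared refcount saw exactly one bump, not one per upgrade.
    assert_eq!(Arc::strong_count(&global), 2);
    drop(upgraded);
}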
@@ -279,12 +286,13 @@ enum HandleInner {
/// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`.
///
/// See module-level comment for details.
pub struct PerTimelineState {
pub struct PerTimelineState<T: Types> {
// None = shutting down
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner>>>>>,
#[allow(clippy::type_complexity)]
handles: Mutex<Option<HashMap<CacheId, Arc<Mutex<HandleInner<T>>>>>>,
}
impl Default for PerTimelineState {
impl<T: Types> Default for PerTimelineState<T> {
fn default() -> Self {
Self {
handles: Mutex::new(Some(Default::default())),
@@ -308,7 +316,7 @@ pub(crate) trait ArcTimeline<T: Types>: Clone {
fn gate(&self) -> &utils::sync::gate::Gate;
fn shard_timeline_id(&self) -> ShardTimelineId;
fn get_shard_identity(&self) -> &ShardIdentity;
fn per_timeline_state(&self) -> &PerTimelineState;
fn per_timeline_state(&self) -> &PerTimelineState<T>;
}
/// Errors returned by [`Cache::get`].
@@ -340,17 +348,6 @@ impl<T: Types> Cache<T> {
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
self.get_impl(timeline_id, shard_selector, tenant_manager)
.await
}
#[instrument(level = "trace", skip_all)]
async fn get_impl(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
// terminates because every iteration removes an element from the map
let miss: ShardSelector = loop {
@@ -387,7 +384,7 @@ impl<T: Types> Cache<T> {
return RoutingResult::NeedConsultTenantManager;
};
let Ok(first_handle) = first_handle.upgrade() else {
// TODO: dedup with get_impl()
// TODO: dedup with get()
trace!("handle cache stale");
let first_key_owned = *first_key;
self.map.remove(&first_key_owned).unwrap();
@@ -441,22 +438,30 @@ impl<T: Types> Cache<T> {
ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index),
}
let gate_guard = match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
};
let gate_guard = Arc::new(gate_guard);
trace!("creating new HandleInner");
let handle_inner_arc = Arc::new(Mutex::new(
// TODO: global metric that keeps track of the number of live HandlerTimeline instances
// so we can identify reference cycle bugs.
HandleInner::KeepingTimelineGateOpen {
gate_guard: Arc::clone(&gate_guard),
},
));
let timeline = {
let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen {
gate_guard: Arc::new(
// this enter() is expensive in production code because
// it hits the global Arc<Timeline>::gate refcounts
match timeline.gate().enter() {
Ok(guard) => guard,
Err(_) => {
return Err(GetError::TimelineGateClosed);
}
},
),
// this clone is expensive in production code because
// it hits the global Arc<Timeline>::clone refcounts
timeline: Arc::new(timeline.clone()),
}));
let handle_weak = WeakHandle {
inner: Arc::downgrade(&handle_inner_arc),
};
let handle = handle_weak
.upgrade()
.ok()
.expect("we just created it and it's not linked anywhere yet");
{
let mut lock_guard = timeline
.per_timeline_state()
.handles
@@ -476,12 +481,7 @@ impl<T: Types> Cache<T> {
unreachable!()
}
hash_map::Entry::Vacant(v) => {
let timeline = Arc::new(timeline.clone());
v.insert(WeakHandle {
timeline: Arc::clone(&timeline),
inner: Arc::downgrade(&handle_inner_arc),
});
timeline
v.insert(handle_weak);
}
}
}
@@ -489,12 +489,8 @@ impl<T: Types> Cache<T> {
return Err(GetError::PerTimelineStateShutDown);
}
}
};
Ok(Handle {
timeline,
inner: handle_inner_arc,
gate_guard,
})
}
Ok(handle)
}
Err(e) => Err(GetError::TenantManager(e)),
}
@@ -512,11 +508,15 @@ impl<T: Types> WeakHandle<T> {
};
let lock_guard = inner.lock().expect("poisoned");
match &*lock_guard {
HandleInner::KeepingTimelineGateOpen { gate_guard } => {
HandleInner::KeepingTimelineGateOpen {
timeline,
gate_guard,
} => {
let gate_guard = Arc::clone(gate_guard);
let timeline = Arc::clone(timeline);
drop(lock_guard);
Ok(Handle {
timeline: Arc::clone(&self.timeline),
timeline,
gate_guard,
inner,
})
@@ -524,8 +524,9 @@ impl<T: Types> WeakHandle<T> {
HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown),
}
}
pub(crate) fn timeline(&self) -> &T::Timeline {
&self.timeline
pub(crate) fn is_same_handle_as(&self, other: &WeakHandle<T>) -> bool {
Weak::ptr_eq(&self.inner, &other.inner)
}
}
@@ -539,13 +540,12 @@ impl<T: Types> std::ops::Deref for Handle<T> {
impl<T: Types> Handle<T> {
pub(crate) fn downgrade(&self) -> WeakHandle<T> {
WeakHandle {
timeline: Arc::clone(&self.timeline),
inner: Arc::downgrade(&self.inner),
}
}
}
impl PerTimelineState {
impl<T: Types> PerTimelineState<T> {
/// After this method returns, [`Cache::get`] will never again return a [`Handle`]
/// to the [`Types::Timeline`] that embeds this per-timeline state.
/// Even if [`TenantManager::resolve`] would still resolve to it.
@@ -581,35 +581,37 @@ impl<T: Types> Drop for Cache<T> {
for (
_,
WeakHandle {
timeline: handle_timeline,
inner: handle_inner_weak,
},
) in self.map.drain()
{
// handle is still being kept alive in PerTimelineState
let Some(handle_inner_arc) = handle_inner_weak.upgrade() else {
continue;
};
let handle_timeline = handle_inner_arc
// locking rules: drop lock before acquiring other lock below
.lock()
.expect("poisoned")
.shutdown();
let per_timeline_state = handle_timeline.per_timeline_state();
let mut handles = per_timeline_state.handles.lock().expect("mutex poisoned");
let Some(handles) = &mut *handles else {
let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned");
let Some(handles) = &mut *handles_lock_guard else {
continue;
};
let Some(removed_handle_inner_arc) = handles.remove(&self.id) else {
// There could have been a shutdown in between us upgrading the weak and locking the mutex.
continue;
};
assert!(Weak::ptr_eq(
&Arc::downgrade(&removed_handle_inner_arc),
&handle_inner_weak
));
let mut lock_guard = removed_handle_inner_arc.lock().expect("poisoned");
lock_guard.shutdown();
drop(handles_lock_guard); // locking rules: hold only one lock at a time
assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc));
}
}
}
impl HandleInner {
fn shutdown(&mut self) {
match self {
HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown,
impl<T: Types> HandleInner<T> {
fn shutdown(&mut self) -> Arc<T::Timeline> {
match std::mem::replace(self, HandleInner::ShutDown) {
HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline,
HandleInner::ShutDown => {
unreachable!("handles are only shut down once in their lifetime");
}
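
Aside (illustrative sketch, not part of the diff): `std::mem::replace` makes `shutdown` a one-shot transition that moves the payload out by value instead of cloning it. The same shape, with stand-in types:

use std::sync::Arc;

enum State {
    Open { payload: Arc<String> },
    ShutDown,
}

impl State {
    /// Swap `ShutDown` in, get the previous state's payload out by value.
    fn shutdown(&mut self) -> Arc<String> {
        match std::mem::replace(self, State::ShutDown) {
            State::Open { payload } => payload, // moved, not cloned
            State::ShutDown => unreachable!("shutdown is called at most once"),
        }
    }
}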
@@ -649,7 +651,7 @@ mod tests {
gate: utils::sync::gate::Gate,
id: TimelineId,
shard: ShardIdentity,
per_timeline_state: PerTimelineState,
per_timeline_state: PerTimelineState<TestTypes>,
myself: Weak<StubTimeline>,
}
@@ -675,7 +677,7 @@ mod tests {
&self.shard
}
fn per_timeline_state(&self) -> &PerTimelineState {
fn per_timeline_state(&self) -> &PerTimelineState<TestTypes> {
&self.per_timeline_state
}
}
@@ -1059,4 +1061,75 @@ mod tests {
}
}
}
#[tokio::test(start_paused = true)]
async fn test_weak_handles() {
crate::tenant::harness::setup_logging();
let timeline_id = TimelineId::generate();
let shard0 = Arc::new_cyclic(|myself| StubTimeline {
gate: Default::default(),
id: timeline_id,
shard: ShardIdentity::unsharded(),
per_timeline_state: PerTimelineState::default(),
myself: myself.clone(),
});
let mgr = StubManager {
shards: vec![shard0.clone()],
};
let refcount_start = Arc::strong_count(&shard0);
let key = DBDIR_KEY;
let mut cache = Cache::<TestTypes>::default();
let handle = cache
.get(timeline_id, ShardSelector::Page(key), &mgr)
.await
.expect("we have the timeline");
assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
let weak_handle = handle.downgrade();
drop(handle);
let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
// Start shutdown
shard0.per_timeline_state.shutdown();
// Upgrades during shutdown don't work, even if upgraded_handle exists.
weak_handle
.upgrade()
.err()
.expect("can't upgrade weak handle as soon as shutdown started");
// But upgraded_handle is still alive, so the gate won't close.
tokio::select! {
_ = shard0.gate.close() => {
panic!("handle is keeping gate open");
}
_ = tokio::time::sleep(FOREVER) => { }
}
// Drop the last handle.
drop(upgraded_handle);
// The gate should close now, despite there still being a weak_handle.
tokio::select! {
_ = shard0.gate.close() => { }
_ = tokio::time::sleep(FOREVER) => {
panic!("only strong handle is dropped and we shut down per-timeline-state")
}
}
// The weak handle still can't be upgraded.
weak_handle
.upgrade()
.err()
.expect("still shouldn't be able to upgrade the weak handle");
// There should be no strong references to the timeline object except the one on the stack.
assert_eq!(Arc::strong_count(&shard0), refcount_start);
}
}

View File

@@ -34,6 +34,8 @@ typedef enum
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* future tags above this line */
T_NeonTestRequest = 99, /* only in cfg(feature = "testing") */
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -42,6 +44,8 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
/* future tags above this line */
T_NeonTestResponse = 199, /* only in cfg(feature = "testing") */
} NeonMessageTag;
typedef uint64 NeonRequestId;
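
Aside: the "Keep in sync with `pagestore_client.h`" comments on the Rust side are enforced only by convention. A hypothetical unit test could pin the shared wire values so drift on either side fails CI instead of showing up at runtime (all constants below are taken from the diff above):

#[test]
fn tag_values_match_pagestore_client_h() {
    assert_eq!(PagestreamFeMessageTag::Exists as u8, 0);
    assert_eq!(PagestreamFeMessageTag::GetSlruSegment as u8, 5);
    #[cfg(feature = "testing")]
    assert_eq!(PagestreamFeMessageTag::Test as u8, 99);
    assert_eq!(PagestreamBeMessageTag::Error as u8, 103);
    #[cfg(feature = "testing")]
    assert_eq!(PagestreamBeMessageTag::Test as u8, 199);
}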