From 350dc251df4a5574e569b2bd0c2d45d7c03a89c7 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 15 Jan 2025 21:44:28 +0100
Subject: [PATCH 1/6] test case demonstrates the issue: we hold the Timeline object alive

---
STDERR: pageserver tenant::timeline::handle::tests::test_weak_handles
---
thread 'tenant::timeline::handle::tests::test_weak_handles' panicked at pageserver/src/tenant/timeline/handle.rs:1131:9:
assertion `left == right` failed
  left: 3
 right: 2
---
 pageserver/src/tenant/timeline/handle.rs | 71 ++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs
index d57776a2fe..8f6bf69cfb 100644
--- a/pageserver/src/tenant/timeline/handle.rs
+++ b/pageserver/src/tenant/timeline/handle.rs
@@ -1059,4 +1059,75 @@ mod tests {
             }
         }
     }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_weak_handles() {
+        crate::tenant::harness::setup_logging();
+        let timeline_id = TimelineId::generate();
+        let shard0 = Arc::new_cyclic(|myself| StubTimeline {
+            gate: Default::default(),
+            id: timeline_id,
+            shard: ShardIdentity::unsharded(),
+            per_timeline_state: PerTimelineState::default(),
+            myself: myself.clone(),
+        });
+        let mgr = StubManager {
+            shards: vec![shard0.clone()],
+        };
+
+        let refcount_start = Arc::strong_count(&shard0);
+
+        let key = DBDIR_KEY;
+
+        let mut cache = Cache::<TestTypes>::default();
+
+        let handle = cache
+            .get(timeline_id, ShardSelector::Page(key), &mgr)
+            .await
+            .expect("we have the timeline");
+        assert!(Weak::ptr_eq(&handle.myself, &shard0.myself));
+
+        let weak_handle = handle.downgrade();
+
+        drop(handle);
+
+        let upgraded_handle = weak_handle.upgrade().ok().expect("we can upgrade it");
+
+        // Start shutdown
+        shard0.per_timeline_state.shutdown();
+
+        // Upgrades during shutdown don't work, even if upgraded_handle exists.
+        weak_handle
+            .upgrade()
+            .err()
+            .expect("can't upgrade weak handle as soon as shutdown started");
+
+        // But upgraded_handle is still alive, so the gate won't close.
+        tokio::select! {
+            _ = shard0.gate.close() => {
+                panic!("handle is keeping gate open");
+            }
+            _ = tokio::time::sleep(FOREVER) => { }
+        }
+
+        // Drop the last handle.
+        drop(upgraded_handle);
+
+        // The gate should close now, despite there still being a weak_handle.
+        tokio::select! {
+            _ = shard0.gate.close() => { }
+            _ = tokio::time::sleep(FOREVER) => {
+                panic!("only strong handle is dropped and we shut down per-timeline-state")
+            }
+        }
+
+        // The weak handle still can't be upgraded.
+        weak_handle
+            .upgrade()
+            .err()
+            .expect("still shouldn't be able to upgrade the weak handle");
+
+        // There should be no strong references to the timeline object except the one on "stack".
+        assert_eq!(Arc::strong_count(&shard0), refcount_start);
+    }
 }

From 7fb4595c7efd3d1ec1c74fc74943537f1b99cdf2 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 15 Jan 2025 20:44:20 +0100
Subject: [PATCH 2/6] fix: WeakHandle was holding on to the Timeline allocation

This made test_timeline_deletion_with_files_stuck_in_upload_queue fail
because the RemoteTimelineClient was being kept alive.

The fix is to stop keeping the timeline alive from WeakHandle.
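Put differently: before this patch, `WeakHandle` embedded a strong `Arc` to the
timeline, so a weak handle cached in a connection was enough to keep the whole
Timeline (and with it the RemoteTimelineClient) alive. After this patch, the only
long-lived strong reference lives inside `HandleInner`, which
`PerTimelineState::shutdown()` replaces with the `ShutDown` variant. A minimal,
self-contained sketch of that ownership rule, using toy stand-in types rather than
the real pageserver ones:

```rust
// Sketch only: FakeTimeline / Inner / WeakHandleSketch are made-up stand-ins,
// not the pageserver types.
use std::sync::{Arc, Mutex, Weak};

struct FakeTimeline;

#[allow(dead_code)] // Open holds the Arc purely to keep the timeline alive
enum Inner {
    Open { timeline: Arc<FakeTimeline> },
    ShutDown,
}

struct WeakHandleSketch {
    // No strong reference to the timeline in here; that is the fix.
    inner: Weak<Mutex<Inner>>,
}

fn main() {
    let timeline = Arc::new(FakeTimeline);
    assert_eq!(Arc::strong_count(&timeline), 1);

    // Registration: the shared state (think PerTimelineState -> HandleInner)
    // owns the only long-lived strong ref; the cached handle is weak.
    let inner = Arc::new(Mutex::new(Inner::Open {
        timeline: Arc::clone(&timeline),
    }));
    let weak = WeakHandleSketch {
        inner: Arc::downgrade(&inner),
    };
    assert_eq!(Arc::strong_count(&timeline), 2);

    // Shutdown: swapping in ShutDown drops the strong ref held by Open.
    *inner.lock().unwrap() = Inner::ShutDown;

    // The timeline allocation can now be freed even though `weak` still exists.
    assert_eq!(Arc::strong_count(&timeline), 1);
    assert!(weak.inner.upgrade().is_some()); // the shared cell survives, but it is ShutDown
}
```

The test_weak_handles test added in patch 1 asserts exactly this property against
the real types, via Arc::strong_count on the stub timeline plus the gate-close check.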
--- pageserver/src/page_service.rs | 6 +- .../src/tenant/remote_timeline_client.rs | 6 + pageserver/src/tenant/timeline.rs | 7 +- pageserver/src/tenant/timeline/handle.rs | 143 ++++++++++-------- 4 files changed, 93 insertions(+), 69 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 685eea8541..623972140a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -403,7 +403,7 @@ impl timeline::handle::ArcTimeline for Arc { Timeline::shard_timeline_id(self) } - fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState { + fn per_timeline_state(&self) -> &timeline::handle::PerTimelineState { &self.handles } @@ -941,7 +941,7 @@ impl PageServerHandler { assert_eq!(accum_pages.len(), max_batch_size.get()); return false; } - if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) { + if !accum_shard.is_same_handle_as(&this_shard) { trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch"); // TODO: we _could_ batch & execute each shard seperately (and in parallel). // But the current logic for keeping responses in order does not support that. @@ -979,7 +979,7 @@ impl PageServerHandler { assert_eq!(accum_requests.len(), max_batch_size.get()); return false; } - if !Arc::ptr_eq(accum_shard.timeline(), this_shard.timeline()) { + if !accum_shard.is_same_handle_as(&this_shard) { trace!("stopping batching because timeline object mismatch"); // TODO: we _could_ batch & execute each shard seperately (and in parallel). // But the current logic for keeping responses in order does not support that. diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 47c4a8637d..a006647785 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -382,6 +382,12 @@ pub(crate) struct RemoteTimelineClient { cancel: CancellationToken, } +impl Drop for RemoteTimelineClient { + fn drop(&mut self) { + debug!("dropping RemoteTimelineClient"); + } +} + impl RemoteTimelineClient { /// /// Create a remote storage client for given timeline diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index faece1416d..41e48726cf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -72,6 +72,7 @@ use std::{pin::pin, sync::OnceLock}; use crate::{ aux_file::AuxFileSizeEstimator, + page_service::TenantManagerTypes, tenant::{ config::AttachmentMode, layer_map::{LayerMap, SearchResult}, @@ -427,7 +428,7 @@ pub struct Timeline { pub(crate) l0_flush_global_state: L0FlushGlobalState, - pub(crate) handles: handle::PerTimelineState, + pub(crate) handles: handle::PerTimelineState, pub(crate) attach_wal_lag_cooldown: Arc>, @@ -4618,6 +4619,10 @@ impl Drop for Timeline { } } } + info!( + "Timeline {} for tenant {} is being dropped", + self.timeline_id, self.tenant_shard_id.tenant_id + ); } } diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 8f6bf69cfb..09e8582e64 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -104,11 +104,8 @@ //! When downgrading a `Handle` to a `WeakHandle`, we drop the `Arc`. //! Again, this is cheap because the `Arc` is private to the connection. //! -//! In addition to the GateGuard, we need to provide `Deref` impls -//! for `Handle` or `WeakHandle`. -//! For this, both `Handle` and `WeakHandle` need access to the `Arc`. -//! 
Upgrading a `WeakHandle` does not consume the `Handle`, so we can't move the -//! `Arc` out of `Handle` into `WeakHandle`. +//! In addition to the GateGuard, we need to provide `Deref` impl. +//! For this, both `Handle` need infallible access to an `Arc`. //! We could clone the `Arc` when upgrading a `WeakHandle`, but that would cause contention //! on the shared memory location that trakcs the refcount of the `Arc`. //! Instead, we wrap the `Arc` into another `Arc`. @@ -119,7 +116,7 @@ //! The attentive reader may have noticed the following reference cycle around the `Arc`: //! //! ```text -//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> WeakHandle --strong--> Timeline +//! Timeline --owns--> PerTimelineState --strong--> HandleInner --strong--> Timeline //! ``` //! //! Further, there is this cycle: @@ -155,15 +152,25 @@ //! `PerTimelineState::shutdown`, that extension of the cycle is bounded. //! //! Concurrently existing `WeakHandle`s will fail to `upgrade()`: -//! while they will succeed in upgradingtheir weak arc ref to a strong ref, +//! while they will succeed in upgrading `Weak>`, //! they will find the inner in state `HandleInner::ShutDown` state where the -//! `Arc` has already been dropped. +//! `Arc` and Timeline has already been dropped. //! //! Dropping the `Cache` undoes the registration of this `Cache`'s //! `HandleInner`s from all the `PerTimelineState`s, i.e., it //! removes the strong ref to each of its `HandleInner`s //! from all the `PerTimelineState`. //! +//! # Locking Rules +//! +//! To prevent deadlocks we: +//! +//! 1. Only ever hold one of the locks at a time. +//! 2. Don't add more than one Drop impl that locks on the +//! cycles above. +//! +//! As per (2), that impl is in `Drop for Cache`. +//! //! # Fast Path for Shard Routing //! //! The `Cache` has a fast path for shard routing to avoid calling into @@ -262,16 +269,16 @@ pub(crate) struct Handle { timeline: Arc, #[allow(dead_code)] // the field exists to keep the gate open gate_guard: Arc, - inner: Arc>, + inner: Arc>>, } pub(crate) struct WeakHandle { - timeline: Arc, - inner: Weak>, + inner: Weak>>, } -enum HandleInner { +enum HandleInner { KeepingTimelineGateOpen { #[allow(dead_code)] gate_guard: Arc, + timeline: Arc, }, ShutDown, } @@ -279,12 +286,13 @@ enum HandleInner { /// Embedded in each [`Types::Timeline`] as the anchor for the only long-lived strong ref to `HandleInner`. /// /// See module-level comment for details. -pub struct PerTimelineState { +pub struct PerTimelineState { // None = shutting down - handles: Mutex>>>>, + #[allow(clippy::type_complexity)] + handles: Mutex>>>>>, } -impl Default for PerTimelineState { +impl Default for PerTimelineState { fn default() -> Self { Self { handles: Mutex::new(Some(Default::default())), @@ -308,7 +316,7 @@ pub(crate) trait ArcTimeline: Clone { fn gate(&self) -> &utils::sync::gate::Gate; fn shard_timeline_id(&self) -> ShardTimelineId; fn get_shard_identity(&self) -> &ShardIdentity; - fn per_timeline_state(&self) -> &PerTimelineState; + fn per_timeline_state(&self) -> &PerTimelineState; } /// Errors returned by [`Cache::get`]. 
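The doc-comment hunk above describes the `Arc`-in-`Arc` trick in prose: each
connection wraps the shared `Arc<Timeline>` in a second, connection-private `Arc`,
so upgrading a `WeakHandle` only bumps the private refcount. A standalone sketch of
that refcount behavior (toy names, not the real `Types::Timeline`), for readers who
want to see it concretely:

```rust
// Sketch only: FakeTimeline is a made-up stand-in for the shared timeline object.
use std::sync::Arc;

struct FakeTimeline;

fn main() {
    // One inner Arc shared process-wide; its refcount is the contended location.
    let shared: Arc<FakeTimeline> = Arc::new(FakeTimeline);

    // Each connection clones the inner Arc exactly once and wraps it.
    let connection_private: Arc<Arc<FakeTimeline>> = Arc::new(Arc::clone(&shared));
    assert_eq!(Arc::strong_count(&shared), 2);

    // Upgrading handles clones only the outer, connection-private Arc ...
    let upgraded_a = Arc::clone(&connection_private);
    let upgraded_b = Arc::clone(&connection_private);

    // ... so the shared (inner) refcount never moves after registration.
    assert_eq!(Arc::strong_count(&shared), 2);
    assert_eq!(Arc::strong_count(&connection_private), 3);

    drop((upgraded_a, upgraded_b));
    assert_eq!(Arc::strong_count(&connection_private), 1);
}
```

Upgrading therefore never touches the refcount that all other connections share;
only registration (one clone per connection) and shutdown do.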
@@ -441,22 +449,30 @@ impl Cache { ShardSelector::Known(idx) => assert_eq!(idx, &key.shard_index), } - let gate_guard = match timeline.gate().enter() { - Ok(guard) => guard, - Err(_) => { - return Err(GetError::TimelineGateClosed); - } - }; - let gate_guard = Arc::new(gate_guard); trace!("creating new HandleInner"); - let handle_inner_arc = Arc::new(Mutex::new( - // TODO: global metric that keeps track of the number of live HandlerTimeline instances - // so we can identify reference cycle bugs. - HandleInner::KeepingTimelineGateOpen { - gate_guard: Arc::clone(&gate_guard), - }, - )); - let timeline = { + let handle_inner_arc = Arc::new(Mutex::new(HandleInner::KeepingTimelineGateOpen { + gate_guard: Arc::new( + // this enter() is expensive in production code because + // it hits the global Arc::gate refcounts + match timeline.gate().enter() { + Ok(guard) => guard, + Err(_) => { + return Err(GetError::TimelineGateClosed); + } + }, + ), + // this clone is expensive in production code because + // it hits the global Arc::clone refcounts + timeline: Arc::new(timeline.clone()), + })); + let handle_weak = WeakHandle { + inner: Arc::downgrade(&handle_inner_arc), + }; + let handle = handle_weak + .upgrade() + .ok() + .expect("we just created it and it's not linked anywhere yet"); + { let mut lock_guard = timeline .per_timeline_state() .handles @@ -476,12 +492,7 @@ impl Cache { unreachable!() } hash_map::Entry::Vacant(v) => { - let timeline = Arc::new(timeline.clone()); - v.insert(WeakHandle { - timeline: Arc::clone(&timeline), - inner: Arc::downgrade(&handle_inner_arc), - }); - timeline + v.insert(handle_weak); } } } @@ -489,12 +500,8 @@ impl Cache { return Err(GetError::PerTimelineStateShutDown); } } - }; - Ok(Handle { - timeline, - inner: handle_inner_arc, - gate_guard, - }) + } + Ok(handle) } Err(e) => Err(GetError::TenantManager(e)), } @@ -512,11 +519,15 @@ impl WeakHandle { }; let lock_guard = inner.lock().expect("poisoned"); match &*lock_guard { - HandleInner::KeepingTimelineGateOpen { gate_guard } => { + HandleInner::KeepingTimelineGateOpen { + timeline, + gate_guard, + } => { let gate_guard = Arc::clone(gate_guard); + let timeline = Arc::clone(timeline); drop(lock_guard); Ok(Handle { - timeline: Arc::clone(&self.timeline), + timeline, gate_guard, inner, }) @@ -524,8 +535,9 @@ impl WeakHandle { HandleInner::ShutDown => Err(HandleUpgradeError::ShutDown), } } - pub(crate) fn timeline(&self) -> &T::Timeline { - &self.timeline + + pub(crate) fn is_same_handle_as(&self, other: &WeakHandle) -> bool { + Weak::ptr_eq(&self.inner, &other.inner) } } @@ -539,13 +551,12 @@ impl std::ops::Deref for Handle { impl Handle { pub(crate) fn downgrade(&self) -> WeakHandle { WeakHandle { - timeline: Arc::clone(&self.timeline), inner: Arc::downgrade(&self.inner), } } } -impl PerTimelineState { +impl PerTimelineState { /// After this method returns, [`Cache::get`] will never again return a [`Handle`] /// to the [`Types::Timeline`] that embeds this per-timeline state. /// Even if [`TenantManager::resolve`] would still resolve to it. 
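One consequence of `WeakHandle` no longer carrying a strong timeline reference is
visible in the page_service.rs hunks above: the batching code can no longer compare
batches with `Arc::ptr_eq` on the timelines and instead uses the new
`is_same_handle_as`, which pointer-compares the `Weak` to the shared `HandleInner`
allocation without upgrading anything. A toy sketch of that comparison
(illustrative names only, not the real types):

```rust
// Sketch only: InnerSketch / WeakHandleSketch stand in for HandleInner / WeakHandle.
use std::sync::{Arc, Mutex, Weak};

struct InnerSketch;

struct WeakHandleSketch {
    inner: Weak<Mutex<InnerSketch>>,
}

impl WeakHandleSketch {
    // Same shape as WeakHandle::is_same_handle_as in the hunk above:
    // compare the allocation the weak pointer refers to, without upgrading.
    fn is_same_handle_as(&self, other: &WeakHandleSketch) -> bool {
        Weak::ptr_eq(&self.inner, &other.inner)
    }
}

fn main() {
    let shard_a = Arc::new(Mutex::new(InnerSketch));
    let shard_b = Arc::new(Mutex::new(InnerSketch));

    let h1 = WeakHandleSketch { inner: Arc::downgrade(&shard_a) };
    let h2 = WeakHandleSketch { inner: Arc::downgrade(&shard_a) };
    let h3 = WeakHandleSketch { inner: Arc::downgrade(&shard_b) };

    // Requests that resolve to the same handle may join the current batch ...
    assert!(h1.is_same_handle_as(&h2));
    // ... a different shard/timeline forces a new batch.
    assert!(!h1.is_same_handle_as(&h3));
}
```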
@@ -581,35 +592,37 @@ impl Drop for Cache { for ( _, WeakHandle { - timeline: handle_timeline, inner: handle_inner_weak, }, ) in self.map.drain() { - // handle is still being kept alive in PerTimelineState + let Some(handle_inner_arc) = handle_inner_weak.upgrade() else { + continue; + }; + let handle_timeline = handle_inner_arc + // locking rules: drop lock before acquiring other lock below + .lock() + .expect("poisoned") + .shutdown(); let per_timeline_state = handle_timeline.per_timeline_state(); - let mut handles = per_timeline_state.handles.lock().expect("mutex poisoned"); - let Some(handles) = &mut *handles else { + let mut handles_lock_guard = per_timeline_state.handles.lock().expect("mutex poisoned"); + let Some(handles) = &mut *handles_lock_guard else { continue; }; let Some(removed_handle_inner_arc) = handles.remove(&self.id) else { // There could have been a shutdown inbetween us upgrading the weak and locking the mutex. continue; }; - assert!(Weak::ptr_eq( - &Arc::downgrade(&removed_handle_inner_arc), - &handle_inner_weak - )); - let mut lock_guard = removed_handle_inner_arc.lock().expect("poisoned"); - lock_guard.shutdown(); + drop(handles_lock_guard); // locking rules: remember them when! + assert!(Arc::ptr_eq(&removed_handle_inner_arc, &handle_inner_arc,)); } } } -impl HandleInner { - fn shutdown(&mut self) { - match self { - HandleInner::KeepingTimelineGateOpen { .. } => *self = HandleInner::ShutDown, +impl HandleInner { + fn shutdown(&mut self) -> Arc { + match std::mem::replace(self, HandleInner::ShutDown) { + HandleInner::KeepingTimelineGateOpen { timeline, .. } => timeline, HandleInner::ShutDown => { unreachable!("handles are only shut down once in their lifetime"); } @@ -649,7 +662,7 @@ mod tests { gate: utils::sync::gate::Gate, id: TimelineId, shard: ShardIdentity, - per_timeline_state: PerTimelineState, + per_timeline_state: PerTimelineState, myself: Weak, } @@ -675,7 +688,7 @@ mod tests { &self.shard } - fn per_timeline_state(&self) -> &PerTimelineState { + fn per_timeline_state(&self) -> &PerTimelineState { &self.per_timeline_state } } From 9fe77c527f03b47fc4a995ce20b81454ce487964 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 15 Jan 2025 21:47:39 +0100 Subject: [PATCH 3/6] inline get_impl; https://github.com/neondatabase/neon/pull/10386#discussion_r1916939623 --- pageserver/src/tenant/timeline/handle.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 09e8582e64..2edd90e73c 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -348,17 +348,6 @@ impl Cache { timeline_id: TimelineId, shard_selector: ShardSelector, tenant_manager: &T::TenantManager, - ) -> Result, GetError> { - self.get_impl(timeline_id, shard_selector, tenant_manager) - .await - } - - #[instrument(level = "trace", skip_all)] - async fn get_impl( - &mut self, - timeline_id: TimelineId, - shard_selector: ShardSelector, - tenant_manager: &T::TenantManager, ) -> Result, GetError> { // terminates because when every iteration we remove an element from the map let miss: ShardSelector = loop { From 66c0df81090bb0f93e468b53cec485f2a4605554 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 15 Jan 2025 21:49:26 +0100 Subject: [PATCH 4/6] doc comment on BatchedFeMessage explaining WeakHandle; https://github.com/neondatabase/neon/pull/10386#discussion_r1916968951 --- pageserver/src/page_service.rs | 3 +++ 1 file changed, 3 insertions(+) 
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 623972140a..9fe3b307b5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -560,6 +560,9 @@ struct BatchedTestRequest { timer: SmgrOpTimer, } +/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum, +/// so that we don't keep the [`Timeline::gate`] open while the batch +/// is being built up inside the [`spsc_fold`] (pagestream pipelining). enum BatchedFeMessage { Exists { span: Span, From c19a16792a78d37fc2e638f858da3ffd44b82491 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 16 Jan 2025 17:54:14 +0100 Subject: [PATCH 5/6] address nit ; https://github.com/neondatabase/neon/pull/10386#discussion_r1918782034 --- pageserver/src/tenant/timeline/handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 2edd90e73c..35d8c75ce1 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -384,7 +384,7 @@ impl Cache { return RoutingResult::NeedConsultTenantManager; }; let Ok(first_handle) = first_handle.upgrade() else { - // TODO: dedup with get_impl() + // TODO: dedup with get() trace!("handle cache stale"); let first_key_owned = *first_key; self.map.remove(&first_key_owned).unwrap(); From 0c3ab9c49457bbcc799cc45afae5c28d0b6cd60c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 16 Jan 2025 18:07:56 +0100 Subject: [PATCH 6/6] move test message tag to 99 and represent Fe message tag as enum, like we do for Be message --- libs/pageserver_api/src/models.rs | 162 +++++++++++++++++++----------- pgxn/neon/pagestore_client.h | 4 + 2 files changed, 107 insertions(+), 59 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 7e08b2825a..ae94f7b218 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1417,6 +1417,21 @@ pub enum PagestreamBeMessage { Test(PagestreamTestResponse), } +// Keep in sync with `pagestore_client.h` +#[repr(u8)] +enum PagestreamFeMessageTag { + Exists = 0, + Nblocks = 1, + GetPage = 2, + Error = 3, + DbSize = 4, + GetSlruSegment = 5, + /* future tags above this line */ + /// For testing purposes, not available in production. + #[cfg(feature = "testing")] + Test = 99, +} + // Keep in sync with `pagestore_client.h` #[repr(u8)] enum PagestreamBeMessageTag { @@ -1426,10 +1441,29 @@ enum PagestreamBeMessageTag { Error = 103, DbSize = 104, GetSlruSegment = 105, - /// Test message discrimimant is unstable + /* future tags above this line */ + /// For testing purposes, not available in production. 
#[cfg(feature = "testing")] - Test = 106, + Test = 199, } + +impl TryFrom for PagestreamFeMessageTag { + type Error = u8; + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(PagestreamFeMessageTag::Exists), + 1 => Ok(PagestreamFeMessageTag::Nblocks), + 2 => Ok(PagestreamFeMessageTag::GetPage), + 3 => Ok(PagestreamFeMessageTag::Error), + 4 => Ok(PagestreamFeMessageTag::DbSize), + 5 => Ok(PagestreamFeMessageTag::GetSlruSegment), + #[cfg(feature = "testing")] + 99 => Ok(PagestreamFeMessageTag::Test), + _ => Err(value), + } + } +} + impl TryFrom for PagestreamBeMessageTag { type Error = u8; fn try_from(value: u8) -> Result { @@ -1441,7 +1475,7 @@ impl TryFrom for PagestreamBeMessageTag { 104 => Ok(PagestreamBeMessageTag::DbSize), 105 => Ok(PagestreamBeMessageTag::GetSlruSegment), #[cfg(feature = "testing")] - 106 => Ok(PagestreamBeMessageTag::Test), + 199 => Ok(PagestreamBeMessageTag::Test), _ => Err(value), } } @@ -1592,7 +1626,7 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { - bytes.put_u8(0); + bytes.put_u8(PagestreamFeMessageTag::Exists as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1603,7 +1637,7 @@ impl PagestreamFeMessage { } Self::Nblocks(req) => { - bytes.put_u8(1); + bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1614,7 +1648,7 @@ impl PagestreamFeMessage { } Self::GetPage(req) => { - bytes.put_u8(2); + bytes.put_u8(PagestreamFeMessageTag::GetPage as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1626,7 +1660,7 @@ impl PagestreamFeMessage { } Self::DbSize(req) => { - bytes.put_u8(3); + bytes.put_u8(PagestreamFeMessageTag::DbSize as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1634,7 +1668,7 @@ impl PagestreamFeMessage { } Self::GetSlruSegment(req) => { - bytes.put_u8(4); + bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1643,7 +1677,7 @@ impl PagestreamFeMessage { } #[cfg(feature = "testing")] Self::Test(req) => { - bytes.put_u8(5); + bytes.put_u8(PagestreamFeMessageTag::Test as u8); bytes.put_u64(req.hdr.reqid); bytes.put_u64(req.hdr.request_lsn.0); bytes.put_u64(req.hdr.not_modified_since.0); @@ -1679,56 +1713,66 @@ impl PagestreamFeMessage { ), }; - match msg_tag { - 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - hdr: PagestreamRequest { - reqid, - request_lsn, - not_modified_since, - }, - rel: RelTag { - spcnode: body.read_u32::()?, + match PagestreamFeMessageTag::try_from(msg_tag) + .map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? 
+ { + PagestreamFeMessageTag::Exists => { + Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { + hdr: PagestreamRequest { + reqid, + request_lsn, + not_modified_since, + }, + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + })) + } + PagestreamFeMessageTag::Nblocks => { + Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + hdr: PagestreamRequest { + reqid, + request_lsn, + not_modified_since, + }, + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + })) + } + PagestreamFeMessageTag::GetPage => { + Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid, + request_lsn, + not_modified_since, + }, + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + blkno: body.read_u32::()?, + })) + } + PagestreamFeMessageTag::DbSize => { + Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + hdr: PagestreamRequest { + reqid, + request_lsn, + not_modified_since, + }, dbnode: body.read_u32::()?, - relnode: body.read_u32::()?, - forknum: body.read_u8()?, - }, - })), - 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - hdr: PagestreamRequest { - reqid, - request_lsn, - not_modified_since, - }, - rel: RelTag { - spcnode: body.read_u32::()?, - dbnode: body.read_u32::()?, - relnode: body.read_u32::()?, - forknum: body.read_u8()?, - }, - })), - 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid, - request_lsn, - not_modified_since, - }, - rel: RelTag { - spcnode: body.read_u32::()?, - dbnode: body.read_u32::()?, - relnode: body.read_u32::()?, - forknum: body.read_u8()?, - }, - blkno: body.read_u32::()?, - })), - 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - hdr: PagestreamRequest { - reqid, - request_lsn, - not_modified_since, - }, - dbnode: body.read_u32::()?, - })), - 4 => Ok(PagestreamFeMessage::GetSlruSegment( + })) + } + PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment( PagestreamGetSlruSegmentRequest { hdr: PagestreamRequest { reqid, @@ -1740,7 +1784,7 @@ impl PagestreamFeMessage { }, )), #[cfg(feature = "testing")] - 5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest { + PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest { hdr: PagestreamRequest { reqid, request_lsn, diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 37bc4f7886..ba5a1c6ce5 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -34,6 +34,8 @@ typedef enum T_NeonGetPageRequest, T_NeonDbSizeRequest, T_NeonGetSlruSegmentRequest, + /* future tags above this line */ + T_NeonTestRequest = 99, /* only in cfg(feature = "testing") */ /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, @@ -42,6 +44,8 @@ typedef enum T_NeonErrorResponse, T_NeonDbSizeResponse, T_NeonGetSlruSegmentResponse, + /* future tags above this line */ + T_NeonTestResponse = 199, /* only in cfg(feature = "testing") */ } NeonMessageTag; typedef uint64 NeonRequestId;