From 93e6046005e8ab63f7fa204eab2853e6e9426585 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 10 Feb 2024 21:54:41 +0200 Subject: [PATCH] Send LSN range in getpage request --- libs/pageserver_api/src/models.rs | 99 +++++++++----- .../pagebench/src/cmd/getpage_latest_lsn.rs | 6 +- pageserver/src/basebackup.rs | 5 +- pageserver/src/page_service.rs | 107 ++++++++------- pageserver/src/pgdatadir_mapping.rs | 28 ++-- pageserver/src/walingest.rs | 128 +++++++++++------- pgxn/neon/pagestore_client.h | 5 +- pgxn/neon/pagestore_smgr.c | 52 ++++--- trace/src/main.rs | 21 ++- 9 files changed, 271 insertions(+), 180 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index b4909f247f..2d971a9b96 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -798,9 +798,10 @@ pub mod virtual_file { pub enum PagestreamFeMessage { Exists(PagestreamExistsRequest), Nblocks(PagestreamNblocksRequest), - GetPage(PagestreamGetPageRequest), + GetLatestPage(PagestreamGetLatestPageRequest), // for compatibility with old clients DbSize(PagestreamDbSizeRequest), GetSlruSegment(PagestreamGetSlruSegmentRequest), + GetPage(PagestreamGetPageRequest), } // Wrapped in libpq CopyData @@ -841,36 +842,44 @@ impl TryFrom for PagestreamBeMessageTag { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] -pub struct PagestreamGetPageRequest { +pub struct PagestreamGetLatestPageRequest { pub latest: bool, pub lsn: Lsn, pub rel: RelTag, pub blkno: u32, } +#[derive(Debug, PartialEq, Eq)] +pub struct PagestreamGetPageRequest { + pub horizon: Lsn, + pub lsn: Lsn, + pub rel: RelTag, + pub blkno: u32, +} + #[derive(Debug, PartialEq, Eq)] pub struct 
PagestreamDbSizeRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub dbnode: u32, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetSlruSegmentRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub kind: u8, pub segno: u32, @@ -924,7 +933,7 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { bytes.put_u8(0); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -934,7 +943,7 @@ impl PagestreamFeMessage { Self::Nblocks(req) => { bytes.put_u8(1); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -942,7 +951,7 @@ impl PagestreamFeMessage { bytes.put_u8(req.rel.forknum); } - Self::GetPage(req) => { + Self::GetLatestPage(req) => { bytes.put_u8(2); bytes.put_u8(u8::from(req.latest)); bytes.put_u64(req.lsn.0); @@ -955,18 +964,29 @@ impl PagestreamFeMessage { Self::DbSize(req) => { bytes.put_u8(3); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.dbnode); } Self::GetSlruSegment(req) => { bytes.put_u8(4); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u8(req.kind); bytes.put_u32(req.segno); } + + Self::GetPage(req) => { + bytes.put_u8(5); + bytes.put_u64(req.horizon.0); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + bytes.put_u32(req.blkno); + } } bytes.into() @@ -982,7 +1002,7 @@ impl PagestreamFeMessage { let msg_tag = body.read_u8()?; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: body.read_u8()? 
!= 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -992,7 +1012,7 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: body.read_u8()? != 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -1001,8 +1021,34 @@ impl PagestreamFeMessage { forknum: body.read_u8()?, }, })), - 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: body.read_u8()? != 0, + 2 => Ok(PagestreamFeMessage::GetLatestPage( + PagestreamGetLatestPageRequest { + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + blkno: body.read_u32::()?, + }, + )), + 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + horizon: Lsn::from(body.read_u64::()?), + lsn: Lsn::from(body.read_u64::()?), + dbnode: body.read_u32::()?, + })), + 4 => Ok(PagestreamFeMessage::GetSlruSegment( + PagestreamGetSlruSegmentRequest { + horizon: Lsn::from(body.read_u64::()?), + lsn: Lsn::from(body.read_u64::()?), + kind: body.read_u8()?, + segno: body.read_u32::()?, + }, + )), + 5 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -1012,19 +1058,6 @@ impl PagestreamFeMessage { }, blkno: body.read_u32::()?, })), - 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: body.read_u8()? != 0, - lsn: Lsn::from(body.read_u64::()?), - dbnode: body.read_u32::()?, - })), - 4 => Ok(PagestreamFeMessage::GetSlruSegment( - PagestreamGetSlruSegmentRequest { - latest: body.read_u8()? 
!= 0, - lsn: Lsn::from(body.read_u64::()?), - kind: body.read_u8()?, - segno: body.read_u32::()?, - }, - )), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } @@ -1148,7 +1181,7 @@ mod tests { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: true, + horizon: Lsn::MAX, lsn: Lsn(4), rel: RelTag { forknum: 1, @@ -1158,7 +1191,7 @@ mod tests { }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: false, + horizon: Lsn::INVALID, lsn: Lsn(4), rel: RelTag { forknum: 1, @@ -1168,8 +1201,8 @@ mod tests { }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: true, - lsn: Lsn(4), + horizon: Lsn::MAX, + lsn: Lsn::INVALID, rel: RelTag { forknum: 1, spcnode: 2, @@ -1179,7 +1212,7 @@ mod tests { blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: true, + horizon: Lsn::MAX, lsn: Lsn(4), dbnode: 7, }), diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index c3d8e61a2c..7431dcf883 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -312,7 +312,11 @@ async fn main_impl( let (rel_tag, block_no) = key_to_rel_block(key).expect("we filter non-rel-block keys out above"); PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), + horizon: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, lsn: r.timeline_lsn, rel: rel_tag, blkno: block_no, diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 0479d05f8f..756db72bf2 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -361,9 +361,10 @@ where /// Add contents of relfilenode `src`, naming it as `dst`. 
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> { + let horizon = self.lsn; // we do not need latest version let nblocks = self .timeline - .get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx) + .get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx) .await?; // If the relation is empty, create an empty file @@ -384,7 +385,7 @@ where for blknum in startblk..endblk { let img = self .timeline - .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx) + .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx) .await?; segment_data.extend_from_slice(&img[..]); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3b9a30ba4c..3d86360a20 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -637,6 +637,25 @@ impl PageServerHandler { span, ) } + PagestreamFeMessage::GetLatestPage(old_req) => { + let req = PagestreamGetPageRequest { + horizon: if old_req.latest { + Lsn::MAX + } else { + old_req.lsn + }, + lsn: old_req.lsn, + rel: old_req.rel, + blkno: old_req.blkno, + }; + let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); + ( + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) + } PagestreamFeMessage::GetPage(req) => { // shard_id is filled in by the handler let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); @@ -849,67 +868,47 @@ impl PageServerHandler { /// the LSN that should be used to look up the page versions. async fn wait_or_get_last_lsn( timeline: &Timeline, - mut lsn: Lsn, - latest: bool, + lsn: Lsn, + horizon: Lsn, latest_gc_cutoff_lsn: &RcuReadGuard, ctx: &RequestContext, ) -> Result { - if latest { - // Latest page version was requested. 
If LSN is given, it is a hint - // to the page server that there have been no modifications to the - // page after that LSN. If we haven't received WAL up to that point, - // wait until it arrives. - let last_record_lsn = timeline.get_last_record_lsn(); - - // Note: this covers the special case that lsn == Lsn(0). That - // special case means "return the latest version whatever it is", - // and it's used for bootstrapping purposes, when the page server is - // connected directly to the compute node. That is needed because - // when you connect to the compute node, to receive the WAL, the - // walsender process will do a look up in the pg_authid catalog - // table for authentication. That poses a deadlock problem: the - // catalog table lookup will send a GetPage request, but the GetPage - // request will block in the page server because the recent WAL - // hasn't been received yet, and it cannot be received until the - // walsender completes the authentication and starts streaming the - // WAL. - if lsn <= last_record_lsn { - lsn = last_record_lsn; - } else { - timeline - .wait_lsn( - lsn, - crate::tenant::timeline::WaitLsnWaiter::PageService, - ctx, - ) - .await?; - // Since we waited for 'lsn' to arrive, that is now the last - // record LSN. (Or close enough for our purposes; the - // last-record LSN can advance immediately after we return - // anyway) - } + let last_record_lsn = timeline.get_last_record_lsn(); + // Horizon = 0 (INVALID) is treated as LSN interval degenerated to point [lsn,lsn]. + // This is done mostly for convenience (because such get_page commands are widely used in tests) and + // also seems to be logical: Lsn::MAX moves upper boundary of LSN interval till last_record_lsn and + // Lsn(0) moves upper boundary to lower boundary. 
+ let request_horizon = if horizon == Lsn::INVALID { + lsn } else { - if lsn == Lsn(0) { - return Err(PageStreamError::BadRequest( - "invalid LSN(0) in request".into(), - )); - } + horizon + }; + let effective_lsn = Lsn::max(lsn, Lsn::min(request_horizon, last_record_lsn)); + if effective_lsn > last_record_lsn { timeline .wait_lsn( - lsn, + effective_lsn, crate::tenant::timeline::WaitLsnWaiter::PageService, ctx, ) .await?; + // Since we waited for 'lsn' to arrive, that is now the last + // record LSN. (Or close enough for our purposes; the + // last-record LSN can advance immediately after we return + // anyway) + } else if effective_lsn == Lsn(0) { + return Err(PageStreamError::BadRequest( + "invalid LSN(0) in request".into(), + )); } - if lsn < **latest_gc_cutoff_lsn { + if effective_lsn < **latest_gc_cutoff_lsn { return Err(PageStreamError::BadRequest(format!( "tried to request a page version that was garbage collected. requested at {} gc cutoff {}", - lsn, **latest_gc_cutoff_lsn + effective_lsn, **latest_gc_cutoff_lsn ).into())); } - Ok(lsn) + Ok(effective_lsn) } #[instrument(skip_all, fields(shard_id))] @@ -927,11 +926,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let exists = timeline - .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx) + .get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -955,11 +954,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let n_blocks = timeline - 
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx) + .get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { @@ -983,7 +982,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let total_blocks = timeline @@ -991,7 +990,7 @@ impl PageServerHandler { DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), - req.latest, + req.horizon, ctx, ) .await?; @@ -1161,11 +1160,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let page = timeline - .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) + .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { @@ -1189,7 +1188,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let kind = SlruKind::from_repr(req.kind) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 6f7d74bdee..ef7c3bbdff 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -175,7 +175,7 @@ impl Timeline { tag: RelTag, blknum: BlockNumber, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -184,7 +184,7 @@ impl Timeline { 
)); } - let nblocks = self.get_rel_size(tag, version, latest, ctx).await?; + let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?; if blknum >= nblocks { debug!( "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", @@ -206,7 +206,7 @@ impl Timeline { spcnode: Oid, dbnode: Oid, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { let mut total_blocks = 0; @@ -214,7 +214,7 @@ impl Timeline { let rels = self.list_rels(spcnode, dbnode, version, ctx).await?; for rel in rels { - let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?; + let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?; total_blocks += n_blocks as usize; } Ok(total_blocks) @@ -225,7 +225,7 @@ impl Timeline { &self, tag: RelTag, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -239,7 +239,7 @@ impl Timeline { } if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM) - && !self.get_rel_exists(tag, version, latest, ctx).await? + && !self.get_rel_exists(tag, version, horizon, ctx).await? { // FIXME: Postgres sometimes calls smgrcreate() to create // FSM, and smgrnblocks() on it immediately afterwards, @@ -252,14 +252,8 @@ impl Timeline { let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); - if latest { - // Update relation size cache only if "latest" flag is set. - // This flag is set by compute when it is working with most recent version of relation. - // Typically master compute node always set latest=true. - // Please notice, that even if compute node "by mistake" specifies old LSN but set - // latest=true, then it can not cause cache corruption, because with latest=true - // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be - // associated with most recent value of LSN. 
+ if horizon == Lsn::MAX { + // Update relation size cache only if latest version is requested. self.update_cached_rel_size(tag, version.get_lsn(), nblocks); } Ok(nblocks) @@ -270,7 +264,7 @@ impl Timeline { &self, tag: RelTag, version: Version<'_>, - _latest: bool, + _horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -1088,7 +1082,7 @@ impl<'a> DatadirModification<'a> { ) -> anyhow::Result<()> { let total_blocks = self .tline - .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx) + .get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx) .await?; // Remove entry from dbdir @@ -1187,7 +1181,7 @@ impl<'a> DatadirModification<'a> { anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); if self .tline - .get_rel_exists(rel, Version::Modified(self), true, ctx) + .get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx) .await? { let size_key = rel_size_to_key(rel); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 9c7e8748d5..51ef703446 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1034,7 +1034,7 @@ impl WalIngest { let nblocks = modification .tline - .get_rel_size(src_rel, Version::Modified(modification), true, ctx) + .get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx) .await?; let dst_rel = RelTag { spcnode: tablespace_id, @@ -1072,7 +1072,7 @@ impl WalIngest { src_rel, blknum, Version::Modified(modification), - true, + Lsn::MAX, ctx, ) .await?; @@ -1242,7 +1242,7 @@ impl WalIngest { }; if modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? { self.put_rel_drop(modification, rel, ctx).await?; @@ -1541,7 +1541,7 @@ impl WalIngest { nblocks } else if !modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? 
{ // create it with 0 size initially, the logic below will extend it @@ -1553,7 +1553,7 @@ impl WalIngest { } else { modification .tline - .get_rel_size(rel, Version::Modified(modification), true, ctx) + .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? }; @@ -1650,14 +1650,14 @@ async fn get_relsize( ) -> anyhow::Result { let nblocks = if !modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? { 0 } else { modification .tline - .get_rel_size(rel, Version::Modified(modification), true, ctx) + .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? }; Ok(nblocks) @@ -1732,29 +1732,29 @@ mod tests { // The relation was created at LSN 2, not visible at LSN 1 yet. assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx) .await?, false ); assert!(tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx) .await .is_err()); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, 1 ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, 3 ); @@ -1762,46 +1762,46 @@ mod tests { // Check page contents at each LSN assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 0 at 2") ); assert_eq!( tline - 
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 1 at 4") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 1 at 4") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 2 at 5") ); @@ -1817,19 +1817,19 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx) .await?, 2 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 0 at 3") ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 1 at 4") ); @@ -1837,13 +1837,13 @@ mod tests { // should 
still see the truncated block with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, 3 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 2 at 5") ); @@ -1856,7 +1856,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), Lsn::INVALID, &ctx) .await?, 0 ); @@ -1869,19 +1869,19 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx) .await?, 2 ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx) .await?, ZERO_PAGE ); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 1") ); @@ -1894,21 +1894,27 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx) .await?, 1501 ); for blk in 2..1500 { assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_page_at_lsn( + TESTREL_A, + blk, + Version::Lsn(Lsn(0x80)), + Lsn::INVALID, + &ctx + ) .await?, ZERO_PAGE ); } assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx) .await?, test_img("foo blk 1500") ); @@ -1935,13 +1941,13 @@ 
mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, 1 ); @@ -1954,7 +1960,7 @@ mod tests { // Check that rel is not visible anymore assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx) .await?, false ); @@ -1972,13 +1978,13 @@ mod tests { // Check that rel exists and size is correct assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx) .await?, 1 ); @@ -2011,24 +2017,24 @@ mod tests { // The relation was created at LSN 20, not visible at LSN 1 yet. 
assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx) .await?, false ); assert!(tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx) .await .is_err()); assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx) .await?, relsize ); @@ -2039,7 +2045,7 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx) + .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), Lsn::INVALID, &ctx) .await?, test_img(&data) ); @@ -2056,7 +2062,7 @@ mod tests { // Check reported size and contents after truncation assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx) .await?, 1 ); @@ -2066,7 +2072,13 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx) + .get_rel_page_at_lsn( + TESTREL_A, + blkno, + Version::Lsn(Lsn(0x60)), + Lsn::INVALID, + &ctx + ) .await?, test_img(&data) ); @@ -2075,7 +2087,7 @@ mod tests { // should still see all blocks with older LSN assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx) .await?, relsize ); @@ -2084,7 +2096,13 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx) + 
.get_rel_page_at_lsn( + TESTREL_A, + blkno, + Version::Lsn(Lsn(0x50)), + Lsn::INVALID, + &ctx + ) .await?, test_img(&data) ); @@ -2104,13 +2122,13 @@ mod tests { assert_eq!( tline - .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx) .await?, true ); assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx) .await?, relsize ); @@ -2120,7 +2138,13 @@ mod tests { let data = format!("foo blk {} at {}", blkno, lsn); assert_eq!( tline - .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx) + .get_rel_page_at_lsn( + TESTREL_A, + blkno, + Version::Lsn(Lsn(0x80)), + Lsn::INVALID, + &ctx + ) .await?, test_img(&data) ); @@ -2154,7 +2178,7 @@ mod tests { assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx) .await?, RELSEG_SIZE + 1 ); @@ -2168,7 +2192,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx) .await?, RELSEG_SIZE ); @@ -2183,7 +2207,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx) .await?, RELSEG_SIZE - 1 ); @@ -2201,7 +2225,7 @@ mod tests { m.commit(&ctx).await?; assert_eq!( tline - .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx) + .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx) .await?, size as BlockNumber ); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 44ae766f76..35f5d73531 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -33,9 +33,10 @@ typedef enum /* pagestore_client -> pagestore 
*/ T_NeonExistsRequest = 0, T_NeonNblocksRequest, - T_NeonGetPageRequest, + T_NeonGetLatestPageRequest, /* old format of get_page command */ T_NeonDbSizeRequest, T_NeonGetSlruSegmentRequest, + T_NeonGetPageRequest, /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, @@ -79,7 +80,7 @@ typedef enum { typedef struct { NeonMessageTag tag; - bool latest; /* if true, request latest page version */ + XLogRecPtr horizon; /* upper boundary for page LSN */ XLogRecPtr lsn; /* request page version @ this LSN */ } NeonRequest; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 57a16e00ca..8b32224796 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -110,6 +110,20 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id); static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; +#define MAX_LSN ((XLogRecPtr)~0) + +/* + * There are three kinds of get_page : + * 1. Master compute: get the latest page not older than specified LSN (horizon=Lsn::MAX) + * 2. RO replica: get the latest page not newer than current WAL position replica already applied (horizon=GetXLogReplayRecPtr(NULL)) + * 3. Snapshot: get latest page not newer than specified LSN (horizon=request_lsn) + */ +static XLogRecPtr +neon_get_horizon(bool latest) +{ + return latest ? MAX_LSN : RecoveryInProgress() ? 
GetXLogReplayRecPtr(NULL) : InvalidXLogRecPtr; /* horizon=InvalidXlogRecPtr is replaced with request_lsn at PS */ +} + /* * Prefetch implementation: * @@ -687,9 +701,10 @@ static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn) { bool found; + bool latest; NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, - .req.latest = false, + .req.horizon = 0, .req.lsn = 0, .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -699,13 +714,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force if (force_lsn && force_latest) { request.req.lsn = *force_lsn; - request.req.latest = *force_latest; + latest = *force_latest; slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn; } else { XLogRecPtr lsn = neon_get_request_lsn( - &request.req.latest, + &latest, BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum @@ -733,6 +748,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force prefetch_lsn = Max(prefetch_lsn, lsn); slot->effective_request_lsn = prefetch_lsn; } + request.req.horizon = neon_get_horizon(latest); Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); @@ -1006,7 +1022,7 @@ nm_pack_request(NeonRequest *msg) { NeonExistsRequest *msg_req = (NeonExistsRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1019,7 +1035,7 @@ nm_pack_request(NeonRequest *msg) { NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1032,7 +1048,7 @@ nm_pack_request(NeonRequest 
*msg) { NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, msg_req->dbNode); @@ -1042,7 +1058,7 @@ nm_pack_request(NeonRequest *msg) { NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1057,7 +1073,7 @@ nm_pack_request(NeonRequest *msg) { NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendbyte(&s, msg_req->kind); pq_sendint32(&s, msg_req->segno); @@ -1209,7 +1225,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1222,7 +1238,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1236,7 +1252,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno); 
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1247,7 +1263,7 @@ nm_to_string(NeonMessage *msg) appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\""); appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1259,7 +1275,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"kind\": %u", msg_req->kind); appendStringInfo(&s, ", \"segno\": %u", msg_req->segno); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1664,7 +1680,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) { NeonExistsRequest request = { .req.tag = T_NeonExistsRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .rinfo = InfoFromSMgrRel(reln), .forknum = forkNum}; @@ -2474,7 +2490,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) { NeonNblocksRequest request = { .req.tag = T_NeonNblocksRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .rinfo = InfoFromSMgrRel(reln), .forknum = forknum, @@ -2531,7 +2547,7 @@ neon_dbsize(Oid dbNode) { NeonDbSizeRequest request = { .req.tag = T_NeonDbSizeRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .dbNode = dbNode, }; @@ -2827,7 +2843,7 @@ 
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf NeonResponse *resp; NeonGetSlruSegmentRequest request = { .req.tag = T_NeonGetSlruSegmentRequest, - .req.latest = false, + .req.horizon = InvalidXLogRecPtr, .req.lsn = request_lsn, .kind = kind, @@ -2980,7 +2996,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, NeonNblocksRequest request = { .req = (NeonRequest) { .lsn = end_recptr, - .latest = false, + .horizon = neon_get_horizon(false), .tag = T_NeonNblocksRequest, }, .rinfo = rinfo, diff --git a/trace/src/main.rs b/trace/src/main.rs index 4605c124e9..3a57af57fa 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -7,7 +7,9 @@ use std::{ io::BufReader, }; -use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest}; +use pageserver_api::models::{ + PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest, +}; use utils::id::{ConnectionId, TenantId, TimelineId}; use clap::{Parser, Subcommand}; @@ -51,9 +53,11 @@ enum Command { // - detect any prefetching anomalies by looking for negative deltas during seqscan fn analyze_trace(mut reader: R) { let mut total = 0; // Total requests traced + let mut old = 0; // Old requests traced let mut cross_rel = 0; // Requests that ask for different rel than previous request let mut deltas = HashMap::::new(); // Consecutive blkno differences let mut prev: Option = None; + let mut old_prev: Option = None; // Compute stats while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) { @@ -61,6 +65,20 @@ fn analyze_trace(mut reader: R) { PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Nblocks(_) => {} PagestreamFeMessage::GetSlruSegment(_) => {} + PagestreamFeMessage::GetLatestPage(req) => { + total += 1; + old += 1; + + if let Some(prev) = old_prev { + if prev.rel == req.rel { + let delta = (req.blkno as i32) - (prev.blkno as i32); + deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1); + } else { + cross_rel += 
1; + } + } + old_prev = Some(req); + } PagestreamFeMessage::GetPage(req) => { total += 1; @@ -83,6 +101,7 @@ fn analyze_trace(mut reader: R) { deltas.retain(|_, count| *count > 300); other -= deltas.len(); dbg!(total); + dbg!(old); dbg!(cross_rel); dbg!(other); dbg!(deltas);