From 9d984edf54f79e1d587a4ca5ae8eefaafd25f5d6 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 23 Jan 2024 10:14:40 +0200 Subject: [PATCH] Replace latest with horizon in get_page request --- libs/pageserver_api/src/models.rs | 32 ++++----- libs/safekeeper_api/src/models.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 12 +++- pageserver/src/basebackup.rs | 5 +- pageserver/src/page_service.rs | 65 +++++++------------ pageserver/src/pgdatadir_mapping.rs | 28 ++++---- pageserver/src/tenant/timeline.rs | 10 +-- .../walreceiver/connection_manager.rs | 6 +- pageserver/src/walingest.rs | 14 ++-- pgxn/neon/pagestore_client.h | 2 +- pgxn/neon/pagestore_smgr.c | 40 +++++++----- safekeeper/src/http/routes.rs | 2 +- safekeeper/src/state.rs | 4 +- safekeeper/src/timeline.rs | 4 +- storage_broker/benches/rps.rs | 2 +- storage_broker/proto/broker.proto | 2 +- storage_broker/src/bin/storage_broker.rs | 2 +- 17 files changed, 112 insertions(+), 120 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5a638df9cc..c7f9b6d8ba 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -697,21 +697,21 @@ impl TryFrom for PagestreamBeMessageTag { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub rel: RelTag, } #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub rel: RelTag, pub blkno: u32, @@ -719,7 +719,7 @@ pub struct PagestreamGetPageRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamDbSizeRequest { - pub latest: bool, + pub horizon: Lsn, pub lsn: Lsn, pub dbnode: u32, } @@ -780,7 +780,7 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { bytes.put_u8(0); - 
bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -790,7 +790,7 @@ impl PagestreamFeMessage { Self::Nblocks(req) => { bytes.put_u8(1); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -800,7 +800,7 @@ impl PagestreamFeMessage { Self::GetPage(req) => { bytes.put_u8(2); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); bytes.put_u32(req.rel.dbnode); @@ -811,7 +811,7 @@ impl PagestreamFeMessage { Self::DbSize(req) => { bytes.put_u8(3); - bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.dbnode); } @@ -838,7 +838,7 @@ impl PagestreamFeMessage { let msg_tag = body.read_u8()?; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: body.read_u8()? != 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -848,7 +848,7 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: body.read_u8()? != 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -858,7 +858,7 @@ impl PagestreamFeMessage { }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: body.read_u8()? != 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { spcnode: body.read_u32::()?, @@ -869,7 +869,7 @@ impl PagestreamFeMessage { blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: body.read_u8()? 
!= 0, + horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), dbnode: body.read_u32::()?, })), @@ -1005,7 +1005,7 @@ mod tests { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: true, + horizon: Lsn::MAX, lsn: Lsn(4), rel: RelTag { forknum: 1, @@ -1015,7 +1015,7 @@ mod tests { }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: false, + horizon: Lsn(4), lsn: Lsn(4), rel: RelTag { forknum: 1, @@ -1025,7 +1025,7 @@ mod tests { }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: true, + horizon: Lsn::MAX, lsn: Lsn(4), rel: RelTag { forknum: 1, @@ -1036,7 +1036,7 @@ mod tests { blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: true, + horizon: Lsn::MAX, lsn: Lsn(4), dbnode: 7, }), diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 17f78e65d5..2fbc333075 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -52,7 +52,7 @@ pub struct SkTimelineInfo { pub http_connstr: Option, // Minimum of all active RO replicas flush LSN #[serde(default = "lsn_invalid")] - pub standby_flush_lsn: Lsn, + pub standby_horizon: Lsn, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index aa809d8d26..2606458fbc 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -288,7 +288,11 @@ async fn main_impl( num_client: rng.gen_range(0..args.num_clients.get()), }, PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), + horizon: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, lsn: r.timeline_lsn, rel: rel_tag, blkno: block_no, @@ -335,7 +339,11 @@ async fn main_impl( let (rel_tag, block_no) = 
key_to_rel_block(key) .expect("we filter non-rel-block keys out above"); PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), + horizon: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, lsn: r.timeline_lsn, rel: rel_tag, blkno: block_no, diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 7edfab75d4..583c69d773 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -357,9 +357,10 @@ where /// Add contents of relfilenode `src`, naming it as `dst`. async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> { + let horizon = self.lsn; // we do not need latest version let nblocks = self .timeline - .get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx) + .get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx) .await?; // If the relation is empty, create an empty file @@ -380,7 +381,7 @@ where for blknum in startblk..endblk { let img = self .timeline - .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx) + .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx) .await?; segment_data.extend_from_slice(&img[..]); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6fc38a76d4..b75f332a15 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -842,55 +842,34 @@ impl PageServerHandler { /// the LSN that should be used to look up the page versions. async fn wait_or_get_last_lsn( timeline: &Timeline, - mut lsn: Lsn, - latest: bool, + lsn: Lsn, + horizon: Lsn, latest_gc_cutoff_lsn: &RcuReadGuard, ctx: &RequestContext, ) -> Result { - if latest { - // Latest page version was requested. If LSN is given, it is a hint - // to the page server that there have been no modifications to the - // page after that LSN. If we haven't received WAL up to that point, - // wait until it arrives. 
- let last_record_lsn = timeline.get_last_record_lsn(); - - // Note: this covers the special case that lsn == Lsn(0). That - // special case means "return the latest version whatever it is", - // and it's used for bootstrapping purposes, when the page server is - // connected directly to the compute node. That is needed because - // when you connect to the compute node, to receive the WAL, the - // walsender process will do a look up in the pg_authid catalog - // table for authentication. That poses a deadlock problem: the - // catalog table lookup will send a GetPage request, but the GetPage - // request will block in the page server because the recent WAL - // hasn't been received yet, and it cannot be received until the - // walsender completes the authentication and starts streaming the - // WAL. - if lsn <= last_record_lsn { - lsn = last_record_lsn; - } else { - timeline.wait_lsn(lsn, ctx).await?; - // Since we waited for 'lsn' to arrive, that is now the last - // record LSN. (Or close enough for our purposes; the - // last-record LSN can advance immediately after we return - // anyway) - } + let last_record_lsn = timeline.get_last_record_lsn(); + let effective_lsn = Lsn::max(lsn, Lsn::min(horizon, last_record_lsn)); + if effective_lsn > last_record_lsn { + timeline.wait_lsn(effective_lsn, ctx).await?; + // Since we waited for 'lsn' to arrive, that is now the last + // record LSN. (Or close enough for our purposes; the + // last-record LSN can advance immediately after we return + // anyway) } else { - if lsn == Lsn(0) { + if effective_lsn == Lsn(0) { return Err(PageStreamError::BadRequest( "invalid LSN(0) in request".into(), )); } - timeline.wait_lsn(lsn, ctx).await?; } - if lsn < **latest_gc_cutoff_lsn { + if effective_lsn < **latest_gc_cutoff_lsn { return Err(PageStreamError::BadRequest(format!( "tried to request a page version that was garbage collected. 
requested at {} gc cutoff {}", - lsn, **latest_gc_cutoff_lsn + effective_lsn, **latest_gc_cutoff_lsn ).into())); } - Ok(lsn) + Ok(effective_lsn) } #[instrument(skip_all, fields(shard_id))] @@ -908,11 +887,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let exists = timeline - .get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx) + .get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { @@ -936,11 +915,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let n_blocks = timeline - .get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx) + .get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { @@ -964,7 +943,7 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx) .await?; let total_blocks = timeline @@ -972,7 +951,7 @@ impl PageServerHandler { DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), - req.latest, + req.horizon, ctx, ) .await?; @@ -1142,11 +1121,11 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = - Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, 
&latest_gc_cutoff_lsn, ctx) .await?; let page = timeline - .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) + .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx) .await?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index f1d18c0146..c5291e4fd1 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -169,7 +169,7 @@ impl Timeline { tag: RelTag, blknum: BlockNumber, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -178,7 +178,7 @@ impl Timeline { )); } - let nblocks = self.get_rel_size(tag, version, latest, ctx).await?; + let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?; if blknum >= nblocks { debug!( "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", @@ -200,7 +200,7 @@ impl Timeline { spcnode: Oid, dbnode: Oid, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { let mut total_blocks = 0; @@ -208,7 +208,7 @@ impl Timeline { let rels = self.list_rels(spcnode, dbnode, version, ctx).await?; for rel in rels { - let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?; + let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?; total_blocks += n_blocks as usize; } Ok(total_blocks) @@ -219,7 +219,7 @@ impl Timeline { &self, tag: RelTag, version: Version<'_>, - latest: bool, + horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -233,7 +233,7 @@ impl Timeline { } if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM) - && !self.get_rel_exists(tag, version, latest, ctx).await? + && !self.get_rel_exists(tag, version, horizon, ctx).await? 
{ // FIXME: Postgres sometimes calls smgrcreate() to create // FSM, and smgrnblocks() on it immediately afterwards, @@ -246,14 +246,8 @@ impl Timeline { let mut buf = version.get(self, key, ctx).await?; let nblocks = buf.get_u32_le(); - if latest { - // Update relation size cache only if "latest" flag is set. - // This flag is set by compute when it is working with most recent version of relation. - // Typically master compute node always set latest=true. - // Please notice, that even if compute node "by mistake" specifies old LSN but set - // latest=true, then it can not cause cache corruption, because with latest=true - // pageserver choose max(request_lsn, last_written_lsn) and so cached value will be - // associated with most recent value of LSN. + if horizon == Lsn::MAX { + // Update relation size cache only if latest version is requested. self.update_cached_rel_size(tag, version.get_lsn(), nblocks); } Ok(nblocks) @@ -264,7 +258,7 @@ impl Timeline { &self, tag: RelTag, version: Version<'_>, - _latest: bool, + _horizon: Lsn, ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { @@ -1066,7 +1060,7 @@ impl<'a> DatadirModification<'a> { ) -> anyhow::Result<()> { let total_blocks = self .tline - .get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx) + .get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx) .await?; // Remove entry from dbdir @@ -1157,7 +1151,7 @@ impl<'a> DatadirModification<'a> { anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); if self .tline - .get_rel_exists(rel, Version::Modified(self), true, ctx) + .get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx) .await? { let size_key = rel_size_to_key(rel); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2336836612..a817a70543 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -215,7 +215,7 @@ pub struct Timeline { // Atomic would be more appropriate here. 
last_freeze_ts: RwLock, - pub(crate) standby_flush_lsn: AtomicLsn, + pub(crate) standby_horizon: AtomicLsn, // WAL redo manager. `None` only for broken tenants. walredo_mgr: Option>, @@ -1542,7 +1542,7 @@ impl Timeline { compaction_lock: tokio::sync::Mutex::default(), gc_lock: tokio::sync::Mutex::default(), - standby_flush_lsn: AtomicLsn::new(0), + standby_horizon: AtomicLsn::new(0), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -4202,9 +4202,9 @@ impl Timeline { }; let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); - let standby_flush_lsn = self.standby_flush_lsn.load(); - let new_gc_cutoff = if standby_flush_lsn != Lsn::INVALID { - Lsn::min(standby_flush_lsn, new_gc_cutoff) + let standby_horizon = self.standby_horizon.load(); + let new_gc_cutoff = if standby_horizon != Lsn::INVALID { + Lsn::min(standby_horizon, new_gc_cutoff) } else { new_gc_cutoff }; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index b39aadf4e7..7c31654902 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -554,8 +554,8 @@ impl ConnectionManagerState { WALRECEIVER_BROKER_UPDATES.inc(); self.timeline - .standby_flush_lsn - .store(Lsn(timeline_update.standby_flush_lsn)); + .standby_horizon + .store(Lsn(timeline_update.standby_horizon)); let new_safekeeper_id = NodeId(timeline_update.safekeeper_id); let old_entry = self.wal_stream_candidates.insert( @@ -923,7 +923,7 @@ mod tests { remote_consistent_lsn: 0, peer_horizon_lsn: 0, local_start_lsn: 0, - standby_flush_lsn: 0, + standby_horizon: 0, safekeeper_connstr: safekeeper_connstr.to_owned(), http_connstr: safekeeper_connstr.to_owned(), availability_zone: None, diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 93d1dcab35..dbaf1486d0 100644 --- 
a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1019,7 +1019,7 @@ impl WalIngest { let nblocks = modification .tline - .get_rel_size(src_rel, Version::Modified(modification), true, ctx) + .get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx) .await?; let dst_rel = RelTag { spcnode: tablespace_id, @@ -1057,7 +1057,7 @@ impl WalIngest { src_rel, blknum, Version::Modified(modification), - true, + Lsn::MAX, ctx, ) .await?; @@ -1227,7 +1227,7 @@ impl WalIngest { }; if modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? { self.put_rel_drop(modification, rel, ctx).await?; @@ -1526,7 +1526,7 @@ impl WalIngest { nblocks } else if !modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? { // create it with 0 size initially, the logic below will extend it @@ -1538,7 +1538,7 @@ impl WalIngest { } else { modification .tline - .get_rel_size(rel, Version::Modified(modification), true, ctx) + .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? }; @@ -1635,14 +1635,14 @@ async fn get_relsize( ) -> anyhow::Result { let nblocks = if !modification .tline - .get_rel_exists(rel, Version::Modified(modification), true, ctx) + .get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? { 0 } else { modification .tline - .get_rel_size(rel, Version::Modified(modification), true, ctx) + .get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx) .await? 
}; Ok(nblocks) diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 2889ffacae..c64750a1c6 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -79,7 +79,7 @@ typedef enum { typedef struct { NeonMessageTag tag; - bool latest; /* if true, request latest page version */ + XLogRecPtr horizon; /* upper boundary for page LSN */ XLogRecPtr lsn; /* request page version @ this LSN */ } NeonRequest; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 63e8b8dc1f..751d2eb039 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -110,6 +110,14 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id); static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; +#define MAX_LSN ((XLogRecPtr)~0) + +static XLogRecPtr +neon_get_horizon(bool latest) +{ + return latest ? 
MAX_LSN : GetXLogReplayRecPtr(NULL); +} + /* * Prefetch implementation: * @@ -680,9 +688,10 @@ static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn) { bool found; + bool latest; NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, - .req.latest = false, + .req.horizon = 0, .req.lsn = 0, .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -692,13 +701,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force if (force_lsn && force_latest) { request.req.lsn = *force_lsn; - request.req.latest = *force_latest; + latest = *force_latest; slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn; } else { XLogRecPtr lsn = neon_get_request_lsn( - &request.req.latest, + &latest, BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum @@ -726,6 +735,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force prefetch_lsn = Max(prefetch_lsn, lsn); slot->effective_request_lsn = prefetch_lsn; } + request.req.horizon = neon_get_horizon(latest); Assert(slot->response == NULL); Assert(slot->my_ring_index == MyPState->ring_unused); @@ -996,7 +1006,7 @@ nm_pack_request(NeonRequest *msg) { NeonExistsRequest *msg_req = (NeonExistsRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1009,7 +1019,7 @@ nm_pack_request(NeonRequest *msg) { NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1022,7 +1032,7 @@ nm_pack_request(NeonRequest *msg) { NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg; - 
pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, msg_req->dbNode); @@ -1032,7 +1042,7 @@ nm_pack_request(NeonRequest *msg) { NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; - pq_sendbyte(&s, msg_req->req.latest); + pq_sendint64(&s, msg_req->req.horizon); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo)); pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo)); @@ -1199,7 +1209,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1212,7 +1222,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo)); appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1226,7 +1236,7 @@ nm_to_string(NeonMessage *msg) appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum); appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1237,7 +1247,7 @@ nm_to_string(NeonMessage *msg) appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\""); 
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); - appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon)); appendStringInfoChar(&s, '}'); break; } @@ -1654,7 +1664,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) { NeonExistsRequest request = { .req.tag = T_NeonExistsRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .rinfo = InfoFromSMgrRel(reln), .forknum = forkNum}; @@ -2463,7 +2473,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) { NeonNblocksRequest request = { .req.tag = T_NeonNblocksRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .rinfo = InfoFromSMgrRel(reln), .forknum = forknum, @@ -2520,7 +2530,7 @@ neon_dbsize(Oid dbNode) { NeonDbSizeRequest request = { .req.tag = T_NeonDbSizeRequest, - .req.latest = latest, + .req.horizon = neon_get_horizon(latest), .req.lsn = request_lsn, .dbNode = dbNode, }; @@ -2969,7 +2979,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, NeonNblocksRequest request = { .req = (NeonRequest) { .lsn = end_recptr, - .latest = false, + .horizon = neon_get_horizon(false), .tag = T_NeonNblocksRequest, }, .rinfo = rinfo, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index b5e6727c07..e297229239 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -350,7 +350,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result, n_keys: u64) { http_connstr: "zenith-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, - standby_flush_lsn: 0, + standby_horizon: 0, }; counter += 1; yield info; diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index 74c94c31a8..b34501b62c 100644 --- 
a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -42,7 +42,7 @@ message SafekeeperTimelineInfo { uint64 remote_consistent_lsn = 7; uint64 peer_horizon_lsn = 8; uint64 local_start_lsn = 9; - uint64 standby_flush_lsn = 14; + uint64 standby_horizon = 14; // A connection string to use for WAL receiving. string safekeeper_connstr = 10; // HTTP endpoint connection string diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index b54d9461e8..16cbd1a4df 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -734,7 +734,7 @@ mod tests { http_connstr: "neon-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, - standby_flush_lsn: 0, + standby_horizon: 0, }) }