From 4053092408f43ed78bb1e8cdba96c74433fd565a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 8 Jul 2025 17:22:24 +0300 Subject: [PATCH] Fix LSN tracking on "unlogged index builds" Fixes the test_gin_redo.py test failure, and probably some others --- .../neon/communicator/src/integrated_cache.rs | 15 +++- pgxn/neon/communicator/src/neon_request.rs | 34 +++++++++- .../src/worker_process/main_loop.rs | 24 ++++--- pgxn/neon/communicator_new.c | 42 +++++++++++- pgxn/neon/communicator_new.h | 7 +- pgxn/neon/pagestore_client.h | 1 + pgxn/neon/pagestore_smgr.c | 68 +++++++++++++++---- test_runner/regress/test_gin_redo.py | 1 + 8 files changed, 160 insertions(+), 32 deletions(-) diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 5f0ca5f510..a7009f0eb5 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -192,6 +192,10 @@ struct RelEntry { /// cached size of the relation /// u32::MAX means 'not known' (that's InvalidBlockNumber in Postgres) nblocks: AtomicU32, + + /// This is the last time the "metadata" of this relation changed, not + /// the contents of the blocks. That is, the size of the relation. 
+ lw_lsn: AtomicLsn, } impl std::fmt::Debug for RelEntry { @@ -338,7 +342,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { CacheResult::NotFound(lsn) } - pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { + pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32, lsn: Lsn) { match self.relsize_cache.entry(RelKey::from(rel)) { Entry::Vacant(e) => { tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); @@ -346,12 +350,14 @@ impl<'t> IntegratedCacheWriteAccess<'t> { _ = e .insert(RelEntry { nblocks: AtomicU32::new(nblocks), + lw_lsn: AtomicLsn::new(lsn.0), }) .expect("out of memory"); } Entry::Occupied(e) => { tracing::info!("updating rel entry for {rel:?}, {nblocks} blocks"); e.get().nblocks.store(nblocks, Ordering::Relaxed); + e.get().lw_lsn.store(lsn); } }; } @@ -515,10 +521,15 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } /// Forget information about given relation in the cache. (For DROP TABLE and such) - pub fn forget_rel(&'t self, rel: &RelTag) { + pub fn forget_rel(&'t self, rel: &RelTag, _nblocks: Option<u32>, flush_lsn: Lsn) { tracing::info!("forgetting rel entry for {rel:?}"); self.relsize_cache.remove(&RelKey::from(rel)); + // update with flush LSN + let _ = self + .global_lw_lsn + .fetch_max(flush_lsn.0, Ordering::Relaxed); + // also forget all cached blocks for the relation // FIXME /* diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index f54dcd9222..32a02cd8c3 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -28,6 +28,9 @@ pub enum NeonIORequest { RelCreate(CRelCreateRequest), RelTruncate(CRelTruncateRequest), RelUnlink(CRelUnlinkRequest), + + // Other requests + ForgetCache(CForgetCacheRequest), } #[repr(C)] @@ -72,6 +75,7 @@ impl NeonIORequest { RelCreate(req) => req.request_id, RelTruncate(req) => req.request_id, RelUnlink(req) => req.request_id, + ForgetCache(req) => req.request_id, } } } @@ -187,7 +191,6 @@ 
pub struct CPrefetchVRequest { pub struct CDbSizeRequest { pub request_id: u64, pub db_oid: COid, - pub request_lsn: CLsn, } #[repr(C)] @@ -241,6 +244,7 @@ pub struct CRelCreateRequest { pub db_oid: COid, pub rel_number: u32, pub fork_number: u8, + pub lsn: CLsn, } #[repr(C)] @@ -252,6 +256,7 @@ pub struct CRelTruncateRequest { pub rel_number: u32, pub fork_number: u8, pub nblocks: u32, + pub lsn: CLsn, } #[repr(C)] @@ -262,8 +267,7 @@ pub struct CRelUnlinkRequest { pub db_oid: COid, pub rel_number: u32, pub fork_number: u8, - pub block_number: u32, - pub nblocks: u32, + pub lsn: CLsn, } impl CRelExistsRequest { @@ -375,3 +379,27 @@ impl CRelUnlinkRequest { } } } + + +#[repr(C)] +#[derive(Copy, Clone, Debug)] +pub struct CForgetCacheRequest { + pub request_id: u64, + pub spc_oid: COid, + pub db_oid: COid, + pub rel_number: u32, + pub fork_number: u8, + pub nblocks: u32, + pub lsn: CLsn, +} + +impl CForgetCacheRequest { + pub fn reltag(&self) -> page_api::RelTag { + page_api::RelTag { + spcnode: self.spc_oid, + dbnode: self.db_oid, + relnode: self.rel_number, + forknum: self.fork_number, + } + } +} diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index fe6acbf049..2eacd13609 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -359,8 +359,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { { Ok(nblocks) => { // update the cache - tracing::info!("updated relsize for {:?} in cache: {}", rel, nblocks); - self.cache.remember_rel_size(&rel, nblocks); + tracing::info!("updated relsize for {:?} in cache: {}, lsn {}", rel, nblocks, read_lsn); + self.cache.remember_rel_size(&rel, nblocks, not_modified_since); NeonIOResult::RelSize(nblocks) } @@ -457,7 +457,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true) .await; self.cache - 
.remember_rel_size(&req.reltag(), req.block_number + 1); + .remember_rel_size(&req.reltag(), req.block_number + 1, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelZeroExtend(req) => { @@ -466,31 +466,37 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { .inc_by(req.nblocks as u64); // TODO: need to grab an io-in-progress lock for this? I guess not - // TODO: I think we should put the empty pages to the cache, or at least - // update the last-written LSN. + // TODO: We could put the empty pages to the cache. Maybe have + // a marker on the block entries for all-zero pages, instead of + // actually storing the empty pages. self.cache - .remember_rel_size(&req.reltag(), req.block_number + req.nblocks); + .remember_rel_size(&req.reltag(), req.block_number + req.nblocks, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelCreate(req) => { self.request_rel_create_counter.inc(); // TODO: need to grab an io-in-progress lock for this? I guess not - self.cache.remember_rel_size(&req.reltag(), 0); + self.cache.remember_rel_size(&req.reltag(), 0, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelTruncate(req) => { self.request_rel_truncate_counter.inc(); // TODO: need to grab an io-in-progress lock for this? I guess not - self.cache.remember_rel_size(&req.reltag(), req.nblocks); + self.cache.remember_rel_size(&req.reltag(), req.nblocks, Lsn(req.lsn)); NeonIOResult::WriteOK } NeonIORequest::RelUnlink(req) => { self.request_rel_unlink_counter.inc(); // TODO: need to grab an io-in-progress lock for this? I guess not - self.cache.forget_rel(&req.reltag()); + self.cache.forget_rel(&req.reltag(), None, Lsn(req.lsn)); + NeonIOResult::WriteOK + } + NeonIORequest::ForgetCache(req) => { + // TODO: need to grab an io-in-progress lock for this? 
I guess not + self.cache.forget_rel(&req.reltag(), Some(req.nblocks), Lsn(req.lsn)); NeonIOResult::WriteOK } } diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 15643b822a..44070dd72d 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -1061,7 +1061,7 @@ communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNum } void -communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum) +communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn) { NeonIORequest request = { .tag = NeonIORequest_RelCreate, @@ -1071,6 +1071,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum) .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), .fork_number = forkNum, + .lsn = lsn, } }; NeonIOResult result; @@ -1093,7 +1094,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum) } void -communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks) +communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn) { NeonIORequest request = { .tag = NeonIORequest_RelTruncate, @@ -1104,6 +1105,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe .rel_number = NInfoGetRelNumber(rinfo), .fork_number = forkNum, .nblocks = nblocks, + .lsn = lsn, } }; NeonIOResult result; @@ -1126,7 +1128,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe } void -communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum) +communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn) { NeonIORequest request = { .tag = NeonIORequest_RelUnlink, @@ -1136,6 +1138,7 @@ communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum) .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), .fork_number = forkNum, + .lsn = lsn, } }; NeonIOResult result; @@ -1157,6 +1160,39 @@ 
communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum) } } +void +communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn) +{ + NeonIORequest request = { + .tag = NeonIORequest_ForgetCache, + .forget_cache = { + .request_id = assign_request_id(), + .spc_oid = NInfoGetSpcOid(rinfo), + .db_oid = NInfoGetDbOid(rinfo), + .rel_number = NInfoGetRelNumber(rinfo), + .fork_number = forkNum, + .nblocks = nblocks, + .lsn = lsn, + } + }; + NeonIOResult result; + + perform_request(&request, &result); + switch (result.tag) + { + case NeonIOResult_WriteOK: + return; + case NeonIOResult_Error: + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not forget cache for rel %u/%u/%u.%u: %s", + RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error)))); + break; + default: + elog(ERROR, "unexpected result for ForgetCache operation: %d", result.tag); + break; + } +} /* Debugging functions */ diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index bbab3f8f5a..5b636b687a 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -49,8 +49,9 @@ extern void communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum, extern void communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks, XLogRecPtr lsn); -extern void communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum); -extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks); -extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum); +extern void communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn); +extern void communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn); +extern void communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum, XLogRecPtr lsn); +extern void 
communicator_new_forget_cache(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks, XLogRecPtr lsn); #endif /* COMMUNICATOR_NEW_H */ diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index c2727e232b..eb3c80702e 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -292,6 +292,7 @@ extern int64 neon_dbsize(Oid dbNode); extern void neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, neon_request_lsns *output, BlockNumber nblocks); +extern XLogRecPtr neon_get_write_lsn(void); /* utils for neon relsize cache */ extern void relsize_hash_init(void); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 4189af4d32..9ef393b8ff 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -502,6 +502,37 @@ nm_adjust_lsn(XLogRecPtr lsn) return lsn; } +/* + * Get a LSN to use to stamp an operation like relation create or truncate. + * On operations on individual pages we use the LSN of the page, but when + * e.g. smgrcreate() is called, we have to do something else. + */ +XLogRecPtr +neon_get_write_lsn(void) +{ + XLogRecPtr lsn; + + if (RecoveryInProgress()) + { + /* + * FIXME: v14 doesn't have GetCurrentReplayRecPtr(). Options: + * - add it in our fork + * - store a magic value that means that you must use + * current latest possible LSN at the time that the request + * on this thing is made again (or some other recent enough + * lsn). 
+ */ +#if PG_VERSION_NUM >= 150000 + lsn = GetCurrentReplayRecPtr(NULL); +#else + lsn = GetXLogReplayRecPtr(NULL); /* FIXME: this is wrong, see above */ +#endif + } + else + lsn = GetXLogInsertRecPtr(); + + return lsn; +} /* * Return LSN for requesting pages and number of blocks from page server @@ -824,13 +855,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) if (neon_enable_new_communicator) { + XLogRecPtr lsn = neon_get_write_lsn(); + if (isRedo) { if (!communicator_new_rel_exists(InfoFromSMgrRel(reln), forkNum)) - communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum); + communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum, lsn); } else - communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum); + communicator_new_rel_create(InfoFromSMgrRel(reln), forkNum, lsn); } else { @@ -902,7 +935,9 @@ neon_unlink(NRelFileInfoBackend rinfo, ForkNumber forkNum, bool isRedo) { if (neon_enable_new_communicator) { - communicator_new_rel_unlink(InfoFromNInfoB(rinfo), forkNum); + XLogRecPtr lsn = neon_get_write_lsn(); + + communicator_new_rel_unlink(InfoFromNInfoB(rinfo), forkNum, lsn); } else forget_cached_relsize(InfoFromNInfoB(rinfo), forkNum); @@ -1962,7 +1997,9 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo if (neon_enable_new_communicator) { - communicator_new_rel_truncate(InfoFromSMgrRel(reln), forknum, nblocks); + XLogRecPtr lsn = neon_get_write_lsn(); + + communicator_new_rel_truncate(InfoFromSMgrRel(reln), forknum, nblocks, lsn); } else { @@ -2226,12 +2263,15 @@ neon_end_unlogged_build(SMgrRelation reln) nblocks = mdnblocks(reln, MAIN_FORKNUM); recptr = GetXLogInsertRecPtr(); - neon_set_lwlsn_block_range(recptr, - InfoFromNInfoB(rinfob), - MAIN_FORKNUM, 0, nblocks); - neon_set_lwlsn_relation(recptr, - InfoFromNInfoB(rinfob), - MAIN_FORKNUM); + if (!neon_enable_new_communicator) + { + neon_set_lwlsn_block_range(recptr, + InfoFromNInfoB(rinfob), + MAIN_FORKNUM, 0, nblocks); + 
neon_set_lwlsn_relation(recptr, + InfoFromNInfoB(rinfob), + MAIN_FORKNUM); + } /* Remove local copy */ for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++) @@ -2240,8 +2280,12 @@ neon_end_unlogged_build(SMgrRelation reln) RelFileInfoFmt(InfoFromNInfoB(rinfob)), forknum); - // FIXME: also do this with the new communicator - if (!neon_enable_new_communicator) + if (neon_enable_new_communicator) + { + /* TODO: we could update the cache with the size, since we have it at hand */ + communicator_new_forget_cache(InfoFromSMgrRel(reln), forknum, nblocks, recptr); + } + else { forget_cached_relsize(InfoFromNInfoB(rinfob), forknum); lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks); diff --git a/test_runner/regress/test_gin_redo.py b/test_runner/regress/test_gin_redo.py index 71382990dc..3ec2163203 100644 --- a/test_runner/regress/test_gin_redo.py +++ b/test_runner/regress/test_gin_redo.py @@ -16,6 +16,7 @@ def test_gin_redo(neon_simple_env: NeonEnv): secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") con = primary.connect() cur = con.cursor() + cur.execute("select pg_switch_wal()") cur.execute("create table gin_test_tbl(id integer, i int4[])") cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)") cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g")