From efdb07e7b621f163a5c048cf1d602495ac60bd68 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 1 Jul 2025 16:22:51 +0300 Subject: [PATCH] Implement function to check if page is in local cache This is needed for read replicas. There's one more TODO that needs to implemented before read replicas work though, in neon_extend_rel_size() --- .../communicator/src/backend_interface.rs | 25 ++++++++++++++++++- .../neon/communicator/src/integrated_cache.rs | 6 +++++ pgxn/neon/communicator/src/neon_request.rs | 4 +-- pgxn/neon/communicator_new.c | 19 ++++++++++++++ pgxn/neon/communicator_new.h | 2 ++ pgxn/neon/pagestore_smgr.c | 5 +++- 6 files changed, 57 insertions(+), 4 deletions(-) diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index 3a6755d5d8..3aa0fc673f 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -6,7 +6,7 @@ use std::os::fd::OwnedFd; use crate::backend_comms::NeonIOHandle; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess}; -use crate::neon_request::CCachedGetPageVResult; +use crate::neon_request::{CCachedGetPageVResult, COid}; use crate::neon_request::{NeonIORequest, NeonIOResult}; pub struct CommunicatorBackendStruct<'t> { @@ -158,6 +158,29 @@ pub extern "C" fn bcomm_finish_cache_read(bs: &mut CommunicatorBackendStruct) -> } } + +/// Check if the local file cache contians the given block +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_cache_contains( + bs: &mut CommunicatorBackendStruct, + spc_oid: COid, + db_oid: COid, + rel_number: u32, + fork_number: u8, + block_number: u32, +) -> bool { + bs.integrated_cache.cache_contains_page( + &pageserver_page_api::RelTag { + spcnode: spc_oid, + dbnode: db_oid, + relnode: rel_number, + forknum: fork_number, + }, + block_number + ) +} + + impl<'t> CommunicatorBackendStruct<'t> { /// Send a wakeup to the communicator process fn submit_request(self: &CommunicatorBackendStruct<'t>, request_idx: i32) { diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index cdf3f9a761..4ec2682710 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -738,6 +738,12 @@ impl<'t> IntegratedCacheReadAccess<'t> { map_access: self, } } + + /// Check if the given page is present in the cache + pub fn cache_contains_page(&'t self, rel: &RelTag, block_number: u32) -> bool { + self.block_map + .get(&BlockKey::from((rel, block_number))).is_some() + } } pub struct BackendCacheReadOp<'t> { diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 12dc308f9c..4b5be0b34c 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -1,5 +1,5 @@ -type CLsn = u64; -type COid = u32; +pub type CLsn = u64; +pub type COid = u32; // This conveniently matches PG_IOV_MAX pub const MAX_GETPAGEV_PAGES: usize = 32; diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index fa2fc092ee..e569e63a9d 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -453,6 +453,25 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu elog(LOG, "sent prefetch request with idx %d", request_idx); } +/* + * Does the LFC contains the given buffer? + * + * This is used in WAL replay in read replica, to skip updating pages that are + * not in cache. + */ +bool +communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno) +{ + return bcomm_cache_contains(my_bs, + NInfoGetSpcOid(rinfo), + NInfoGetDbOid(rinfo), + NInfoGetRelNumber(rinfo), + forkNum, + blockno); +} + + static void process_inflight_requests(void) { diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 43dc1ad793..bbab3f8f5a 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -36,6 +36,8 @@ extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks); +extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno); extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 2bbb98b6c1..cc1bf384cb 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -2570,7 +2570,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) * We should perform this check after assigning LwLSN to prevent * prefetching of some older version of the page by some other backend. */ - no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); + if (neon_enable_new_communicator) + no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); + else + no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); } LWLockRelease(partitionLock);