diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index 3a6755d5d8..3aa0fc673f 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -6,7 +6,7 @@ use std::os::fd::OwnedFd; use crate::backend_comms::NeonIOHandle; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess}; -use crate::neon_request::CCachedGetPageVResult; +use crate::neon_request::{CCachedGetPageVResult, COid}; use crate::neon_request::{NeonIORequest, NeonIOResult}; pub struct CommunicatorBackendStruct<'t> { @@ -158,6 +158,29 @@ pub extern "C" fn bcomm_finish_cache_read(bs: &mut CommunicatorBackendStruct) -> } } + +/// Check if the local file cache contians the given block +#[unsafe(no_mangle)] +pub extern "C" fn bcomm_cache_contains( + bs: &mut CommunicatorBackendStruct, + spc_oid: COid, + db_oid: COid, + rel_number: u32, + fork_number: u8, + block_number: u32, +) -> bool { + bs.integrated_cache.cache_contains_page( + &pageserver_page_api::RelTag { + spcnode: spc_oid, + dbnode: db_oid, + relnode: rel_number, + forknum: fork_number, + }, + block_number + ) +} + + impl<'t> CommunicatorBackendStruct<'t> { /// Send a wakeup to the communicator process fn submit_request(self: &CommunicatorBackendStruct<'t>, request_idx: i32) { diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index cdf3f9a761..4ec2682710 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -738,6 +738,12 @@ impl<'t> IntegratedCacheReadAccess<'t> { map_access: self, } } + + /// Check if the given page is present in the cache + pub fn cache_contains_page(&'t self, rel: &RelTag, block_number: u32) -> bool { + self.block_map + .get(&BlockKey::from((rel, block_number))).is_some() + } } pub struct BackendCacheReadOp<'t> { diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 12dc308f9c..4b5be0b34c 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -1,5 +1,5 @@ -type CLsn = u64; -type COid = u32; +pub type CLsn = u64; +pub type COid = u32; // This conveniently matches PG_IOV_MAX pub const MAX_GETPAGEV_PAGES: usize = 32; diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index fa2fc092ee..e569e63a9d 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -453,6 +453,25 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu elog(LOG, "sent prefetch request with idx %d", request_idx); } +/* + * Does the LFC contains the given buffer? + * + * This is used in WAL replay in read replica, to skip updating pages that are + * not in cache. + */ +bool +communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno) +{ + return bcomm_cache_contains(my_bs, + NInfoGetSpcOid(rinfo), + NInfoGetDbOid(rinfo), + NInfoGetRelNumber(rinfo), + forkNum, + blockno); +} + + static void process_inflight_requests(void) { diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 43dc1ad793..bbab3f8f5a 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -36,6 +36,8 @@ extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks); +extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno); extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 2bbb98b6c1..cc1bf384cb 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -2570,7 +2570,10 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) * We should perform this check after assigning LwLSN to prevent * prefetching of some older version of the page by some other backend. */ - no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); + if (neon_enable_new_communicator) + no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); + else + no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); } LWLockRelease(partitionLock);