From 4016808dff303083f2107bdf9cb8d05549f87ff9 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 31 Jul 2025 11:04:15 +0300 Subject: [PATCH] Handle get_raw_page_at_lsn() debugging function properly This adds a new request type between backend and communicator, to make a getpage request at a given LSN, bypassing the LFC. Only used by the get_raw_page_at_lsn() debugging/testing function. --- pgxn/neon/communicator/src/neon_request.rs | 34 +++++++ .../src/worker_process/main_loop.rs | 73 ++++++++++++++- pgxn/neon/communicator_new.c | 92 ++++++++++++++++--- pgxn/neon/communicator_new.h | 8 +- pgxn/neon/pagestore_smgr.c | 10 +- 5 files changed, 194 insertions(+), 23 deletions(-) diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index e528d451f3..809e645f8c 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -36,6 +36,10 @@ pub enum NeonIORequest { PrefetchV(CPrefetchVRequest), DbSize(CDbSizeRequest), + /// This is like GetPageV, but bypasses the LFC and allows specifying the + /// request LSNs directly. For debugging purposes only. + GetPageVUncached(CGetPageVUncachedRequest), + // Write requests. These are needed to keep the relation size cache and LFC up-to-date. // They are not sent to the pageserver.
WritePage(CWritePageRequest), @@ -89,6 +93,7 @@ impl NeonIORequest { Empty => 0, RelSize(req) => req.request_id, GetPageV(req) => req.request_id, + GetPageVUncached(req) => req.request_id, ReadSlruSegment(req) => req.request_id, PrefetchV(req) => req.request_id, DbSize(req) => req.request_id, @@ -191,6 +196,24 @@ pub struct CGetPageVRequest { pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES], } +#[repr(C)] +#[derive(Copy, Clone, Debug)] +pub struct CGetPageVUncachedRequest { + pub request_id: u64, + pub spc_oid: COid, + pub db_oid: COid, + pub rel_number: u32, + pub fork_number: u8, + pub block_number: u32, + pub nblocks: u8, + + pub request_lsn: CLsn, + pub not_modified_since: CLsn, + + // These fields define where the result is written. Must point into a buffer in shared memory! + pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES], +} + #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CReadSlruSegmentRequest { @@ -331,6 +354,17 @@ impl CGetPageVRequest { } } +impl CGetPageVUncachedRequest { + pub fn reltag(&self) -> page_api::RelTag { + page_api::RelTag { + spcnode: self.spc_oid, + dbnode: self.db_oid, + relnode: self.rel_number, + forknum: self.fork_number, + } + } +} + impl CPrefetchVRequest { pub fn reltag(&self) -> page_api::RelTag { page_api::RelTag { diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index df93187161..10a5fd81e5 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -9,7 +9,7 @@ use crate::file_cache::FileCache; use crate::global_allocator::MyAllocatorCollector; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess}; -use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest}; +use crate::neon_request::{CGetPageVRequest, CGetPageVUncachedRequest, CPrefetchVRequest}; use crate::neon_request::{INVALID_BLOCK_NUMBER, NeonIORequest, NeonIOResult}; use 
crate::worker_process::control_socket; use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable}; @@ -398,6 +398,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(()) => NeonIOResult::GetPageV, Err(errno) => NeonIOResult::Error(errno), }, + NeonIORequest::GetPageVUncached(req) => { + match self.handle_get_pagev_uncached_request(req).await { + Ok(()) => NeonIOResult::GetPageV, + Err(errno) => NeonIOResult::Error(errno), + } + } NeonIORequest::ReadSlruSegment(req) => { let lsn = Lsn(req.request_lsn); let file_path = req.destination_file_path(); @@ -659,6 +665,71 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(()) } + /// Subroutine to handle a GetPageVUncached request. + /// + /// Note: this bypasses the cache, in-progress IO locking, and all other side-effects. + /// This request type is only used in tests. + async fn handle_get_pagev_uncached_request( + &'t self, + req: &CGetPageVUncachedRequest, + ) -> Result<(), i32> { + let rel = req.reltag(); + + // Construct a pageserver request + let block_numbers: Vec<u32> = + (req.block_number..(req.block_number + (req.nblocks as u32))).collect(); + let read_lsn = page_api::ReadLsn { + request_lsn: Lsn(req.request_lsn), + not_modified_since_lsn: Some(Lsn(req.not_modified_since)), + }; + trace!( + "sending (uncached) getpage request for blocks {:?} in rel {:?} lsns {}", + block_numbers, rel, read_lsn + ); + match self + .client + .get_page(page_api::GetPageRequest { + request_id: req.request_id.into(), + request_class: page_api::GetPageClass::Normal, + read_lsn, + rel, + block_numbers: block_numbers.clone(), + }) + .await + { + Ok(resp) => { + // Write the received page images directly to the shared memory location + // that the backend requested. 
+ if resp.pages.len() != block_numbers.len() { + error!( + "received unexpected response with {} page images from pageserver for a request for {} pages", + resp.pages.len(), + block_numbers.len(), + ); + return Err(-1); + } + + trace!( + "received getpage response for blocks {:?} in rel {:?} lsns {}", + block_numbers, rel, read_lsn + ); + + for (page, dest) in resp.pages.into_iter().zip(req.dest) { + let src: &[u8] = page.image.as_ref(); + let len = std::cmp::min(src.len(), dest.bytes_total()); + unsafe { + std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len); + }; + } + } + Err(err) => { + info!("tonic error: {err:?}"); + return Err(-1); + } + } + Ok(()) + } + /// Subroutine to handle a PrefetchV request, since it's a little more complicated than /// others. /// diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 7e619e62c2..1b448c969c 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -635,8 +635,8 @@ communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum) * Read N consecutive pages from a relation */ void -communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, - void **buffers, BlockNumber nblocks) +communicator_new_readv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, + void **buffers, BlockNumber nblocks) { NeonIOResult result; CCachedGetPageVResult cached_result; @@ -698,8 +698,8 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe /* Split the vector-request into single page requests */ for (int j = 0; j < nblocks; j++) { - communicator_new_read_at_lsnv(rinfo, forkNum, blockno + j, - &buffers[j], 1); + communicator_new_readv(rinfo, forkNum, blockno + j, + &buffers[j], 1); } return; } @@ -790,14 +790,68 @@ retry: if (bounce_buf_used) memcpy(buffers[0], bounce_buf_used, BLCKSZ); return; + case NeonIOResult_Error: + if (nblocks == 1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not 
read block %u in rel %u/%u/%u.%u: %s", + blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error)))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read %u blocks at %u in rel %u/%u/%u.%u: %s", + nblocks, blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error)))); + break; + default: + elog(ERROR, "unexpected result for GetPageV operation: %d", result.tag); + break; + } +} + +/* + * Read a page at given LSN, bypassing the LFC. + * + * For tests and debugging purposes only. + */ +void +communicator_new_read_at_lsn_uncached(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, + void *buffer, XLogRecPtr request_lsn, XLogRecPtr not_modified_since) +{ + NeonIOResult result; + void *bounce_buf_used; + NeonIORequest request = { + .tag = NeonIORequest_GetPageVUncached, + .get_page_v_uncached = { + .request_id = assign_request_id(), + .spc_oid = NInfoGetSpcOid(rinfo), + .db_oid = NInfoGetDbOid(rinfo), + .rel_number = NInfoGetRelNumber(rinfo), + .fork_number = forkNum, + .block_number = blockno, + .nblocks = 1, + .request_lsn = request_lsn, + .not_modified_since = not_modified_since, + } + }; + + /* This is for tests only and doesn't need to be particularly fast. 
Always use the bounce buffer for simplicity */ + request.get_page_v_uncached.dest[0].ptr = bounce_buf_used = bounce_buf(); + + /* don't use the specialized bcomm_start_get_page_v_request() function here, because we want to bypass the LFC */ + perform_request(&request, &result); + switch (result.tag) + { + case NeonIOResult_GetPageV: + memcpy(buffer, bounce_buf_used, BLCKSZ); + return; case NeonIOResult_Error: ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read block %u in rel %u/%u/%u.%u: %s", + errmsg("could not read (uncached) block %u in rel %u/%u/%u.%u: %s", blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error)))); break; default: - elog(ERROR, "unexpected result for GetPage operation: %d", result.tag); + elog(ERROR, "unexpected result for GetPageV operation: %d", result.tag); break; } } @@ -1215,8 +1269,18 @@ print_neon_io_request(NeonIORequest *request) CGetPageVRequest *r = &request->get_page_v; snprintf(buf, sizeof(buf), "GetPageV: req " UINT64_FORMAT " rel %u/%u/%u.%u blks %d-%d", - r->request_id, - r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks); + r->request_id, + r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks); + return buf; + } + case NeonIORequest_GetPageVUncached: + { + CGetPageVUncachedRequest *r = &request->get_page_v_uncached; + + snprintf(buf, sizeof(buf), "GetPageVUncached: req " UINT64_FORMAT " rel %u/%u/%u.%u blk %d request_lsn %X/%X not_modified_since %X/%X", + r->request_id, + r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, + LSN_FORMAT_ARGS(r->request_lsn), LSN_FORMAT_ARGS(r->not_modified_since)); return buf; } case NeonIORequest_ReadSlruSegment: @@ -1236,8 +1300,8 @@ print_neon_io_request(NeonIORequest *request) CPrefetchVRequest *r = &request->prefetch_v; snprintf(buf, sizeof(buf), "PrefetchV: req " UINT64_FORMAT " rel %u/%u/%u.%u blks %d-%d", - r->request_id, - r->spc_oid, 
r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks); + r->request_id, + r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks); return buf; } case NeonIORequest_DbSize: { CDbSizeRequest *r = &request->db_size; - snprintf(buf, sizeof(buf), "PrefetchV: req " UINT64_FORMAT " db %u", - r->request_id, r->db_oid); + snprintf(buf, sizeof(buf), "DbSize: req " UINT64_FORMAT " db %u", + r->request_id, r->db_oid); return buf; } case NeonIORequest_WritePage: { CWritePageRequest *r = &request->write_page; snprintf(buf, sizeof(buf), "WritePage: req " UINT64_FORMAT " rel %u/%u/%u.%u blk %u lsn %X/%X", - r->request_id, - r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, - LSN_FORMAT_ARGS(r->lsn)); + r->request_id, + r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, + LSN_FORMAT_ARGS(r->lsn)); return buf; } case NeonIORequest_RelExtend: diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index 442e0b03a4..62d457e6ab 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -30,9 +30,11 @@ extern void communicator_new_init(void); extern bool communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum); extern BlockNumber communicator_new_rel_nblocks(NRelFileInfo rinfo, ForkNumber forknum); extern int64 communicator_new_dbsize(Oid dbNode); -extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber base_blockno, - void **buffers, BlockNumber nblocks); +extern void communicator_new_readv(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber base_blockno, + void **buffers, BlockNumber nblocks); +extern void communicator_new_read_at_lsn_uncached(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, + void *buffer, XLogRecPtr request_lsn, XLogRecPtr not_modified_since); extern void 
communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 2038979f38..20994934fc 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -1438,7 +1438,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { // FIXME: request_lsns is ignored. That affects the neon_test_utils callers. // Add the capability to specify the LSNs explicitly, for the sake of neon_test_utils ? - communicator_new_read_at_lsnv(rinfo, forkNum, blkno, &buffer, 1); + communicator_new_read_at_lsn_uncached(rinfo, forkNum, blkno, buffer, request_lsns.request_lsn, request_lsns.not_modified_since); } else communicator_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL); @@ -1569,8 +1569,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer if (neon_use_communicator_worker) { - communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forkNum, blkno, - (void *) &buffer, 1); + communicator_new_readv(InfoFromSMgrRel(reln), forkNum, blkno, + (void *) &buffer, 1); } else { @@ -1685,8 +1685,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, if (neon_use_communicator_worker) { - communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, - buffers, nblocks); + communicator_new_readv(InfoFromSMgrRel(reln), forknum, blocknum, + buffers, nblocks); } else {