diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index e00e49bf3d..10103cb1b5 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -332,6 +332,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         }
     }
 
+    /// Returns the current value of `global_lw_lsn` as an [`Lsn`].
+    pub fn get_lsn(&'t self) -> Lsn {
+        // TODO: callers treat this as "the last written LSN", but it has not been
+        // confirmed that `global_lw_lsn` tracks exactly that value.
+        Lsn(self.global_lw_lsn.load(Ordering::Relaxed))
+    }
+
     pub fn get_db_size(&'t self, _db_oid: u32) -> CacheResult {
         // TODO: it would be nice to cache database sizes too. Getting the database size
         // is not a very common operation, but when you do it, it's often interactive, with
diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs
index 9f5d134194..89f66b619c 100644
--- a/pgxn/neon/communicator/src/neon_request.rs
+++ b/pgxn/neon/communicator/src/neon_request.rs
@@ -4,7 +4,7 @@ pub type COid = u32;
 // This conveniently matches PG_IOV_MAX
 pub const MAX_GETPAGEV_PAGES: usize = 32;
 
-use pageserver_page_api as page_api;
+use pageserver_page_api::{self as page_api, SlruKind};
 
 #[allow(clippy::large_enum_variant)]
 #[repr(C)]
@@ -17,6 +17,7 @@ pub enum NeonIORequest {
     RelExists(CRelExistsRequest),
     RelSize(CRelSizeRequest),
     GetPageV(CGetPageVRequest),
+    ReadSlruSegment(CReadSlruSegmentRequest),
     PrefetchV(CPrefetchVRequest),
     DbSize(CDbSizeRequest),
 
@@ -42,6 +43,9 @@ pub enum NeonIOResult {
 
     /// the result pages are written to the shared memory addresses given in the request
     GetPageV,
+    /// The result is written to the shared memory address given in the
+    /// request. The value is the number of bytes that were written.
+    ReadSlruSegment(u64),
 
     /// A prefetch request returns as soon as the request has been received by the communicator.
     /// It is processed in the background.
@@ -67,6 +71,7 @@ impl NeonIORequest {
             RelExists(req) => req.request_id,
             RelSize(req) => req.request_id,
             GetPageV(req) => req.request_id,
+            ReadSlruSegment(req) => req.request_id,
             PrefetchV(req) => req.request_id,
             DbSize(req) => req.request_id,
             WritePage(req) => req.request_id,
@@ -174,6 +179,18 @@ pub struct CGetPageVRequest {
     pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES],
 }
 
+/// Request to read one SLRU segment, identified by kind and segment number, into `dest`.
+#[repr(C)]
+#[derive(Copy, Clone, Debug)]
+pub struct CReadSlruSegmentRequest {
+    pub request_id: u64,
+    pub slru_kind: SlruKind,
+    pub segment_number: u32,
+
+    // Where the result is written. Must point into a buffer in shared memory!
+    pub dest: ShmemBuf,
+}
+
 #[repr(C)]
 #[derive(Copy, Clone, Debug)]
 pub struct CPrefetchVRequest {
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index c207132753..4dce64b5f7 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -58,6 +58,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
     request_rel_exists_counter: IntCounter,
     request_rel_size_counter: IntCounter,
     request_get_pagev_counter: IntCounter,
+    request_read_slru_segment_counter: IntCounter,
     request_prefetchv_counter: IntCounter,
     request_db_size_counter: IntCounter,
     request_write_page_counter: IntCounter,
@@ -123,6 +124,8 @@ pub(super) async fn init(
     let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
     let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
     let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
+    let request_read_slru_segment_counter =
+        request_counters.with_label_values(&["read_slru_segment"]);
     let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
     let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
     let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
@@ -173,6 +176,7 @@ pub(super) async fn init(
         request_rel_exists_counter,
         request_rel_size_counter,
         request_get_pagev_counter,
+        request_read_slru_segment_counter,
         request_prefetchv_counter,
         request_db_size_counter,
         request_write_page_counter,
@@ -418,6 +422,48 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     Err(errno) => NeonIOResult::Error(errno),
                 }
             }
+            NeonIORequest::ReadSlruSegment(req) => {
+                self.request_read_slru_segment_counter.inc();
+                let lsn = self.cache.get_lsn();
+
+                match self
+                    .client
+                    .get_slru_segment(page_api::GetSlruSegmentRequest {
+                        read_lsn: self.request_lsns(lsn),
+                        kind: req.slru_kind,
+                        segno: req.segment_number,
+                    })
+                    .await
+                {
+                    Ok(slru_bytes) => {
+                        let src: &[u8] = slru_bytes.as_ref();
+                        let dest = req.dest;
+                        // Clamp so that we never write past the end of `dest`.
+                        let len = std::cmp::min(src.len(), dest.bytes_total());
+                        if len < src.len() {
+                            info!("slru segment truncated to {len} of {} bytes", src.len());
+                        }
+
+                        // SAFETY: `len <= src.len()`, so the read is in bounds, and
+                        // `len <= dest.bytes_total()`, so the write is in bounds
+                        // provided the requester passed a valid `dest`. `src` borrows
+                        // the response we just received, so it cannot overlap `dest`.
+                        unsafe {
+                            std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
+                        };
+
+                        // Report how many bytes were written, so that the requester
+                        // can tell how much of its buffer holds valid data.
+                        NeonIOResult::ReadSlruSegment(len as u64)
+                    }
+                    Err(err) => {
+                        // TODO: translate the failure into a meaningful errno;
+                        // the requester formats this code as an OS error.
+                        info!("tonic error: {err:?}");
+                        NeonIOResult::Error(0)
+                    }
+                }
+            }
             NeonIORequest::PrefetchV(req) => {
                 self.request_prefetchv_counter.inc();
                 self.request_prefetchv_nblocks_counter
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index bdd5a75d62..137c263993 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -997,10 +997,43 @@ communicator_new_dbsize(Oid dbNode)
 }
 
 int
-communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer)
+communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
 {
-	/* TODO */
-	elog(ERROR, "not implemented");
+	NeonIOResult result = {};
+	NeonIORequest request = {
+		.tag = NeonIORequest_ReadSlruSegment,
+		.read_slru_segment = {
+			.request_id = assign_request_id(),
+			.slru_kind = kind,
+			.segment_number = segno,
+			/*
+			 * NOTE(review): the request requires 'dest' to point into shared
+			 * memory; confirm that every caller's buffer satisfies that.
+			 */
+			.dest.ptr = buffer,
+		}
+	};
+
+	elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u", kind, segno);
+
+	perform_request(&request, &result);
+	switch (result.tag)
+	{
+		case NeonIOResult_ReadSlruSegment:
+			/* The communicator reports bytes written; the caller wants blocks. */
+			return (int) (result.read_slru_segment / BLCKSZ);
+		case NeonIOResult_Error:
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read slru segment, kind=%u, segno=%u: %s",
+							kind, segno, pg_strerror(result.error))));
+			break;
+		default:
+			elog(ERROR, "unexpected result for read SLRU operation: %d", result.tag);
+			break;
+	}
+
+	return -1;					/* keep compiler quiet */
 }
 
 /* Write requests */
diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h
index 1323c48e15..8114d75445 100644
--- a/pgxn/neon/communicator_new.h
+++ b/pgxn/neon/communicator_new.h
@@ -38,7 +38,7 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN
 											   BlockNumber nblocks);
 extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
 											BlockNumber blockno);
-extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno,
+extern int communicator_new_read_slru_segment(SlruKind kind, uint32_t segno,
 											   void *buffer);
 
 /* Write requests, to keep the caches up-to-date */
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 9340d49f5a..c70159d4b9 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -2385,7 +2385,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
 	request_lsns.effective_request_lsn = request_lsn;
 
 	if (neon_enable_new_communicator)
-		n_blocks = communicator_new_read_slru_segment(kind, segno, buffer);
+		n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, buffer);
 	else
 		n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);
 