diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 4b5be0b34c..95fab449f6 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -190,8 +190,7 @@ pub struct CRelExtendRequest { pub lsn: CLsn, // These fields define page contents. Must point into a buffer in shared memory! - pub src_ptr: usize, - pub src_size: u32, + pub src: ShmemBuf, } #[repr(C)] diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 24be5f4987..42483e67ea 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -231,8 +231,28 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } fn request_lsns(&self, not_modified_since_lsn: Lsn) -> page_api::ReadLsn { + let mut request_lsn = get_request_lsn(); + + // Is it possible that the last-written LSN is ahead of the last flushed + // LSN? Generally not: we shouldn't evict a page from the buffer cache + // before all its modifications have been safely flushed. That's the + // "WAL before data" rule. However, such a case does exist during index + // builds: _bt_blwritepage logs the full page without flushing WAL + // before smgrextend (files are fsynced before the build ends). + // + // FIXME: I'm seeing some other cases of this too in the regression tests. + // Maybe it's OK? Would be nice to dig a little deeper. 
+ // See the old logic in the neon_get_request_lsns() C function. + if not_modified_since_lsn > request_lsn { + tracing::info!( + "not_modified_since_lsn {} is ahead of last flushed LSN {}", + not_modified_since_lsn, request_lsn + ); + request_lsn = not_modified_since_lsn; + } + page_api::ReadLsn { - request_lsn: get_request_lsn(), + request_lsn, not_modified_since_lsn: Some(not_modified_since_lsn), } } @@ -371,7 +391,13 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIORequest::RelExtend(req) => { self.request_rel_extend_counter.inc(); - // TODO: need to grab an io-in-progress lock for this? I guess not + let rel = req.reltag(); + let _in_progress_guard = self + .in_progress_table + .lock(RequestInProgressKey::Block(rel, req.block_number)); + self.cache + .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true) + .await; self.cache .remember_rel_size(&req.reltag(), req.block_number + 1); NeonIOResult::WriteOK @@ -382,6 +408,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { .inc_by(req.nblocks as u64); // TODO: need to grab an io-in-progress lock for this? I guess not + // TODO: I think we should put the empty pages into the cache, or at least + // update the last-written LSN. 
self.cache .remember_rel_size(&req.reltag(), req.block_number + req.nblocks); NeonIOResult::WriteOK @@ -461,12 +489,13 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // TODO: Use batched protocol for (blkno, _lsn, dest, _guard) in cache_misses.iter() { + let read_lsn = self.request_lsns(not_modified_since); match self .client .get_page(page_api::GetPageRequest { request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), request_class: page_api::GetPageClass::Normal, - read_lsn: self.request_lsns(not_modified_since), + read_lsn: read_lsn, rel, block_numbers: vec![*blkno], }) @@ -491,7 +520,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // Also store it in the LFC while we have it self.cache - .remember_page(&rel, *blkno, page_image, not_modified_since, false) + .remember_page(&rel, *blkno, page_image, read_lsn.not_modified_since_lsn.unwrap(), false) .await; } Err(err) => { diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index fa9dac7705..5f8d8788e2 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -866,8 +866,7 @@ communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber .fork_number = forkNum, .block_number = blockno, .lsn = lsn, - .src_ptr = (uintptr_t) src, - .src_size = BLCKSZ, + .src.ptr = src, } }; NeonIOResult result;