Fix caching of newly extended pages

This fixes read errors seen e.g. in the test_compute_catalog.py test (and
probably in many others).
Heikki Linnakangas
2025-07-02 23:18:33 +03:00
parent 7263d6e2e5
commit d8296e60e6
3 changed files with 35 additions and 8 deletions
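
The gist of the fix: when the compute extends a relation, the new page image has to be remembered in the local cache at the extend LSN, not just the new relation size; otherwise a later read of the new block misses the cache, which appears to be what produced the read errors mentioned above. Below is a minimal sketch of that caching rule, using a toy in-process cache rather than the real communicator/LFC types; ToyCache and everything in it is illustrative only, not the actual implementation.

    use std::collections::HashMap;

    type Lsn = u64;
    type RelTag = u32; // stand-in for the real relation identifier
    type BlockNumber = u32;

    #[derive(Default)]
    struct ToyCache {
        // page images keyed by (relation, block), tagged with their LSN
        pages: HashMap<(RelTag, BlockNumber), (Vec<u8>, Lsn)>,
        // cached relation sizes, in blocks
        rel_sizes: HashMap<RelTag, BlockNumber>,
    }

    impl ToyCache {
        fn remember_page(&mut self, rel: RelTag, blkno: BlockNumber, image: Vec<u8>, lsn: Lsn) {
            self.pages.insert((rel, blkno), (image, lsn));
        }

        fn remember_rel_size(&mut self, rel: RelTag, nblocks: BlockNumber) {
            self.rel_sizes.insert(rel, nblocks);
        }

        // Extending a relation must cache the new page contents *and* the new size.
        // Before this fix only the size was remembered, so a later read of the new
        // block missed the cache.
        fn rel_extend(&mut self, rel: RelTag, blkno: BlockNumber, image: Vec<u8>, lsn: Lsn) {
            self.remember_page(rel, blkno, image, lsn);
            self.remember_rel_size(rel, blkno + 1);
        }
    }

    fn main() {
        let mut cache = ToyCache::default();
        cache.rel_extend(1, 7, vec![0u8; 8192], 0x1_0000);
        // The freshly extended page is now served from the cache.
        assert!(cache.pages.contains_key(&(1, 7)));
        assert_eq!(cache.rel_sizes[&1], 8);
        println!("newly extended page is cached");
    }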

@@ -190,8 +190,7 @@ pub struct CRelExtendRequest {
     pub lsn: CLsn,
 
     // These fields define page contents. Must point into a buffer in shared memory!
-    pub src_ptr: usize,
-    pub src_size: u32,
+    pub src: ShmemBuf,
 }
 
 #[repr(C)]
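
The request struct now carries the page contents as a single ShmemBuf value instead of a raw (src_ptr, src_size) pair. ShmemBuf itself is not defined anywhere in this diff; the sketch below is only a guess at the shape of such a shared-memory buffer handle. The ptr member is visible in the C hunk at the bottom of the commit; the len field, the as_slice helper and the name ShmemBufSketch are hypothetical.

    // Hypothetical shape of a shared-memory buffer handle: one #[repr(C)] value
    // that keeps the location and length together, so C and Rust exchange a
    // single typed field instead of a loose (pointer, size) pair.
    #[repr(C)]
    #[derive(Clone, Copy)]
    pub struct ShmemBufSketch {
        /// Address of the buffer inside the shared-memory segment.
        pub ptr: usize,
        /// Buffer length in bytes (one BLCKSZ page for a RelExtend request).
        pub len: u32,
    }

    impl ShmemBufSketch {
        /// View the buffer as a byte slice.
        ///
        /// Safety: `ptr` must point at `len` readable bytes that stay mapped and
        /// unmodified for the lifetime of the returned slice.
        pub unsafe fn as_slice(&self) -> &[u8] {
            std::slice::from_raw_parts(self.ptr as *const u8, self.len as usize)
        }
    }

    fn main() {
        let page = [0u8; 8192];
        let buf = ShmemBufSketch {
            ptr: page.as_ptr() as usize,
            len: page.len() as u32,
        };
        // Sound here because `page` outlives the borrow.
        let view = unsafe { buf.as_slice() };
        assert_eq!(view.len(), 8192);
    }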

@@ -231,8 +231,28 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
     }
 
     fn request_lsns(&self, not_modified_since_lsn: Lsn) -> page_api::ReadLsn {
+        let mut request_lsn = get_request_lsn();
+
+        // Is it possible that the last-written LSN is ahead of the last flush
+        // LSN? Generally not: we shouldn't evict a page from the buffer cache
+        // before all its modifications have been safely flushed. That's the
+        // "WAL before data" rule. However, such a case does exist during index
+        // builds: _bt_blwritepage logs the full page without flushing WAL
+        // before smgrextend (the files are fsynced before the build ends).
+        //
+        // FIXME: I'm seeing some other cases of this too in the regression tests.
+        // Maybe it's OK? Would be nice to dig a little deeper.
+        // See the old logic in the neon_get_request_lsns() C function.
+        if not_modified_since_lsn > request_lsn {
+            tracing::info!(
+                "not_modified_since_lsn {} is ahead of last flushed LSN {}",
+                not_modified_since_lsn, request_lsn
+            );
+            request_lsn = not_modified_since_lsn;
+        }
+
         page_api::ReadLsn {
-            request_lsn: get_request_lsn(),
+            request_lsn,
             not_modified_since_lsn: Some(not_modified_since_lsn),
         }
     }
@@ -371,7 +391,13 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             NeonIORequest::RelExtend(req) => {
                 self.request_rel_extend_counter.inc();
 
-                // TODO: need to grab an io-in-progress lock for this? I guess not
+                let rel = req.reltag();
+                let _in_progress_guard = self
+                    .in_progress_table
+                    .lock(RequestInProgressKey::Block(rel, req.block_number));
+                self.cache
+                    .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
+                    .await;
                 self.cache
                     .remember_rel_size(&req.reltag(), req.block_number + 1);
                 NeonIOResult::WriteOK
@@ -382,6 +408,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     .inc_by(req.nblocks as u64);
 
                 // TODO: need to grab an io-in-progress lock for this? I guess not
+                // TODO: I think we should put the empty pages into the cache, or at
+                // least update the last-written LSN.
                 self.cache
                     .remember_rel_size(&req.reltag(), req.block_number + req.nblocks);
                 NeonIOResult::WriteOK
@@ -461,12 +489,13 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         // TODO: Use batched protocol
         for (blkno, _lsn, dest, _guard) in cache_misses.iter() {
+            let read_lsn = self.request_lsns(not_modified_since);
             match self
                 .client
                 .get_page(page_api::GetPageRequest {
                     request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
                     request_class: page_api::GetPageClass::Normal,
-                    read_lsn: self.request_lsns(not_modified_since),
+                    read_lsn: read_lsn,
                     rel,
                     block_numbers: vec![*blkno],
                 })
@@ -491,7 +520,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     // Also store it in the LFC while we have it
                     self.cache
-                        .remember_page(&rel, *blkno, page_image, not_modified_since, false)
+                        .remember_page(&rel, *blkno, page_image, read_lsn.not_modified_since_lsn.unwrap(), false)
                         .await;
                 }
                 Err(err) => {
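
The request_lsns() change above boils down to a clamp: the request LSN sent with a GetPage request must not be older than the page's not_modified_since LSN, so when the last-written LSN runs ahead of the flush LSN (the index-build case described in the comment) the request LSN is bumped up to match. Below is a standalone sketch of that rule with made-up LSN values; only the clamp itself comes from the diff.

    #[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
    struct Lsn(u64);

    /// Pick the LSN for a GetPage request: the last flushed WAL position,
    /// raised if the page's last-modified LSN is somehow ahead of it.
    fn clamp_request_lsn(last_flushed: Lsn, not_modified_since: Lsn) -> Lsn {
        if not_modified_since > last_flushed {
            // Corresponds to the tracing::info! branch in the diff.
            eprintln!(
                "not_modified_since_lsn {:?} is ahead of last flushed LSN {:?}",
                not_modified_since, last_flushed
            );
            not_modified_since
        } else {
            last_flushed
        }
    }

    fn main() {
        // Normal case: the WAL for the page was flushed before it was evicted.
        assert_eq!(clamp_request_lsn(Lsn(0x2000), Lsn(0x1800)), Lsn(0x2000));
        // Index-build case: the page image is newer than the flush horizon, so
        // the request LSN is raised to keep request_lsn >= not_modified_since_lsn.
        assert_eq!(clamp_request_lsn(Lsn(0x2000), Lsn(0x2800)), Lsn(0x2800));
    }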

@@ -866,8 +866,7 @@ communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber
 			.fork_number = forkNum,
 			.block_number = blockno,
 			.lsn = lsn,
-			.src_ptr = (uintptr_t) src,
-			.src_size = BLCKSZ,
+			.src.ptr = src,
 		}
 	};
 	NeonIOResult result;