From af5e3da381041467a8373453e49f3c05f8386707 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 30 Jul 2025 21:20:10 +0300 Subject: [PATCH] Fix updating last-written LSN when WAL redo skips updating a block This makes the test_replica_query_race test pass, and probably some other read replica tests too. --- libs/utils/src/lsn.rs | 5 + .../communicator/src/backend_interface.rs | 15 +- pgxn/neon/communicator/src/init.rs | 2 - .../neon/communicator/src/integrated_cache.rs | 367 +++++++++++++----- .../src/worker_process/main_loop.rs | 2 +- pgxn/neon/communicator_new.c | 22 +- pgxn/neon/communicator_new.h | 4 +- pgxn/neon/pagestore_smgr.c | 17 +- 8 files changed, 316 insertions(+), 118 deletions(-) diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 1abb63817b..47b7e6a888 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -310,6 +310,11 @@ impl AtomicLsn { } } + /// Consumes the atomic and returns the contained value. + pub const fn into_inner(self) -> Lsn { + Lsn(self.inner.into_inner()) + } + /// Atomically retrieve the `Lsn` value from memory. pub fn load(&self) -> Lsn { Lsn(self.inner.load(Ordering::Acquire)) diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index e69a3749a6..c3ae4a2436 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -6,9 +6,11 @@ use std::os::fd::OwnedFd; use crate::backend_comms::NeonIORequestSlot; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess}; -use crate::neon_request::{CCachedGetPageVResult, COid}; +use crate::neon_request::{CCachedGetPageVResult, CLsn, COid}; use crate::neon_request::{NeonIORequest, NeonIOResult}; +use utils::lsn::Lsn; + pub struct CommunicatorBackendStruct<'t> { my_proc_number: i32, @@ -174,17 +176,21 @@ pub extern "C" fn bcomm_finish_cache_read(bs: &mut CommunicatorBackendStruct) -> } } -/// Check if the local file cache contians the given block +/// Check if LFC contains the given buffer, and update its last-written LSN if not. +/// +/// This is used in WAL replay in read replica, to skip updating pages that are +/// not in cache. 
#[unsafe(no_mangle)] -pub extern "C" fn bcomm_cache_contains( +pub extern "C" fn bcomm_update_lw_lsn_for_block_if_not_cached( bs: &mut CommunicatorBackendStruct, spc_oid: COid, db_oid: COid, rel_number: u32, fork_number: u8, block_number: u32, + lsn: CLsn, ) -> bool { - bs.integrated_cache.cache_contains_page( + bs.integrated_cache.update_lw_lsn_for_block_if_not_cached( &pageserver_page_api::RelTag { spcnode: spc_oid, dbnode: db_oid, @@ -192,6 +198,7 @@ pub extern "C" fn bcomm_cache_contains( forknum: fork_number, }, block_number, + Lsn(lsn), ) } diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index f5af93cc97..7aebe4afab 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -122,8 +122,6 @@ pub extern "C" fn rcommunicator_shmem_init( cis } -// fixme: currently unused -#[allow(dead_code)] pub fn alloc_from_slice( area: &mut [MaybeUninit], ) -> (&mut MaybeUninit, &mut [MaybeUninit]) { diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 92a96e3456..7a7b2503bf 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -23,12 +23,13 @@ // use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use utils::lsn::{AtomicLsn, Lsn}; use crate::file_cache::INVALID_CACHE_BLOCK; use crate::file_cache::{CacheBlock, FileCache}; +use crate::init::alloc_from_slice; use pageserver_page_api::RelTag; use metrics::{IntCounter, IntGauge}; @@ -41,25 +42,33 @@ const RELSIZE_CACHE_SIZE: u32 = 64 * 1024; /// This struct is initialized at postmaster startup, and passed to all the processes via fork(). pub struct IntegratedCacheInitStruct<'t> { + shared: &'t IntegratedCacheShared, relsize_cache_handle: HashMapInit<'t, RelKey, RelEntry>, block_map_handle: HashMapInit<'t, BlockKey, BlockEntry>, } +/// This struct is allocated in the (fixed-size) shared memory area at postmaster startup. +/// It is accessible by all the backends and the communicator process. +#[derive(Debug)] +pub struct IntegratedCacheShared { + global_lw_lsn: AtomicU64, +} + /// Represents write-access to the integrated cache. This is used by the communicator process. #[derive(Debug)] pub struct IntegratedCacheWriteAccess<'t> { + shared: &'t IntegratedCacheShared, relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, - global_lw_lsn: AtomicU64, - pub(crate) file_cache: Option, // Fields for eviction - clock_hand: std::sync::Mutex, + clock_hand: AtomicUsize, // Metrics - page_evictions_counter: IntCounter, + cache_page_evictions_counter: IntCounter, + block_entry_evictions_counter: IntCounter, clock_iterations_counter: IntCounter, // metrics from the hash map @@ -72,6 +81,7 @@ pub struct IntegratedCacheWriteAccess<'t> { /// Represents read-only access to the integrated cache. Backend processes have this. pub struct IntegratedCacheReadAccess<'t> { + shared: &'t IntegratedCacheShared, relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>, block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>, } @@ -82,7 +92,11 @@ impl<'t> IntegratedCacheInitStruct<'t> { pub fn shmem_size() -> usize { // The relsize cache is fixed-size. The block map is allocated in a separate resizable // area. 
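// --- Illustrative sketch (not part of the patch) ---
// A standalone model (assumption: illustrative only, not the project's
// utils::lsn::AtomicLsn implementation) of the lock-free, monotonic
// "last-written LSN" update that global_lw_lsn and the per-block entries
// rely on: fetch_max never moves the value backwards, so racing updaters
// cannot lose a newer LSN.
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

struct LastWrittenLsn(AtomicU64);

impl LastWrittenLsn {
    fn new(lsn: Lsn) -> Self {
        LastWrittenLsn(AtomicU64::new(lsn.0))
    }

    /// Advance to `lsn` if it is newer; returns the previous value.
    fn advance(&self, lsn: Lsn) -> Lsn {
        Lsn(self.0.fetch_max(lsn.0, Ordering::Relaxed))
    }

    fn load(&self) -> Lsn {
        Lsn(self.0.load(Ordering::Relaxed))
    }
}

fn main() {
    let lw = LastWrittenLsn::new(Lsn(100));
    lw.advance(Lsn(200));
    lw.advance(Lsn(150)); // a stale update arriving late has no effect
    assert_eq!(lw.load(), Lsn(200));
}
// --- end sketch ---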
- HashMapInit::::estimate_size(RELSIZE_CACHE_SIZE) + let mut sz = 0; + sz += std::mem::size_of::(); + sz += HashMapInit::::estimate_size(RELSIZE_CACHE_SIZE); + + sz } /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which @@ -92,9 +106,15 @@ impl<'t> IntegratedCacheInitStruct<'t> { initial_file_cache_size: u64, max_file_cache_size: u64, ) -> IntegratedCacheInitStruct<'t> { - // Initialize the relsize cache in the fixed-size area + // Initialize the shared struct + let (shared, remain_shmem_area) = alloc_from_slice::(shmem_area); + let shared = shared.write(IntegratedCacheShared { + global_lw_lsn: AtomicU64::new(0), + }); + + // Use the remaining part of the fixed-size area for the relsize cache let relsize_cache_handle = - neon_shmem::hash::HashMapInit::with_fixed(RELSIZE_CACHE_SIZE, shmem_area); + neon_shmem::hash::HashMapInit::with_fixed(RELSIZE_CACHE_SIZE, remain_shmem_area); let max_bytes = HashMapInit::::estimate_size(max_file_cache_size as u32); @@ -105,6 +125,7 @@ impl<'t> IntegratedCacheInitStruct<'t> { let block_map_handle = neon_shmem::hash::HashMapInit::with_shmem(initial_file_cache_size as u32, shmem_handle); IntegratedCacheInitStruct { + shared, relsize_cache_handle, block_map_handle, } @@ -117,22 +138,32 @@ impl<'t> IntegratedCacheInitStruct<'t> { file_cache: Option, ) -> IntegratedCacheWriteAccess<'t> { let IntegratedCacheInitStruct { + shared, relsize_cache_handle, block_map_handle, } = self; + + shared.global_lw_lsn.store(lsn.0, Ordering::Relaxed); + IntegratedCacheWriteAccess { + shared, relsize_cache: relsize_cache_handle.attach_writer(), block_map: block_map_handle.attach_writer(), - global_lw_lsn: AtomicU64::new(lsn.0), file_cache, - clock_hand: std::sync::Mutex::new(0), + clock_hand: AtomicUsize::new(0), - page_evictions_counter: metrics::IntCounter::new( - "integrated_cache_evictions", + cache_page_evictions_counter: metrics::IntCounter::new( + "integrated_cache_page_evictions", "Page evictions from the Local File Cache", ) .unwrap(), + block_entry_evictions_counter: metrics::IntCounter::new( + "integrated_cache_block_entry_evictions", + "Block entry evictions from the integrated cache", + ) + .unwrap(), + clock_iterations_counter: metrics::IntCounter::new( "clock_iterations", "Number of times the clock hand has moved", @@ -166,11 +197,13 @@ impl<'t> IntegratedCacheInitStruct<'t> { /// Initialize access to the integrated cache for a backend process pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> { let IntegratedCacheInitStruct { + shared, relsize_cache_handle, block_map_handle, } = self; IntegratedCacheReadAccess { + shared, relsize_cache: relsize_cache_handle.attach_reader(), block_map: block_map_handle.attach_reader(), } @@ -253,12 +286,25 @@ pub enum CacheResult { NotFound(Lsn), } +/// Return type of [try_evict_entry] +enum EvictResult { + /// Could not evict page because it was pinned + Pinned, + + /// The victim bucket was already vacant + Vacant, + + /// Evicted an entry. 
If it had a cache block associated with it, it's returned + /// here, otherwise None + Evicted(Option), +} + impl<'t> IntegratedCacheWriteAccess<'t> { pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult { if let Some(nblocks) = get_rel_size(&self.relsize_cache, rel) { CacheResult::Found(nblocks) } else { - let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); + let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed)); CacheResult::NotFound(lsn) } } @@ -283,7 +329,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { return Ok(CacheResult::NotFound(block_entry.lw_lsn.load())); } } else { - let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); + let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed)); return Ok(CacheResult::NotFound(lsn)); }; @@ -316,7 +362,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { Ok(CacheResult::NotFound(block_entry.lw_lsn.load())) } } else { - let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); + let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed)); Ok(CacheResult::NotFound(lsn)) } } @@ -328,7 +374,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) { CacheResult::Found(true) } else { - let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); + let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed)); CacheResult::NotFound(lsn) } } @@ -339,7 +385,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // e.g. psql \l+ command, so the user will feel the latency. // fixme: is this right lsn? - let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed)); + let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed)); CacheResult::NotFound(lsn) } @@ -416,7 +462,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { if let Some(x) = file_cache.alloc_block() { break x; } - if let Some(x) = self.try_evict_one_cache_block() { + if let Some(x) = self.try_evict_cache_block() { break x; } } @@ -431,7 +477,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // FIXME: unpin the block entry on error // Update the block entry - let entry = self.block_map.entry(key); + let entry = self.block_map.entry(key.clone()); assert_eq!(found_existing, matches!(entry, Entry::Occupied(_))); match entry { Entry::Occupied(e) => { @@ -478,7 +524,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { if let Some(x) = file_cache.alloc_block() { break x; } - if let Some(x) = self.try_evict_one_cache_block() { + if let Some(x) = self.try_evict_cache_block() { break x; } } @@ -491,32 +537,37 @@ impl<'t> IntegratedCacheWriteAccess<'t> { .expect("error writing to cache"); // FIXME: handle errors gracefully. - match self.block_map.entry(key) { - Entry::Occupied(e) => { - let block_entry = e.get(); - // FIXME: could there be concurrent readers? - assert!(block_entry.pinned.load(Ordering::Relaxed) == 0); + loop { + match self.block_map.entry(key.clone()) { + Entry::Occupied(e) => { + let block_entry = e.get(); + // FIXME: could there be concurrent readers? 
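// --- Illustrative sketch (not part of the patch) ---
// Self-contained model (assumption: simplified types, not the real
// IntegratedCacheWriteAccess) of the miss path used in the lookups above:
// a cache miss must report the LSN to read the page at. If the block has
// its own entry, that entry's last-written LSN is used; otherwise the
// lookup falls back to the conservative global last-written LSN.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

type BlockKey = (u32, u32); // (relation number, block number), illustrative only

enum CacheResult {
    Found(Vec<u8>), // page image from the local cache
    NotFound(u64),  // read from the pageserver at this LSN
}

struct Model {
    global_lw_lsn: AtomicU64,
    per_block_lw_lsn: HashMap<BlockKey, u64>,
    cached_pages: HashMap<BlockKey, Vec<u8>>,
}

impl Model {
    fn get_page(&self, key: BlockKey) -> CacheResult {
        if let Some(img) = self.cached_pages.get(&key) {
            return CacheResult::Found(img.clone());
        }
        // Miss: prefer the block's own last-written LSN; otherwise the global one.
        let lsn = self
            .per_block_lw_lsn
            .get(&key)
            .copied()
            .unwrap_or_else(|| self.global_lw_lsn.load(Ordering::Relaxed));
        CacheResult::NotFound(lsn)
    }
}

fn main() {
    let model = Model {
        global_lw_lsn: AtomicU64::new(0x5000),
        per_block_lw_lsn: HashMap::from([((1, 7), 0x6000)]),
        cached_pages: HashMap::from([((3, 1), vec![0u8; 8192])]),
    };
    // Block (1, 7) has its own entry: read it at 0x6000.
    assert!(matches!(model.get_page((1, 7)), CacheResult::NotFound(0x6000)));
    // Unknown block: fall back to the global last-written LSN.
    assert!(matches!(model.get_page((2, 0)), CacheResult::NotFound(0x5000)));
    // A cached page is served directly.
    assert!(matches!(model.get_page((3, 1)), CacheResult::Found(_)));
}
// --- end sketch ---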
+ assert!(block_entry.pinned.load(Ordering::Relaxed) == 0); - let old_cache_block = - block_entry.cache_block.swap(cache_block, Ordering::Relaxed); - if old_cache_block != INVALID_CACHE_BLOCK { - panic!( - "remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}" - ); + let old_cache_block = + block_entry.cache_block.swap(cache_block, Ordering::Relaxed); + if old_cache_block != INVALID_CACHE_BLOCK { + panic!( + "remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}" + ); + } + break; } - } - Entry::Vacant(e) => { - // FIXME: what to do if we run out of memory? Evict other relation entries? Remove - // block entries first? - _ = e - .insert(BlockEntry { + Entry::Vacant(e) => { + if let Ok(_) = e.insert(BlockEntry { lw_lsn: AtomicLsn::new(lw_lsn.0), cache_block: AtomicU64::new(cache_block), pinned: AtomicU64::new(0), referenced: AtomicBool::new(true), - }) - .expect("out of memory"); - } + }) { + break; + } else { + // The hash map was full. Evict an entry and retry. + } + } + }; + + self.try_evict_block_entry(); } } } @@ -527,7 +578,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { self.relsize_cache.remove(&RelKey::from(rel)); // update with flush LSN - let _ = self.global_lw_lsn.fetch_max(flush_lsn.0, Ordering::Relaxed); + let _ = self + .shared + .global_lw_lsn + .fetch_max(flush_lsn.0, Ordering::Relaxed); // also forget all cached blocks for the relation // FIXME @@ -575,66 +629,144 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // Maintenance routines - /// Evict one block from the file cache. This is used when the file cache fills up - /// Returns the evicted block. It's not put to the free list, so it's available for the - /// caller to use immediately. - pub fn try_evict_one_cache_block(&self) -> Option { - let mut clock_hand = self.clock_hand.lock().unwrap(); - for _ in 0..100 { + /// Evict one block entry from the cache. + /// + /// This is called when the hash map is full, to make an entry available for a new + /// insertion. There's no guarantee that the entry is free by the time this function + /// returns anymore; it can taken by a concurrent thread at any time. So you need to + /// call this and retry repeatedly until you succeed. + fn try_evict_block_entry(&self) { + let num_buckets = self.block_map.get_num_buckets(); + loop { self.clock_iterations_counter.inc(); + let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets; - (*clock_hand) += 1; - - let mut evict_this = false; - let num_buckets = self.block_map.get_num_buckets(); - match self - .block_map - .get_at_bucket((*clock_hand) % num_buckets) - .as_deref() - { + let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() { None => { - // This bucket was unused + // The caller wants to have a free bucket. If there's one already, we're good. + return; } Some((_, blk_entry)) => { - if !blk_entry.referenced.swap(false, Ordering::Relaxed) { - // Evict this. Maybe. - evict_this = true; + // Clear the 'referenced' flag. If it was already clear, + // release the lock (by exiting this scope), and try to + // evict it. + !blk_entry.referenced.swap(false, Ordering::Relaxed) + } + }; + if evict_this { + match self.try_evict_entry(victim_bucket) { + EvictResult::Pinned => { + // keep looping } + EvictResult::Vacant => { + // This was released by someone else. Return so that + // the caller will try to use it. (Chances are that it + // will be reused by someone else, but let's try.) 
+ return; + } + EvictResult::Evicted(None) => { + // This is now free. + return; + } + EvictResult::Evicted(Some(cache_block)) => { + // This is now free. We must not leak the cache block, so put it to the freelist + self.file_cache.as_ref().unwrap().dealloc_block(cache_block); + return; + } + } + } + // TODO: add some kind of a backstop to error out if we loop + // too many times without finding any unpinned entries + } + } + + /// Evict one block from the file cache. This is called when the file cache fills up, + /// to release a cache block. + /// + /// Returns the evicted block. It's not put to the free list, so it's available for + /// the caller to use immediately. + fn try_evict_cache_block(&self) -> Option { + let num_buckets = self.block_map.get_num_buckets(); + let mut iterations = 0; + while iterations < 100 { + self.clock_iterations_counter.inc(); + let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets; + + let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() { + None => { + // This bucket was unused. It's no use for finding a free cache block + continue; + } + Some((_, blk_entry)) => { + // Clear the 'referenced' flag. If it was already clear, + // release the lock (by exiting this scope), and try to + // evict it. + !blk_entry.referenced.swap(false, Ordering::Relaxed) } }; if evict_this { - // grab the write lock - let mut evicted_cache_block = None; - if let Some(e) = self.block_map.entry_at_bucket(*clock_hand % num_buckets) { - let old = e.get(); - // note: all the accesses to 'pinned' currently happen - // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent - // updates. Otherwise, another thread could set the 'pinned' - // flag just after we have checked it here. - if old.pinned.load(Ordering::Relaxed) == 0 { - let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = - old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); - if cache_block != INVALID_CACHE_BLOCK { - evicted_cache_block = Some(cache_block); - } - e.remove(); + match self.try_evict_entry(victim_bucket) { + EvictResult::Pinned => { + // keep looping + } + EvictResult::Vacant => { + // This was released by someone else. Keep looping. + } + EvictResult::Evicted(None) => { + // This is now free, but it didn't have a cache block + // associated with it. Keep looping. + } + EvictResult::Evicted(Some(cache_block)) => { + // Reuse this + return Some(cache_block); } } - - if evicted_cache_block.is_some() { - self.page_evictions_counter.inc(); - return evicted_cache_block; - } } + + iterations += 1; } - // Give up if we didn't find anything + + // Reached the max iteration count without finding an entry. Return + // to give the caller a chance to do other things None } + /// Returns Err, if the page could not be evicted because it was pinned + fn try_evict_entry(&self, victim: usize) -> EvictResult { + // grab the write lock + if let Some(e) = self.block_map.entry_at_bucket(victim) { + let old = e.get(); + // note: all the accesses to 'pinned' currently happen + // within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent + // updates. Otherwise, another thread could set the 'pinned' + // flag just after we have checked it here. 
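// --- Illustrative sketch (not part of the patch) ---
// Minimal second-chance ("clock") sweep, as a free-standing model of the
// eviction loops above (assumption: a plain Vec instead of the shared hash
// map, and no pinning). It shows the same mechanics: an atomic clock hand
// advanced with fetch_add modulo the bucket count, and a 'referenced' bit
// that is cleared on the first pass and evicted on the second.
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

struct Slot {
    occupied: bool,
    referenced: AtomicBool,
}

struct Clock {
    slots: Vec<Slot>,
    hand: AtomicUsize,
}

impl Clock {
    /// Advance the hand until an occupied, not-recently-referenced slot is
    /// found, clearing 'referenced' bits on the way. Returns its index, or
    /// None after max_iterations (the kind of backstop the TODO above asks for).
    fn pick_victim(&self, max_iterations: usize) -> Option<usize> {
        let n = self.slots.len();
        for _ in 0..max_iterations {
            let i = self.hand.fetch_add(1, Ordering::Relaxed) % n;
            let slot = &self.slots[i];
            if !slot.occupied {
                continue;
            }
            // Second chance: clear the bit; evict only if it was already clear.
            if !slot.referenced.swap(false, Ordering::Relaxed) {
                return Some(i);
            }
        }
        None
    }
}

fn main() {
    let clock = Clock {
        slots: (0..4)
            .map(|i| Slot {
                occupied: true,
                referenced: AtomicBool::new(i % 2 == 0),
            })
            .collect(),
        hand: AtomicUsize::new(0),
    };
    // Slot 0 gets its bit cleared (second chance); slot 1 is already clear and is picked.
    assert_eq!(clock.pick_victim(100), Some(1));
}
// --- end sketch ---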
+ // + // FIXME: ^^ outdated comment, update_with_fn() is no more + + if old.pinned.load(Ordering::Relaxed) == 0 { + let old_val = e.remove(); + let _ = self + .shared + .global_lw_lsn + .fetch_max(old_val.lw_lsn.into_inner().0, Ordering::Relaxed); + let evicted_cache_block = match old_val.cache_block.into_inner() { + INVALID_CACHE_BLOCK => None, + n => Some(n), + }; + if evicted_cache_block.is_some() { + self.cache_page_evictions_counter.inc(); + } + self.block_entry_evictions_counter.inc(); + EvictResult::Evicted(evicted_cache_block) + } else { + EvictResult::Pinned + } + } else { + EvictResult::Vacant + } + } + /// Resize the local file cache. pub fn resize_file_cache(&self, num_blocks: u32) { let old_num_blocks = self.block_map.get_num_buckets() as u32; @@ -661,7 +793,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> { impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { fn desc(&self) -> Vec<&metrics::core::Desc> { let mut descs = Vec::new(); - descs.append(&mut self.page_evictions_counter.desc()); + descs.append(&mut self.cache_page_evictions_counter.desc()); + descs.append(&mut self.block_entry_evictions_counter.desc()); descs.append(&mut self.clock_iterations_counter.desc()); descs.append(&mut self.block_map_num_buckets.desc()); @@ -684,7 +817,8 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { .set(self.relsize_cache.get_num_buckets_in_use() as i64); let mut values = Vec::new(); - values.append(&mut self.page_evictions_counter.collect()); + values.append(&mut self.cache_page_evictions_counter.collect()); + values.append(&mut self.block_entry_evictions_counter.collect()); values.append(&mut self.clock_iterations_counter.collect()); values.append(&mut self.block_map_num_buckets.collect()); @@ -739,11 +873,62 @@ impl<'t> IntegratedCacheReadAccess<'t> { } } - /// Check if the given page is present in the cache - pub fn cache_contains_page(&'t self, rel: &RelTag, block_number: u32) -> bool { - self.block_map - .get(&BlockKey::from((rel, block_number))) - .is_some() + /// Check if LFC contains the given buffer, and update its last-written LSN if not. + /// + /// Returns: + /// true if the block is in the LFC + /// false if it's not. + /// + /// If the block was not in the LFC (i.e. when this returns false), the last-written LSN + /// value on the block is updated to the given 'lsn', so that the next read of the block + /// will read the new version. Otherwise the caller is assumed to modify the page and + /// to update the last-written LSN later by writing the new page. + pub fn update_lw_lsn_for_block_if_not_cached( + &'t self, + rel: &RelTag, + block_number: u32, + lsn: Lsn, + ) -> bool { + let key = BlockKey::from((rel, block_number)); + let entry = self.block_map.entry(key); + match entry { + Entry::Occupied(e) => { + let block_entry = e.get(); + if block_entry.cache_block.load(Ordering::Relaxed) != INVALID_CACHE_BLOCK { + block_entry.referenced.store(true, Ordering::Relaxed); + true + } else { + let old_lwlsn = block_entry.lw_lsn.fetch_max(lsn); + if old_lwlsn >= lsn { + // shouldn't happen + tracing::warn!( + "attempted to move last-written LSN backwards from {old_lwlsn} to {lsn} for rel {rel} blk {block_number}" + ); + } + false + } + } + Entry::Vacant(e) => { + if let Ok(_) = e.insert(BlockEntry { + lw_lsn: AtomicLsn::new(lsn.0), + cache_block: AtomicU64::new(INVALID_CACHE_BLOCK), + pinned: AtomicU64::new(0), + referenced: AtomicBool::new(true), + }) { + false + } else { + // The hash table is full. + // + // TODO: Evict something. 
But for now, just set the global lw LSN instead. + // That's correct, but not very efficient for future reads + let _ = self + .shared + .global_lw_lsn + .fetch_max(lsn.0, Ordering::Relaxed); + false + } + } + } } pub fn get_bucket(&self, bucket_no: usize) -> GetBucketResult { diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 9622849b5f..b01a02c3fa 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -266,7 +266,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // This needs to be removed once more regression tests are passing. // See also similar hack in the backend code, in wait_request_completion() let result = tokio::time::timeout( - tokio::time::Duration::from_secs(30), + tokio::time::Duration::from_secs(60), self.handle_request(slot.get_request()), ) .await diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 2044c34c3b..1edabc564d 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -397,21 +397,23 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu } /* - * Does the LFC contains the given buffer? + * Check if LFC contains the given buffer, and update its last-written LSN if + * not. * * This is used in WAL replay in read replica, to skip updating pages that are * not in cache. */ bool -communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blockno) +communicator_new_update_lwlsn_for_block_if_not_cached(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno, XLogRecPtr lsn) { - return bcomm_cache_contains(my_bs, - NInfoGetSpcOid(rinfo), - NInfoGetDbOid(rinfo), - NInfoGetRelNumber(rinfo), - forkNum, - blockno); + return bcomm_update_lw_lsn_for_block_if_not_cached(my_bs, + NInfoGetSpcOid(rinfo), + NInfoGetDbOid(rinfo), + NInfoGetRelNumber(rinfo), + forkNum, + blockno, + lsn); } /* Dump a list of blocks in the LFC, for use in prewarming later */ @@ -571,7 +573,7 @@ wait_request_completion(int request_idx, struct NeonIOResult *result_p) * This needs to be removed once more regression tests are passing. 
*/ now = GetCurrentTimestamp(); - if (now - start_time > 60 * 1000 * 1000) + if (now - start_time > 120 * 1000 * 1000) { elog(PANIC, "timed out waiting for response from communicator process at slot %d", request_idx); } diff --git a/pgxn/neon/communicator_new.h b/pgxn/neon/communicator_new.h index c9d56217c6..442e0b03a4 100644 --- a/pgxn/neon/communicator_new.h +++ b/pgxn/neon/communicator_new.h @@ -36,8 +36,8 @@ extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno, BlockNumber nblocks); -extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, - BlockNumber blockno); +extern bool communicator_new_update_lwlsn_for_block_if_not_cached(NRelFileInfo rinfo, ForkNumber forkNum, + BlockNumber blockno, XLogRecPtr lsn); extern int communicator_new_read_slru_segment( SlruKind kind, uint32_t segno, diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 87f4444b39..c961f252f2 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -2671,26 +2671,27 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) } /* - * we don't have the buffer in memory, update lwLsn past this record, also - * evict page from file cache + * We don't have the buffer in shared buffers. Check if it's in the LFC. + * If it's not there either, update the lwLsn past this record. */ if (no_redo_needed) { + bool in_cache; + /* - * Redo changes if page exists in LFC. - * We should perform this check after assigning LwLSN to prevent - * prefetching of some older version of the page by some other backend. + * Redo changes if the page is present in the LFC. */ if (neon_use_communicator_worker) { - no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno); - // FIXME: update lwlsn + in_cache = communicator_new_update_lwlsn_for_block_if_not_cached(rinfo, forknum, blkno, end_recptr); } else { - no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno); + in_cache = lfc_cache_contains(rinfo, forknum, blkno); neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno); } + + no_redo_needed = !in_cache; } LWLockRelease(partitionLock);
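// --- Illustrative sketch (not part of the patch) ---
// Free-standing model, in Rust for consistency with the other sketches (the
// real logic above is C in neon_redo_read_buffer_filter), of the redo-skip
// decision this patch fixes: redo may be skipped only when the page is in
// neither shared buffers nor the LFC, and skipping must advance that block's
// last-written LSN to the record's end LSN so the next read fetches the new
// page version from the pageserver.
use std::collections::{HashMap, HashSet};

type BlockKey = (u32, u32); // (relation number, block number), illustrative only

struct ReplicaState {
    shared_buffers: HashSet<BlockKey>,
    lfc: HashSet<BlockKey>,
    lw_lsn: HashMap<BlockKey, u64>,
}

impl ReplicaState {
    /// Returns true if WAL redo for this block can be skipped.
    fn redo_filter(&mut self, key: BlockKey, end_recptr: u64) -> bool {
        if self.shared_buffers.contains(&key) || self.lfc.contains(&key) {
            // The page is cached somewhere; redo must keep it up to date.
            return false;
        }
        // Not cached anywhere: skip redo, but remember that any later read of
        // this block must be at least as new as this record.
        let e = self.lw_lsn.entry(key).or_insert(0);
        *e = (*e).max(end_recptr);
        true
    }
}

fn main() {
    let mut st = ReplicaState {
        shared_buffers: HashSet::new(),
        lfc: HashSet::new(),
        lw_lsn: HashMap::new(),
    };
    assert!(st.redo_filter((1, 42), 0x1000)); // not cached: skip redo, bump lw LSN
    assert_eq!(st.lw_lsn[&(1, 42)], 0x1000);
    st.lfc.insert((1, 42));
    assert!(!st.redo_filter((1, 42), 0x2000)); // cached in LFC: redo must run
}
// --- end sketch ---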