Fix updating last-written LSN when WAL redo skips updating a block

This makes the test_replica_query_race test pass, and probably some
other read replica tests too.
Heikki Linnakangas
2025-07-30 21:20:10 +03:00
parent fca52af7e3
commit af5e3da381
8 changed files with 316 additions and 118 deletions
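At a high level, the change makes the WAL redo filter on a read replica call a single entry point that either reports that the block is cached in the LFC (so redo must be applied to the cached copy) or, if it is not cached, advances the block's last-written LSN to the record's end LSN before redo is skipped. Below is a minimal, self-contained sketch of that decision; ToyBlockEntry, the plain HashMap and the u64 LSNs are stand-ins for illustration only, not the actual Neon structs or API.

use std::collections::HashMap;

/// Toy stand-in for per-block cache state: whether the page is cached in the
/// LFC, plus the block's last-written LSN.
struct ToyBlockEntry {
    cached: bool,
    lw_lsn: u64,
}

/// Returns true if the block is cached (redo must be applied to the cached
/// copy); otherwise advances the block's last-written LSN to `record_lsn` and
/// returns false (redo can be skipped, later reads will use the new LSN).
fn update_lw_lsn_if_not_cached(
    map: &mut HashMap<(u32, u32), ToyBlockEntry>,
    key: (u32, u32),
    record_lsn: u64,
) -> bool {
    match map.get_mut(&key) {
        Some(entry) if entry.cached => true,
        Some(entry) => {
            // Not cached: only move the last-written LSN forward.
            entry.lw_lsn = entry.lw_lsn.max(record_lsn);
            false
        }
        None => {
            // No entry yet: remember the LSN so the next read fetches the new version.
            map.insert(key, ToyBlockEntry { cached: false, lw_lsn: record_lsn });
            false
        }
    }
}

fn main() {
    let mut map = HashMap::new();
    // Block not cached: the LSN is recorded and redo is skipped.
    assert!(!update_lw_lsn_if_not_cached(&mut map, (1, 42), 0x1000));
    assert_eq!(map[&(1, 42)].lw_lsn, 0x1000);
}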

View File

@@ -310,6 +310,11 @@ impl AtomicLsn {
}
}
/// Consumes the atomic and returns the contained value.
pub const fn into_inner(self) -> Lsn {
Lsn(self.inner.into_inner())
}
/// Atomically retrieve the `Lsn` value from memory.
pub fn load(&self) -> Lsn {
Lsn(self.inner.load(Ordering::Acquire))
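The new into_inner above consumes the AtomicLsn and hands back the plain value without any atomic operation; load remains the way to read it while the value is still shared. The eviction path later in this commit relies on exactly that: it removes the entry from the map first and only then calls into_inner on the owned value. A hedged sketch of the same pattern with a bare AtomicU64 standing in for AtomicLsn:

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let lsn = AtomicU64::new(0x2A);

    // While the value is shared, read it atomically.
    let shared_view = lsn.load(Ordering::Acquire);
    assert_eq!(shared_view, 0x2A);

    // Once we own it exclusively (e.g. after removing the entry from the map),
    // consuming it avoids any atomic operation.
    let owned = lsn.into_inner();
    assert_eq!(owned, 0x2A);
}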

View File

@@ -6,9 +6,11 @@ use std::os::fd::OwnedFd;
use crate::backend_comms::NeonIORequestSlot;
use crate::init::CommunicatorInitStruct;
use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess};
use crate::neon_request::{CCachedGetPageVResult, COid};
use crate::neon_request::{CCachedGetPageVResult, CLsn, COid};
use crate::neon_request::{NeonIORequest, NeonIOResult};
use utils::lsn::Lsn;
pub struct CommunicatorBackendStruct<'t> {
my_proc_number: i32,
@@ -174,17 +176,21 @@ pub extern "C" fn bcomm_finish_cache_read(bs: &mut CommunicatorBackendStruct) ->
}
}
/// Check if the local file cache contians the given block
/// Check if LFC contains the given buffer, and update its last-written LSN if not.
///
/// This is used during WAL replay in a read replica, to skip updating pages that
/// are not in the cache.
#[unsafe(no_mangle)]
pub extern "C" fn bcomm_cache_contains(
pub extern "C" fn bcomm_update_lw_lsn_for_block_if_not_cached(
bs: &mut CommunicatorBackendStruct,
spc_oid: COid,
db_oid: COid,
rel_number: u32,
fork_number: u8,
block_number: u32,
lsn: CLsn,
) -> bool {
bs.integrated_cache.cache_contains_page(
bs.integrated_cache.update_lw_lsn_for_block_if_not_cached(
&pageserver_page_api::RelTag {
spcnode: spc_oid,
dbnode: db_oid,
@@ -192,6 +198,7 @@ pub extern "C" fn bcomm_cache_contains(
forknum: fork_number,
},
block_number,
Lsn(lsn),
)
}

View File

@@ -122,8 +122,6 @@ pub extern "C" fn rcommunicator_shmem_init(
cis
}
// fixme: currently unused
#[allow(dead_code)]
pub fn alloc_from_slice<T>(
area: &mut [MaybeUninit<u8>],
) -> (&mut MaybeUninit<T>, &mut [MaybeUninit<u8>]) {

View File

@@ -23,12 +23,13 @@
//
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use utils::lsn::{AtomicLsn, Lsn};
use crate::file_cache::INVALID_CACHE_BLOCK;
use crate::file_cache::{CacheBlock, FileCache};
use crate::init::alloc_from_slice;
use pageserver_page_api::RelTag;
use metrics::{IntCounter, IntGauge};
@@ -41,25 +42,33 @@ const RELSIZE_CACHE_SIZE: u32 = 64 * 1024;
/// This struct is initialized at postmaster startup, and passed to all the processes via fork().
pub struct IntegratedCacheInitStruct<'t> {
shared: &'t IntegratedCacheShared,
relsize_cache_handle: HashMapInit<'t, RelKey, RelEntry>,
block_map_handle: HashMapInit<'t, BlockKey, BlockEntry>,
}
/// This struct is allocated in the (fixed-size) shared memory area at postmaster startup.
/// It is accessible by all the backends and the communicator process.
#[derive(Debug)]
pub struct IntegratedCacheShared {
global_lw_lsn: AtomicU64,
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
#[derive(Debug)]
pub struct IntegratedCacheWriteAccess<'t> {
shared: &'t IntegratedCacheShared,
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
global_lw_lsn: AtomicU64,
pub(crate) file_cache: Option<FileCache>,
// Fields for eviction
clock_hand: std::sync::Mutex<usize>,
clock_hand: AtomicUsize,
// Metrics
page_evictions_counter: IntCounter,
cache_page_evictions_counter: IntCounter,
block_entry_evictions_counter: IntCounter,
clock_iterations_counter: IntCounter,
// metrics from the hash map
@@ -72,6 +81,7 @@ pub struct IntegratedCacheWriteAccess<'t> {
/// Represents read-only access to the integrated cache. Backend processes have this.
pub struct IntegratedCacheReadAccess<'t> {
shared: &'t IntegratedCacheShared,
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,
}
@@ -82,7 +92,11 @@ impl<'t> IntegratedCacheInitStruct<'t> {
pub fn shmem_size() -> usize {
// The relsize cache is fixed-size. The block map is allocated in a separate resizable
// area.
HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE)
let mut sz = 0;
sz += std::mem::size_of::<IntegratedCacheShared>();
sz += HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE);
sz
}
/// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
@@ -92,9 +106,15 @@ impl<'t> IntegratedCacheInitStruct<'t> {
initial_file_cache_size: u64,
max_file_cache_size: u64,
) -> IntegratedCacheInitStruct<'t> {
// Initialize the relsize cache in the fixed-size area
// Initialize the shared struct
let (shared, remain_shmem_area) = alloc_from_slice::<IntegratedCacheShared>(shmem_area);
let shared = shared.write(IntegratedCacheShared {
global_lw_lsn: AtomicU64::new(0),
});
// Use the remaining part of the fixed-size area for the relsize cache
let relsize_cache_handle =
neon_shmem::hash::HashMapInit::with_fixed(RELSIZE_CACHE_SIZE, shmem_area);
neon_shmem::hash::HashMapInit::with_fixed(RELSIZE_CACHE_SIZE, remain_shmem_area);
let max_bytes =
HashMapInit::<BlockKey, BlockEntry>::estimate_size(max_file_cache_size as u32);
@@ -105,6 +125,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
let block_map_handle =
neon_shmem::hash::HashMapInit::with_shmem(initial_file_cache_size as u32, shmem_handle);
IntegratedCacheInitStruct {
shared,
relsize_cache_handle,
block_map_handle,
}
@@ -117,22 +138,32 @@ impl<'t> IntegratedCacheInitStruct<'t> {
file_cache: Option<FileCache>,
) -> IntegratedCacheWriteAccess<'t> {
let IntegratedCacheInitStruct {
shared,
relsize_cache_handle,
block_map_handle,
} = self;
shared.global_lw_lsn.store(lsn.0, Ordering::Relaxed);
IntegratedCacheWriteAccess {
shared,
relsize_cache: relsize_cache_handle.attach_writer(),
block_map: block_map_handle.attach_writer(),
global_lw_lsn: AtomicU64::new(lsn.0),
file_cache,
clock_hand: std::sync::Mutex::new(0),
clock_hand: AtomicUsize::new(0),
page_evictions_counter: metrics::IntCounter::new(
"integrated_cache_evictions",
cache_page_evictions_counter: metrics::IntCounter::new(
"integrated_cache_page_evictions",
"Page evictions from the Local File Cache",
)
.unwrap(),
block_entry_evictions_counter: metrics::IntCounter::new(
"integrated_cache_block_entry_evictions",
"Block entry evictions from the integrated cache",
)
.unwrap(),
clock_iterations_counter: metrics::IntCounter::new(
"clock_iterations",
"Number of times the clock hand has moved",
@@ -166,11 +197,13 @@ impl<'t> IntegratedCacheInitStruct<'t> {
/// Initialize access to the integrated cache for a backend process
pub fn backend_init(self) -> IntegratedCacheReadAccess<'t> {
let IntegratedCacheInitStruct {
shared,
relsize_cache_handle,
block_map_handle,
} = self;
IntegratedCacheReadAccess {
shared,
relsize_cache: relsize_cache_handle.attach_reader(),
block_map: block_map_handle.attach_reader(),
}
@@ -253,12 +286,25 @@ pub enum CacheResult<V> {
NotFound(Lsn),
}
/// Return type of [try_evict_entry]
enum EvictResult {
/// Could not evict page because it was pinned
Pinned,
/// The victim bucket was already vacant
Vacant,
/// Evicted an entry. If it had a cache block associated with it, it's returned
/// here, otherwise None
Evicted(Option<CacheBlock>),
}
impl<'t> IntegratedCacheWriteAccess<'t> {
pub fn get_rel_size(&'t self, rel: &RelTag) -> CacheResult<u32> {
if let Some(nblocks) = get_rel_size(&self.relsize_cache, rel) {
CacheResult::Found(nblocks)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
}
@@ -283,7 +329,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
return Ok(CacheResult::NotFound(block_entry.lw_lsn.load()));
}
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed));
return Ok(CacheResult::NotFound(lsn));
};
@@ -316,7 +362,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))
}
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed));
Ok(CacheResult::NotFound(lsn))
}
}
@@ -328,7 +374,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if let Some(_rel_entry) = self.relsize_cache.get(&RelKey::from(rel)) {
CacheResult::Found(true)
} else {
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
}
@@ -339,7 +385,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// e.g. psql \l+ command, so the user will feel the latency.
// fixme: is this right lsn?
let lsn = Lsn(self.global_lw_lsn.load(Ordering::Relaxed));
let lsn = Lsn(self.shared.global_lw_lsn.load(Ordering::Relaxed));
CacheResult::NotFound(lsn)
}
@@ -416,7 +462,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if let Some(x) = file_cache.alloc_block() {
break x;
}
if let Some(x) = self.try_evict_one_cache_block() {
if let Some(x) = self.try_evict_cache_block() {
break x;
}
}
@@ -431,7 +477,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// FIXME: unpin the block entry on error
// Update the block entry
let entry = self.block_map.entry(key);
let entry = self.block_map.entry(key.clone());
assert_eq!(found_existing, matches!(entry, Entry::Occupied(_)));
match entry {
Entry::Occupied(e) => {
@@ -478,7 +524,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
if let Some(x) = file_cache.alloc_block() {
break x;
}
if let Some(x) = self.try_evict_one_cache_block() {
if let Some(x) = self.try_evict_cache_block() {
break x;
}
}
@@ -491,32 +537,37 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.expect("error writing to cache");
// FIXME: handle errors gracefully.
match self.block_map.entry(key) {
Entry::Occupied(e) => {
let block_entry = e.get();
// FIXME: could there be concurrent readers?
assert!(block_entry.pinned.load(Ordering::Relaxed) == 0);
loop {
match self.block_map.entry(key.clone()) {
Entry::Occupied(e) => {
let block_entry = e.get();
// FIXME: could there be concurrent readers?
assert!(block_entry.pinned.load(Ordering::Relaxed) == 0);
let old_cache_block =
block_entry.cache_block.swap(cache_block, Ordering::Relaxed);
if old_cache_block != INVALID_CACHE_BLOCK {
panic!(
"remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}"
);
let old_cache_block =
block_entry.cache_block.swap(cache_block, Ordering::Relaxed);
if old_cache_block != INVALID_CACHE_BLOCK {
panic!(
"remember_page called in !is_write mode, but page is already cached at blk {old_cache_block}"
);
}
break;
}
}
Entry::Vacant(e) => {
// FIXME: what to do if we run out of memory? Evict other relation entries? Remove
// block entries first?
_ = e
.insert(BlockEntry {
Entry::Vacant(e) => {
if let Ok(_) = e.insert(BlockEntry {
lw_lsn: AtomicLsn::new(lw_lsn.0),
cache_block: AtomicU64::new(cache_block),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
})
.expect("out of memory");
}
}) {
break;
} else {
// The hash map was full. Evict an entry and retry.
}
}
};
self.try_evict_block_entry();
}
}
}
@@ -527,7 +578,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
self.relsize_cache.remove(&RelKey::from(rel));
// update with flush LSN
let _ = self.global_lw_lsn.fetch_max(flush_lsn.0, Ordering::Relaxed);
let _ = self
.shared
.global_lw_lsn
.fetch_max(flush_lsn.0, Ordering::Relaxed);
// also forget all cached blocks for the relation
// FIXME
@@ -575,66 +629,144 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
// Maintenance routines
/// Evict one block from the file cache. This is used when the file cache fills up
/// Returns the evicted block. It's not put to the free list, so it's available for the
/// caller to use immediately.
pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
let mut clock_hand = self.clock_hand.lock().unwrap();
for _ in 0..100 {
/// Evict one block entry from the cache.
///
/// This is called when the hash map is full, to make an entry available for a new
/// insertion. There's no guarantee that the entry is still free by the time this function
/// returns; it can be taken by a concurrent thread at any time. So you need to
/// call this and retry repeatedly until you succeed.
fn try_evict_block_entry(&self) {
let num_buckets = self.block_map.get_num_buckets();
loop {
self.clock_iterations_counter.inc();
let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets;
(*clock_hand) += 1;
let mut evict_this = false;
let num_buckets = self.block_map.get_num_buckets();
match self
.block_map
.get_at_bucket((*clock_hand) % num_buckets)
.as_deref()
{
let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() {
None => {
// This bucket was unused
// The caller wants to have a free bucket. If there's one already, we're good.
return;
}
Some((_, blk_entry)) => {
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this. Maybe.
evict_this = true;
// Clear the 'referenced' flag. If it was already clear,
// release the lock (by exiting this scope), and try to
// evict it.
!blk_entry.referenced.swap(false, Ordering::Relaxed)
}
};
if evict_this {
match self.try_evict_entry(victim_bucket) {
EvictResult::Pinned => {
// keep looping
}
EvictResult::Vacant => {
// This was released by someone else. Return so that
// the caller will try to use it. (Chances are that it
// will be reused by someone else, but let's try.)
return;
}
EvictResult::Evicted(None) => {
// This is now free.
return;
}
EvictResult::Evicted(Some(cache_block)) => {
// This is now free. We must not leak the cache block, so put it to the freelist
self.file_cache.as_ref().unwrap().dealloc_block(cache_block);
return;
}
}
}
// TODO: add some kind of a backstop to error out if we loop
// too many times without finding any unpinned entries
}
}
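Because try_evict_block_entry only makes it likely that a bucket becomes free, and a concurrent thread can take it again at any moment, its callers (remember_page above) run an insert/evict/retry loop. A minimal sketch of that retry shape, using a hypothetical BoundedMap rather than the shared hash map API:

use std::collections::HashMap;

/// Toy bounded map: rejects inserts of new keys once `cap` entries are present.
struct BoundedMap {
    cap: usize,
    inner: HashMap<u64, u64>,
}

impl BoundedMap {
    fn try_insert(&mut self, k: u64, v: u64) -> Result<(), ()> {
        if self.inner.len() >= self.cap && !self.inner.contains_key(&k) {
            Err(()) // full, like the insert failing on the shared hash map
        } else {
            self.inner.insert(k, v);
            Ok(())
        }
    }

    /// Evict some entry; like try_evict_block_entry, there is no guarantee the
    /// freed slot is still available when the caller retries.
    fn try_evict_one(&mut self) {
        let victim = self.inner.keys().next().copied();
        if let Some(victim) = victim {
            self.inner.remove(&victim);
        }
    }
}

fn insert_with_retry(map: &mut BoundedMap, k: u64, v: u64) {
    loop {
        if map.try_insert(k, v).is_ok() {
            break;
        }
        // The map was full. Evict an entry and retry.
        map.try_evict_one();
    }
}

fn main() {
    let mut map = BoundedMap { cap: 1, inner: HashMap::new() };
    insert_with_retry(&mut map, 1, 10);
    insert_with_retry(&mut map, 2, 20); // forces an eviction, then succeeds
    assert!(map.inner.contains_key(&2));
}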
/// Evict one block from the file cache. This is called when the file cache fills up,
/// to release a cache block.
///
/// Returns the evicted block. It's not put to the free list, so it's available for
/// the caller to use immediately.
fn try_evict_cache_block(&self) -> Option<CacheBlock> {
let num_buckets = self.block_map.get_num_buckets();
let mut iterations = 0;
while iterations < 100 {
self.clock_iterations_counter.inc();
let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets;
let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() {
None => {
// This bucket was unused. It's of no use for finding a free cache block
continue;
}
Some((_, blk_entry)) => {
// Clear the 'referenced' flag. If it was already clear,
// release the lock (by exiting this scope), and try to
// evict it.
!blk_entry.referenced.swap(false, Ordering::Relaxed)
}
};
if evict_this {
// grab the write lock
let mut evicted_cache_block = None;
if let Some(e) = self.block_map.entry_at_bucket(*clock_hand % num_buckets) {
let old = e.get();
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if old.pinned.load(Ordering::Relaxed) == 0 {
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block =
old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
evicted_cache_block = Some(cache_block);
}
e.remove();
match self.try_evict_entry(victim_bucket) {
EvictResult::Pinned => {
// keep looping
}
EvictResult::Vacant => {
// This was released by someone else. Keep looping.
}
EvictResult::Evicted(None) => {
// This is now free, but it didn't have a cache block
// associated with it. Keep looping.
}
EvictResult::Evicted(Some(cache_block)) => {
// Reuse this
return Some(cache_block);
}
}
if evicted_cache_block.is_some() {
self.page_evictions_counter.inc();
return evicted_cache_block;
}
}
iterations += 1;
}
// Give up if we didn't find anything
// Reached the max iteration count without finding an entry. Return
// to give the caller a chance to do other things
None
}
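Both eviction paths share the same clock-sweep logic: advance the shared hand with fetch_add, clear the entry's 'referenced' bit, and only try to evict entries whose bit was already clear (and, in try_evict_entry, that are not pinned). A compact, self-contained illustration of that second-chance scan with toy buckets, not the real hash map:

use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};

struct ToyEntry {
    referenced: AtomicBool,
    pinned: AtomicU64,
}

/// Returns the index of an evictable bucket, or None after `max_iters` steps.
fn clock_sweep(buckets: &[ToyEntry], hand: &AtomicUsize, max_iters: usize) -> Option<usize> {
    for _ in 0..max_iters {
        let victim = hand.fetch_add(1, Ordering::Relaxed) % buckets.len();
        let entry = &buckets[victim];
        // Second chance: clear the referenced bit; only take the bucket if the
        // bit was already clear, and skip pinned entries.
        if !entry.referenced.swap(false, Ordering::Relaxed)
            && entry.pinned.load(Ordering::Relaxed) == 0
        {
            return Some(victim);
        }
    }
    None
}

fn main() {
    let buckets: Vec<ToyEntry> = (0..4)
        .map(|_| ToyEntry {
            referenced: AtomicBool::new(true),
            pinned: AtomicU64::new(0),
        })
        .collect();
    let hand = AtomicUsize::new(0);
    // The first pass clears the referenced bits; a later pass finds a victim.
    assert_eq!(clock_sweep(&buckets, &hand, 4), None);
    assert_eq!(clock_sweep(&buckets, &hand, 4), Some(0));
}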
/// Try to evict the entry at the given bucket. Returns [EvictResult::Pinned] if the
/// entry could not be evicted because it was pinned.
fn try_evict_entry(&self, victim: usize) -> EvictResult {
// grab the write lock
if let Some(e) = self.block_map.entry_at_bucket(victim) {
let old = e.get();
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), or while holding ValueReadGuard, which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
//
// FIXME: ^^ outdated comment, update_with_fn() is no more
if old.pinned.load(Ordering::Relaxed) == 0 {
let old_val = e.remove();
let _ = self
.shared
.global_lw_lsn
.fetch_max(old_val.lw_lsn.into_inner().0, Ordering::Relaxed);
let evicted_cache_block = match old_val.cache_block.into_inner() {
INVALID_CACHE_BLOCK => None,
n => Some(n),
};
if evicted_cache_block.is_some() {
self.cache_page_evictions_counter.inc();
}
self.block_entry_evictions_counter.inc();
EvictResult::Evicted(evicted_cache_block)
} else {
EvictResult::Pinned
}
} else {
EvictResult::Vacant
}
}
/// Resize the local file cache.
pub fn resize_file_cache(&self, num_blocks: u32) {
let old_num_blocks = self.block_map.get_num_buckets() as u32;
@@ -661,7 +793,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.page_evictions_counter.desc());
descs.append(&mut self.cache_page_evictions_counter.desc());
descs.append(&mut self.block_entry_evictions_counter.desc());
descs.append(&mut self.clock_iterations_counter.desc());
descs.append(&mut self.block_map_num_buckets.desc());
@@ -684,7 +817,8 @@ impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
.set(self.relsize_cache.get_num_buckets_in_use() as i64);
let mut values = Vec::new();
values.append(&mut self.page_evictions_counter.collect());
values.append(&mut self.cache_page_evictions_counter.collect());
values.append(&mut self.block_entry_evictions_counter.collect());
values.append(&mut self.clock_iterations_counter.collect());
values.append(&mut self.block_map_num_buckets.collect());
@@ -739,11 +873,62 @@ impl<'t> IntegratedCacheReadAccess<'t> {
}
}
/// Check if the given page is present in the cache
pub fn cache_contains_page(&'t self, rel: &RelTag, block_number: u32) -> bool {
self.block_map
.get(&BlockKey::from((rel, block_number)))
.is_some()
/// Check if LFC contains the given buffer, and update its last-written LSN if not.
///
/// Returns:
/// true if the block is in the LFC
/// false if it's not.
///
/// If the block was not in the LFC (i.e. when this returns false), the last-written LSN
/// value on the block is updated to the given 'lsn', so that the next read of the block
/// will read the new version. Otherwise the caller is assumed to modify the page and
/// to update the last-written LSN later by writing the new page.
pub fn update_lw_lsn_for_block_if_not_cached(
&'t self,
rel: &RelTag,
block_number: u32,
lsn: Lsn,
) -> bool {
let key = BlockKey::from((rel, block_number));
let entry = self.block_map.entry(key);
match entry {
Entry::Occupied(e) => {
let block_entry = e.get();
if block_entry.cache_block.load(Ordering::Relaxed) != INVALID_CACHE_BLOCK {
block_entry.referenced.store(true, Ordering::Relaxed);
true
} else {
let old_lwlsn = block_entry.lw_lsn.fetch_max(lsn);
if old_lwlsn >= lsn {
// shouldn't happen
tracing::warn!(
"attempted to move last-written LSN backwards from {old_lwlsn} to {lsn} for rel {rel} blk {block_number}"
);
}
false
}
}
Entry::Vacant(e) => {
if let Ok(_) = e.insert(BlockEntry {
lw_lsn: AtomicLsn::new(lsn.0),
cache_block: AtomicU64::new(INVALID_CACHE_BLOCK),
pinned: AtomicU64::new(0),
referenced: AtomicBool::new(true),
}) {
false
} else {
// The hash table is full.
//
// TODO: Evict something. But for now, just set the global lw LSN instead.
// That's correct, but not very efficient for future reads
let _ = self
.shared
.global_lw_lsn
.fetch_max(lsn.0, Ordering::Relaxed);
false
}
}
}
}
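Whenever a per-block entry cannot be used (the hash table is full, an entry is evicted, or a relation is dropped), the code falls back to advancing the single global last-written LSN with fetch_max, so the value only ever moves forward even under concurrent updates. A small sketch of that monotonic update, with a plain AtomicU64 standing in for global_lw_lsn:

use std::sync::atomic::{AtomicU64, Ordering};

/// Advance the global last-written LSN, never moving it backwards.
fn advance_lw_lsn(global_lw_lsn: &AtomicU64, lsn: u64) {
    let _ = global_lw_lsn.fetch_max(lsn, Ordering::Relaxed);
}

fn main() {
    let global_lw_lsn = AtomicU64::new(0x5000);
    advance_lw_lsn(&global_lw_lsn, 0x6000); // moves forward
    advance_lw_lsn(&global_lw_lsn, 0x4000); // ignored: would move backwards
    assert_eq!(global_lw_lsn.load(Ordering::Relaxed), 0x6000);
}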
pub fn get_bucket(&self, bucket_no: usize) -> GetBucketResult {

View File

@@ -266,7 +266,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// This needs to be removed once more regression tests are passing.
// See also similar hack in the backend code, in wait_request_completion()
let result = tokio::time::timeout(
tokio::time::Duration::from_secs(30),
tokio::time::Duration::from_secs(60),
self.handle_request(slot.get_request()),
)
.await

View File

@@ -397,21 +397,23 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu
}
/*
* Does the LFC contains the given buffer?
* Check if LFC contains the given buffer, and update its last-written LSN if
* not.
*
* This is used during WAL replay in a read replica, to skip updating pages that
* are not in the cache.
*/
bool
communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno)
communicator_new_update_lwlsn_for_block_if_not_cached(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno, XLogRecPtr lsn)
{
return bcomm_cache_contains(my_bs,
NInfoGetSpcOid(rinfo),
NInfoGetDbOid(rinfo),
NInfoGetRelNumber(rinfo),
forkNum,
blockno);
return bcomm_update_lw_lsn_for_block_if_not_cached(my_bs,
NInfoGetSpcOid(rinfo),
NInfoGetDbOid(rinfo),
NInfoGetRelNumber(rinfo),
forkNum,
blockno,
lsn);
}
/* Dump a list of blocks in the LFC, for use in prewarming later */
@@ -571,7 +573,7 @@ wait_request_completion(int request_idx, struct NeonIOResult *result_p)
* This needs to be removed once more regression tests are passing.
*/
now = GetCurrentTimestamp();
if (now - start_time > 60 * 1000 * 1000)
if (now - start_time > 120 * 1000 * 1000)
{
elog(PANIC, "timed out waiting for response from communicator process at slot %d", request_idx);
}

View File

@@ -36,8 +36,8 @@ extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum
extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno,
BlockNumber nblocks);
extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno);
extern bool communicator_new_update_lwlsn_for_block_if_not_cached(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno, XLogRecPtr lsn);
extern int communicator_new_read_slru_segment(
SlruKind kind,
uint32_t segno,

View File

@@ -2671,26 +2671,27 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
* We don't have the buffer in shared buffers. Check if it's in the LFC.
* If it's not there either, update the lwLsn past this record.
*/
if (no_redo_needed)
{
bool in_cache;
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
* Redo changes if the page is present in the LFC.
*/
if (neon_use_communicator_worker)
{
no_redo_needed = communicator_new_cache_contains(rinfo, forknum, blkno);
// FIXME: update lwlsn
in_cache = communicator_new_update_lwlsn_for_block_if_not_cached(rinfo, forknum, blkno, end_recptr);
}
else
{
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
in_cache = lfc_cache_contains(rinfo, forknum, blkno);
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
}
no_redo_needed = !in_cache;
}
LWLockRelease(partitionLock);