diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 415684a6fc..add2ee7f2e 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -64,6 +64,10 @@ pub struct IntegratedCacheWriteAccess<'t> { // Fields for eviction clock_hand: std::sync::Mutex>, + + // Metrics + page_evictions_counter: metrics::IntCounter, + clock_iterations_counter: metrics::IntCounter, } /// Represents read-only access to the integrated cache. Backend processes have this. @@ -108,6 +112,16 @@ impl<'t> IntegratedCacheInitStruct<'t> { global_lw_lsn: AtomicU64::new(lsn.0), file_cache, clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()), + + page_evictions_counter: metrics::IntCounter::new( + "integrated_cache_evictions", + "Page evictions from the Local File Cache", + ).unwrap(), + + clock_iterations_counter: metrics::IntCounter::new( + "clock_iterations", + "Number of times the clock hand has moved", + ).unwrap(), } } @@ -535,6 +549,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut clock_hand = self.clock_hand.lock().unwrap(); for _ in 0..100 { let r = self.cache_tree.start_read(); + + self.clock_iterations_counter.inc(); + match clock_hand.next(&r) { None => { // The cache is completely empty. Pretty unexpected that this function @@ -547,20 +564,30 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } Some((k, TreeEntry::Block(blk_entry))) => { if !blk_entry.referenced.swap(false, Ordering::Relaxed) { - // Evict this + // Evict this. Maybe. let w = self.cache_tree.start_write(); let mut evicted_cache_block = None; w.update_with_fn(&k, |old| { match old { None => UpdateAction::Nothing, - Some(TreeEntry::Rel(_)) => panic!("unexepcted Rel entry"), + Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"), Some(TreeEntry::Block(old)) => { + + // note: all the accesses to 'pinned' currently happen + // within update_with_fn(), which protects from concurrent + // updates. Otherwise, another thread could set the 'pinned' + // flag just after we have checked it here. + if blk_entry.pinned.load(Ordering::Relaxed) { + return UpdateAction::Nothing; + } + let _ = self .global_lw_lsn .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); let cache_block = old.cache_block.load(Ordering::Relaxed); if cache_block != INVALID_CACHE_BLOCK { + self.page_evictions_counter.inc(); evicted_cache_block = Some(cache_block); } // TODO: we don't evict the entry, just the block. Does it make @@ -569,6 +596,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } } }); + if evicted_cache_block.is_some() { + return evicted_cache_block; + } } } } @@ -578,6 +608,21 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } } +impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + descs.append(&mut self.page_evictions_counter.desc()); + descs.append(&mut self.clock_iterations_counter.desc()); + descs + } + fn collect(&self) -> Vec { + let mut values = Vec::new(); + values.append(&mut self.page_evictions_counter.collect()); + values.append(&mut self.clock_iterations_counter.collect()); + values + } +} + /// Read relation size from the cache. /// /// This is in a separate function so that it can be shared by diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 8670f860dd..c98f66ea4d 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -139,7 +139,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { match req { NeonIORequest::Empty => { error!("unexpected Empty IO request"); - NeonIOResult::Error(-1) + NeonIOResult::Error(0) } NeonIORequest::RelExists(req) => { let rel = req.reltag(); @@ -162,7 +162,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(exists) => NeonIOResult::RelExists(exists), Err(err) => { info!("tonic error: {err:?}"); - NeonIOResult::Error(-1) + NeonIOResult::Error(0) } } } @@ -199,7 +199,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } Err(err) => { info!("tonic error: {err:?}"); - NeonIOResult::Error(-1) + NeonIOResult::Error(0) } } } @@ -235,7 +235,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { Ok(db_size) => NeonIOResult::DbSize(db_size), Err(err) => { info!("tonic error: {err:?}"); - NeonIOResult::Error(-1) + NeonIOResult::Error(0) } } } @@ -420,6 +420,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { if let Some(file_cache) = &self.cache.file_cache { descs.append(&mut file_cache.desc()); } + descs.append(&mut self.cache.desc()); descs } fn collect(&self) -> Vec { @@ -427,6 +428,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> { if let Some(file_cache) = &self.cache.file_cache { values.append(&mut file_cache.collect()); } + values.append(&mut self.cache.collect()); values } } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 12a5e87e7b..38172d2a11 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -314,7 +314,6 @@ get_shard_map(char ***connstrs_p, shardno_t *num_shards_p) { strlcpy(p, shard_map->connstring[i], MAX_PAGESERVER_CONNSTRING_SIZE); connstrs[i] = p; - elog(LOG, "XX: connstrs[%d]: %p", i, p); p += MAX_PAGESERVER_CONNSTRING_SIZE; }