Add a few metrics, fix page eviction

This commit is contained in:
Heikki Linnakangas
2025-05-10 03:13:28 +03:00
parent e6a4171fa1
commit 0efefbf77c
3 changed files with 53 additions and 7 deletions

View File

@@ -64,6 +64,10 @@ pub struct IntegratedCacheWriteAccess<'t> {
// Fields for eviction
clock_hand: std::sync::Mutex<TreeIterator<TreeKey>>,
// Metrics
page_evictions_counter: metrics::IntCounter,
clock_iterations_counter: metrics::IntCounter,
}
/// Represents read-only access to the integrated cache. Backend processes have this.
@@ -108,6 +112,16 @@ impl<'t> IntegratedCacheInitStruct<'t> {
global_lw_lsn: AtomicU64::new(lsn.0),
file_cache,
clock_hand: std::sync::Mutex::new(TreeIterator::new_wrapping()),
page_evictions_counter: metrics::IntCounter::new(
"integrated_cache_evictions",
"Page evictions from the Local File Cache",
).unwrap(),
clock_iterations_counter: metrics::IntCounter::new(
"clock_iterations",
"Number of times the clock hand has moved",
).unwrap(),
}
}
@@ -535,6 +549,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let mut clock_hand = self.clock_hand.lock().unwrap();
for _ in 0..100 {
let r = self.cache_tree.start_read();
self.clock_iterations_counter.inc();
match clock_hand.next(&r) {
None => {
// The cache is completely empty. Pretty unexpected that this function
@@ -547,20 +564,30 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
Some((k, TreeEntry::Block(blk_entry))) => {
if !blk_entry.referenced.swap(false, Ordering::Relaxed) {
// Evict this
// Evict this. Maybe.
let w = self.cache_tree.start_write();
let mut evicted_cache_block = None;
w.update_with_fn(&k, |old| {
match old {
None => UpdateAction::Nothing,
Some(TreeEntry::Rel(_)) => panic!("unexepcted Rel entry"),
Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"),
Some(TreeEntry::Block(old)) => {
// note: all the accesses to 'pinned' currently happen
// within update_with_fn(), which protects from concurrent
// updates. Otherwise, another thread could set the 'pinned'
// flag just after we have checked it here.
if blk_entry.pinned.load(Ordering::Relaxed) {
return UpdateAction::Nothing;
}
let _ = self
.global_lw_lsn
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old.cache_block.load(Ordering::Relaxed);
if cache_block != INVALID_CACHE_BLOCK {
self.page_evictions_counter.inc();
evicted_cache_block = Some(cache_block);
}
// TODO: we don't evict the entry, just the block. Does it make
@@ -569,6 +596,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
}
});
if evicted_cache_block.is_some() {
return evicted_cache_block;
}
}
}
}
@@ -578,6 +608,21 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
}
/// Exposes this cache's eviction and clock-hand counters to the metrics
/// registry by forwarding to each counter's own `Collector` implementation.
impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
    /// Descriptors of all metrics owned by this struct, in registration order.
    fn desc(&self) -> Vec<&metrics::core::Desc> {
        self.page_evictions_counter
            .desc()
            .into_iter()
            .chain(self.clock_iterations_counter.desc())
            .collect()
    }

    /// Current values of all metrics owned by this struct, in the same order
    /// as `desc()`.
    fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
        self.page_evictions_counter
            .collect()
            .into_iter()
            .chain(self.clock_iterations_counter.collect())
            .collect()
    }
}
/// Read relation size from the cache.
///
/// This is in a separate function so that it can be shared by

View File

@@ -139,7 +139,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
match req {
NeonIORequest::Empty => {
error!("unexpected Empty IO request");
NeonIOResult::Error(-1)
NeonIOResult::Error(0)
}
NeonIORequest::RelExists(req) => {
let rel = req.reltag();
@@ -162,7 +162,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Ok(exists) => NeonIOResult::RelExists(exists),
Err(err) => {
info!("tonic error: {err:?}");
NeonIOResult::Error(-1)
NeonIOResult::Error(0)
}
}
}
@@ -199,7 +199,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
Err(err) => {
info!("tonic error: {err:?}");
NeonIOResult::Error(-1)
NeonIOResult::Error(0)
}
}
}
@@ -235,7 +235,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
Ok(db_size) => NeonIOResult::DbSize(db_size),
Err(err) => {
info!("tonic error: {err:?}");
NeonIOResult::Error(-1)
NeonIOResult::Error(0)
}
}
}
@@ -420,6 +420,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
if let Some(file_cache) = &self.cache.file_cache {
descs.append(&mut file_cache.desc());
}
descs.append(&mut self.cache.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
@@ -427,6 +428,7 @@ impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
if let Some(file_cache) = &self.cache.file_cache {
values.append(&mut file_cache.collect());
}
values.append(&mut self.cache.collect());
values
}
}

View File

@@ -314,7 +314,6 @@ get_shard_map(char ***connstrs_p, shardno_t *num_shards_p)
{
strlcpy(p, shard_map->connstring[i], MAX_PAGESERVER_CONNSTRING_SIZE);
connstrs[i] = p;
elog(LOG, "XX: connstrs[%d]: %p", i, p);
p += MAX_PAGESERVER_CONNSTRING_SIZE;
}