Add a bunch of metric counters

This commit is contained in:
Heikki Linnakangas
2025-05-11 20:11:13 +03:00
parent 319cd74f73
commit 86671e3a0b
2 changed files with 132 additions and 8 deletions

View File

@@ -284,6 +284,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
.unwrap()
.read_block(cache_block, dst)
.await?;
Ok(CacheResult::Found(()))
} else {
Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))

View File

@@ -12,6 +12,8 @@ use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInPr
use pageserver_client_grpc::PageserverClient;
use pageserver_page_api::model;
use metrics::{IntCounterVec, IntCounter};
use tokio::io::AsyncReadExt;
use tokio_pipe::PipeRead;
use uring_common::buf::IoBuf;
@@ -34,6 +36,28 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
next_request_id: AtomicU64,
in_progress_table: RequestInProgressTable,
// Metrics
request_counters: IntCounterVec,
request_rel_exists_counter: IntCounter,
request_rel_size_counter: IntCounter,
request_get_pagev_counter: IntCounter,
request_prefetchv_counter: IntCounter,
request_db_size_counter: IntCounter,
request_write_page_counter: IntCounter,
request_rel_extend_counter: IntCounter,
request_rel_zero_extend_counter: IntCounter,
request_rel_create_counter: IntCounter,
request_rel_truncate_counter: IntCounter,
request_rel_unlink_counter: IntCounter,
getpage_cache_misses_counter: IntCounter,
getpage_cache_hits_counter: IntCounter,
request_nblocks_counters: IntCounterVec,
request_get_pagev_nblocks_counter: IntCounter,
request_prefetchv_nblocks_counter: IntCounter,
request_rel_zero_extend_nblocks_counter: IntCounter,
}
@@ -65,16 +89,68 @@ pub(super) async fn init(
let pageserver_client = PageserverClient::new(&tenant_id, &timeline_id, &auth_token, shard_map);
let this = CommunicatorWorkerProcessStruct {
let request_counters = IntCounterVec::new(
metrics::core::Opts::new("backend_requests_total", "Number of requests from backends."),
&["request_kind"],
).unwrap();
let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
let request_rel_extend_counter = request_counters.with_label_values(&["rel_extend"]);
let request_rel_zero_extend_counter = request_counters.with_label_values(&["rel_zero_extend"]);
let request_rel_create_counter = request_counters.with_label_values(&["rel_create"]);
let request_rel_truncate_counter = request_counters.with_label_values(&["rel_truncate"]);
let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]);
let getpage_cache_misses_counter = IntCounter::new(
"getpage_cache_misses", "Number of file cache misses in get_pagev requests."
).unwrap();
let getpage_cache_hits_counter = IntCounter::new(
"getpage_cache_hits", "Number of file cache hits in get_pagev requests."
).unwrap();
// For the requests that affect multiple blocks, have separate counters for the # of blocks affected
let request_nblocks_counters = IntCounterVec::new(
metrics::core::Opts::new("request_nblocks_total", "Number of blocks in backend requests."),
&["request_kind"],
).unwrap();
let request_get_pagev_nblocks_counter = request_nblocks_counters.with_label_values(&["get_pagev"]);
let request_prefetchv_nblocks_counter = request_nblocks_counters.with_label_values(&["prefetchv"]);
let request_rel_zero_extend_nblocks_counter = request_nblocks_counters.with_label_values(&["rel_zero_extend"]);
CommunicatorWorkerProcessStruct {
neon_request_slots: cis.neon_request_slots,
pageserver_client,
cache,
submission_pipe_read_raw_fd: cis.submission_pipe_read_fd,
next_request_id: AtomicU64::new(1),
in_progress_table: RequestInProgressTable::new(),
};
this
// metrics
request_counters,
request_rel_exists_counter,
request_rel_size_counter,
request_get_pagev_counter,
request_prefetchv_counter,
request_db_size_counter,
request_write_page_counter,
request_rel_extend_counter,
request_rel_zero_extend_counter,
request_rel_create_counter,
request_rel_truncate_counter,
request_rel_unlink_counter,
getpage_cache_misses_counter,
getpage_cache_hits_counter,
request_nblocks_counters,
request_get_pagev_nblocks_counter,
request_prefetchv_nblocks_counter,
request_rel_zero_extend_nblocks_counter,
}
}
impl<'t> CommunicatorWorkerProcessStruct<'t> {
@@ -142,6 +218,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::Error(0)
}
NeonIORequest::RelExists(req) => {
self.request_rel_exists_counter.inc();
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
@@ -168,6 +245,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
NeonIORequest::RelSize(req) => {
self.request_rel_size_counter.inc();
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone()));
@@ -203,16 +281,23 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
}
}
}
NeonIORequest::GetPageV(req) => match self.handle_get_pagev_request(req).await {
Ok(()) => NeonIOResult::GetPageV,
Err(errno) => NeonIOResult::Error(errno),
NeonIORequest::GetPageV(req) => {
self.request_get_pagev_counter.inc();
self.request_get_pagev_nblocks_counter.inc_by(req.nblocks as u64);
match self.handle_get_pagev_request(req).await {
Ok(()) => NeonIOResult::GetPageV,
Err(errno) => NeonIOResult::Error(errno),
}
},
NeonIORequest::PrefetchV(req) => {
self.request_prefetchv_counter.inc();
self.request_prefetchv_nblocks_counter.inc_by(req.nblocks as u64);
let req = req.clone();
tokio::spawn(async move { self.handle_prefetchv_request(&req).await });
NeonIOResult::PrefetchVLaunched
}
NeonIORequest::DbSize(req) => {
self.request_db_size_counter.inc();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid));
// Check the cache first
@@ -242,6 +327,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// Write requests
NeonIORequest::WritePage(req) => {
self.request_write_page_counter.inc();
// Also store it in the LFC while we still have it
let rel = req.reltag();
let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number));
@@ -251,28 +338,39 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
NeonIOResult::WriteOK
}
NeonIORequest::RelExtend(req) => {
self.request_rel_extend_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache
.remember_rel_size(&req.reltag(), req.block_number + 1);
NeonIOResult::WriteOK
}
NeonIORequest::RelZeroExtend(req) => {
self.request_rel_zero_extend_counter.inc();
self.request_rel_zero_extend_nblocks_counter.inc_by(req.nblocks as u64);
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache
.remember_rel_size(&req.reltag(), req.block_number + req.nblocks);
NeonIOResult::WriteOK
}
NeonIORequest::RelCreate(req) => {
self.request_rel_create_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.remember_rel_size(&req.reltag(), 0);
NeonIOResult::WriteOK
}
NeonIORequest::RelTruncate(req) => {
self.request_rel_truncate_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.remember_rel_size(&req.reltag(), req.nblocks);
NeonIOResult::WriteOK
}
NeonIORequest::RelUnlink(req) => {
self.request_rel_unlink_counter.inc();
// TODO: need to grab an io-in-progress lock for this? I guess not
self.cache.forget_rel(&req.reltag());
NeonIOResult::WriteOK
@@ -284,6 +382,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let rel = req.reltag();
// Check the cache first
//
// Note: Because the backends perform a direct lookup in the cache before sending
// the request to the communicator process, we expect the pages to almost never
// be already in cache. It could happen when:
// 1. two backends try to read the same page at the same time, but that should never
// happen because there's higher level locking in the Postgres buffer manager, or
// 2. if a prefetch request finished at the same time as a backend requested the
// page. That's much more likely.
let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
for i in 0..req.nblocks {
let blkno = req.block_number + i as u32;
@@ -296,7 +402,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await {
Ok(CacheResult::Found(_)) => {
// get_page already copied the block content to the destination
trace!("found blk {} in rel {:?} in LFC ", blkno, rel);
trace!("found blk {} in rel {:?} in LFC", blkno, rel);
continue;
}
Ok(CacheResult::NotFound(lsn)) => lsn,
@@ -304,6 +410,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
};
cache_misses.push((blkno, not_modified_since, dest, in_progress_guard));
}
self.getpage_cache_misses_counter.inc_by(cache_misses.len() as u64);
self.getpage_cache_hits_counter.inc_by(req.nblocks as u64 - cache_misses.len() as u64);
if cache_misses.is_empty() {
return Ok(());
}
@@ -366,7 +475,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
Ok(CacheResult::Found(_)) => {
trace!("found blk {} in rel {:?} in LFC ", blkno, rel);
trace!("found blk {} in rel {:?} in LFC", blkno, rel);
continue;
}
Ok(CacheResult::NotFound(lsn)) => lsn,
@@ -417,18 +526,32 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
    /// Describes every metric owned by the worker process: the per-request-kind
    /// counter vecs, the standalone getpage cache hit/miss counters, and the
    /// metrics of the cache (plus the file cache, when one is configured).
    fn desc(&self) -> Vec<&metrics::core::Desc> {
        // Seed the result with the request-counter vec's descriptors, then
        // splice in the rest in the same order they are collected below.
        let mut descs = self.request_counters.desc();
        descs.extend(self.getpage_cache_misses_counter.desc());
        descs.extend(self.getpage_cache_hits_counter.desc());
        descs.extend(self.request_nblocks_counters.desc());
        if let Some(file_cache) = &self.cache.file_cache {
            descs.extend(file_cache.desc());
        }
        descs.extend(self.cache.desc());
        descs
    }

    /// Gathers the current values of all of the above metrics into
    /// Prometheus metric families for scraping.
    fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
        let mut families = self.request_counters.collect();
        families.extend(self.getpage_cache_misses_counter.collect());
        families.extend(self.getpage_cache_hits_counter.collect());
        families.extend(self.request_nblocks_counters.collect());
        if let Some(file_cache) = &self.cache.file_cache {
            families.extend(file_cache.collect());
        }
        families.extend(self.cache.collect());
        families
    }
}