From 86671e3a0be3f3263bca2c94ba1ef7dddc68e6f0 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Sun, 11 May 2025 20:11:13 +0300
Subject: [PATCH] Add a bunch of metric counters

---
 .../neon/communicator/src/integrated_cache.rs |   1 +
 .../src/worker_process/main_loop.rs           | 139 +++++++++++++++++-
 2 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index f08a1793fd..1b86262993 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -284,6 +284,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                 .unwrap()
                 .read_block(cache_block, dst)
                 .await?;
+
             Ok(CacheResult::Found(()))
         } else {
             Ok(CacheResult::NotFound(block_entry.lw_lsn.load()))
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index c98f66ea4d..91bcf4d46d 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -12,6 +14,8 @@ use crate::worker_process::in_progress_ios::{RequestInProgressTable, RequestInPr
 use pageserver_client_grpc::PageserverClient;
 use pageserver_page_api::model;
 
+use metrics::{IntCounterVec, IntCounter};
+
 use tokio::io::AsyncReadExt;
 use tokio_pipe::PipeRead;
 use uring_common::buf::IoBuf;
@@ -34,6 +36,28 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
     next_request_id: AtomicU64,
     in_progress_table: RequestInProgressTable,
+
+    // Metrics
+    request_counters: IntCounterVec,
+    request_rel_exists_counter: IntCounter,
+    request_rel_size_counter: IntCounter,
+    request_get_pagev_counter: IntCounter,
+    request_prefetchv_counter: IntCounter,
+    request_db_size_counter: IntCounter,
+    request_write_page_counter: IntCounter,
+    request_rel_extend_counter: IntCounter,
+    request_rel_zero_extend_counter: IntCounter,
+    request_rel_create_counter: IntCounter,
+    request_rel_truncate_counter: IntCounter,
+    request_rel_unlink_counter: IntCounter,
+
+    getpage_cache_misses_counter: IntCounter,
+    getpage_cache_hits_counter: IntCounter,
+
+    request_nblocks_counters: IntCounterVec,
+    request_get_pagev_nblocks_counter: IntCounter,
+    request_prefetchv_nblocks_counter: IntCounter,
+    request_rel_zero_extend_nblocks_counter: IntCounter,
 }
@@ -65,16 +89,68 @@ pub(super) async fn init(
     let pageserver_client = PageserverClient::new(&tenant_id, &timeline_id, &auth_token, shard_map);
 
-    let this = CommunicatorWorkerProcessStruct {
+    let request_counters = IntCounterVec::new(
+        metrics::core::Opts::new("backend_requests_total", "Number of requests from backends."),
+        &["request_kind"],
+    ).unwrap();
+    let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
+    let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
+    let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
+    let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
+    let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
+    let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
+    let request_rel_extend_counter = request_counters.with_label_values(&["rel_extend"]);
+    let request_rel_zero_extend_counter = request_counters.with_label_values(&["rel_zero_extend"]);
+    let request_rel_create_counter = request_counters.with_label_values(&["rel_create"]);
+    let request_rel_truncate_counter =
request_counters.with_label_values(&["rel_truncate"]); + let request_rel_unlink_counter = request_counters.with_label_values(&["rel_unlink"]); + + let getpage_cache_misses_counter = IntCounter::new( + "getpage_cache_misses", "Number of file cache misses in get_pagev requests." + ).unwrap(); + let getpage_cache_hits_counter = IntCounter::new( + "getpage_cache_hits", "Number of file cache hits in get_pagev requests." + ).unwrap(); + + // For the requests that affect multiple blocks, have separate counters for the # of blocks affected + let request_nblocks_counters = IntCounterVec::new( + metrics::core::Opts::new("request_nblocks_total", "Number of blocks in backend requests."), + &["request_kind"], + ).unwrap(); + let request_get_pagev_nblocks_counter = request_nblocks_counters.with_label_values(&["get_pagev"]); + let request_prefetchv_nblocks_counter = request_nblocks_counters.with_label_values(&["prefetchv"]); + let request_rel_zero_extend_nblocks_counter = request_nblocks_counters.with_label_values(&["rel_zero_extend"]); + + CommunicatorWorkerProcessStruct { neon_request_slots: cis.neon_request_slots, pageserver_client, cache, submission_pipe_read_raw_fd: cis.submission_pipe_read_fd, next_request_id: AtomicU64::new(1), in_progress_table: RequestInProgressTable::new(), - }; - this + // metrics + request_counters, + request_rel_exists_counter, + request_rel_size_counter, + request_get_pagev_counter, + request_prefetchv_counter, + request_db_size_counter, + request_write_page_counter, + request_rel_extend_counter, + request_rel_zero_extend_counter, + request_rel_create_counter, + request_rel_truncate_counter, + request_rel_unlink_counter, + + getpage_cache_misses_counter, + getpage_cache_hits_counter, + + request_nblocks_counters, + request_get_pagev_nblocks_counter, + request_prefetchv_nblocks_counter, + request_rel_zero_extend_nblocks_counter, + } } impl<'t> CommunicatorWorkerProcessStruct<'t> { @@ -142,6 +218,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { NeonIOResult::Error(0) } NeonIORequest::RelExists(req) => { + self.request_rel_exists_counter.inc(); let rel = req.reltag(); let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); @@ -168,6 +245,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } NeonIORequest::RelSize(req) => { + self.request_rel_size_counter.inc(); let rel = req.reltag(); let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Rel(rel.clone())); @@ -203,16 +281,23 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { } } } - NeonIORequest::GetPageV(req) => match self.handle_get_pagev_request(req).await { - Ok(()) => NeonIOResult::GetPageV, - Err(errno) => NeonIOResult::Error(errno), + NeonIORequest::GetPageV(req) => { + self.request_get_pagev_counter.inc(); + self.request_get_pagev_nblocks_counter.inc_by(req.nblocks as u64); + match self.handle_get_pagev_request(req).await { + Ok(()) => NeonIOResult::GetPageV, + Err(errno) => NeonIOResult::Error(errno), + } }, NeonIORequest::PrefetchV(req) => { + self.request_prefetchv_counter.inc(); + self.request_prefetchv_nblocks_counter.inc_by(req.nblocks as u64); let req = req.clone(); tokio::spawn(async move { self.handle_prefetchv_request(&req).await }); NeonIOResult::PrefetchVLaunched } NeonIORequest::DbSize(req) => { + self.request_db_size_counter.inc(); let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Db(req.db_oid)); // Check the cache first @@ -242,6 +327,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { // Write requests 
             NeonIORequest::WritePage(req) => {
+                self.request_write_page_counter.inc();
+
                 // Also store it in the LFC while we still have it
                 let rel = req.reltag();
                 let _in_progress_guard = self.in_progress_table.lock(RequestInProgressKey::Block(rel.clone(), req.block_number));
@@ -251,28 +338,39 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 NeonIOResult::WriteOK
             }
             NeonIORequest::RelExtend(req) => {
+                self.request_rel_extend_counter.inc();
+
                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache
                     .remember_rel_size(&req.reltag(), req.block_number + 1);
                 NeonIOResult::WriteOK
             }
             NeonIORequest::RelZeroExtend(req) => {
+                self.request_rel_zero_extend_counter.inc();
+                self.request_rel_zero_extend_nblocks_counter.inc_by(req.nblocks as u64);
+
                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache
                     .remember_rel_size(&req.reltag(), req.block_number + req.nblocks);
                 NeonIOResult::WriteOK
             }
             NeonIORequest::RelCreate(req) => {
+                self.request_rel_create_counter.inc();
+
                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache.remember_rel_size(&req.reltag(), 0);
                 NeonIOResult::WriteOK
             }
             NeonIORequest::RelTruncate(req) => {
+                self.request_rel_truncate_counter.inc();
+
                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache.remember_rel_size(&req.reltag(), req.nblocks);
                 NeonIOResult::WriteOK
             }
             NeonIORequest::RelUnlink(req) => {
+                self.request_rel_unlink_counter.inc();
+
                 // TODO: need to grab an io-in-progress lock for this? I guess not
                 self.cache.forget_rel(&req.reltag());
                 NeonIOResult::WriteOK
@@ -284,6 +382,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
 
         // Check the cache first
+        //
+        // Note: Because the backends perform a direct lookup in the cache before sending
+        // the request to the communicator process, we expect the pages to almost never
+        // be already in the cache. It could still happen when:
+        // 1. two backends try to read the same page at the same time, although that should
+        //    never happen because of higher-level locking in the Postgres buffer manager, or
+        // 2. a prefetch request finished at the same time as a backend requested the
+        //    page. That's much more likely.
         let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
         for i in 0..req.nblocks {
             let blkno = req.block_number + i as u32;
@@ -296,7 +402,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             let not_modified_since = match self.cache.get_page(&rel, blkno, dest).await {
                 Ok(CacheResult::Found(_)) => {
                     // get_page already copied the block content to the destination
-                    trace!("found blk {} in rel {:?} in LFC ", blkno, rel);
+                    trace!("found blk {} in rel {:?} in LFC", blkno, rel);
                     continue;
                 }
                 Ok(CacheResult::NotFound(lsn)) => lsn,
@@ -304,6 +410,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             };
             cache_misses.push((blkno, not_modified_since, dest, in_progress_guard));
         }
+        self.getpage_cache_misses_counter.inc_by(cache_misses.len() as u64);
+        self.getpage_cache_hits_counter.inc_by(req.nblocks as u64 - cache_misses.len() as u64);
+
         if cache_misses.is_empty() {
             return Ok(());
         }
@@ -366,7 +475,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
                 Ok(CacheResult::Found(_)) => {
-                    trace!("found blk {} in rel {:?} in LFC ", blkno, rel);
+                    trace!("found blk {} in rel {:?} in LFC", blkno, rel);
                     continue;
                 }
                 Ok(CacheResult::NotFound(lsn)) => lsn,
@@ -417,18 +526,32 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 impl<'t> metrics::core::Collector for CommunicatorWorkerProcessStruct<'t> {
     fn desc(&self) -> Vec<&metrics::core::Desc> {
         let mut descs = Vec::new();
+
+        descs.append(&mut self.request_counters.desc());
+        descs.append(&mut self.getpage_cache_misses_counter.desc());
+        descs.append(&mut self.getpage_cache_hits_counter.desc());
+        descs.append(&mut self.request_nblocks_counters.desc());
+
         if let Some(file_cache) = &self.cache.file_cache {
            descs.append(&mut file_cache.desc());
         }
         descs.append(&mut self.cache.desc());
+
         descs
     }
 
     fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
         let mut values = Vec::new();
+
+        values.append(&mut self.request_counters.collect());
+        values.append(&mut self.getpage_cache_misses_counter.collect());
+        values.append(&mut self.getpage_cache_hits_counter.collect());
+        values.append(&mut self.request_nblocks_counters.collect());
+
         if let Some(file_cache) = &self.cache.file_cache {
             values.append(&mut file_cache.collect());
         }
         values.append(&mut self.cache.collect());
+
         values
     }
 }
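
For reference, the counters above are not registered one by one; they are exported through the struct's own Collector impl at the bottom of the patch, which also forwards the nested cache collectors. Below is a minimal standalone sketch of the same export path, written against the upstream prometheus crate that the in-tree metrics crate wraps. It is illustrative only and not part of the patch; the function name example_scrape and the local variable names are made up for the example.

// Illustrative sketch only: build a labelled counter vector the same way as
// backend_requests_total above, register it, bump it, and render a scrape.
use prometheus::{Encoder, IntCounterVec, Opts, Registry, TextEncoder};

fn example_scrape() -> prometheus::Result<String> {
    let registry = Registry::new();

    // Same construction pattern as backend_requests_total in the patch.
    let requests = IntCounterVec::new(
        Opts::new("backend_requests_total", "Number of requests from backends."),
        &["request_kind"],
    )?;
    // Every prometheus metric type implements Collector, so an IntCounterVec can be
    // registered directly; the patch instead implements Collector on the worker struct
    // and appends each metric's desc()/collect() output there.
    registry.register(Box::new(requests.clone()))?;

    // Hot path: with_label_values() resolves the labelled child once; inc() is cheap.
    requests.with_label_values(&["get_pagev"]).inc();

    // Scrape path: gather() calls collect() on every registered Collector and the
    // text encoder renders the resulting metric families in exposition format.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf)?;
    Ok(String::from_utf8(buf).expect("text exposition format is valid UTF-8"))
}

Funneling everything through one Collector, as the patch does, keeps registration in a single place and lets the worker expose nested collectors (such as the file cache) alongside its own request counters.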