More work on metrics

Switch to the 'measured' crate everywhere in the communicator. Connect
the allocator metrics to the metrics endpoint.
This commit is contained in:
Heikki Linnakangas
2025-07-31 14:09:39 +03:00
parent 0428164058
commit 8a4f16a471
6 changed files with 116 additions and 168 deletions

1
Cargo.lock generated
View File

@@ -1315,7 +1315,6 @@ dependencies = [
"http 1.3.1",
"libc",
"measured",
"metrics",
"neon-shmem",
"nix 0.30.1",
"pageserver_api",

View File

@@ -34,7 +34,6 @@ tokio-pipe = { version = "0.2.12" }
tracing.workspace = true
tracing-subscriber.workspace = true
metrics.workspace = true
uring-common = { workspace = true, features = ["bytes"] }
pageserver_client_grpc.workspace = true

View File

@@ -14,6 +14,11 @@ use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use measured::{Gauge, MetricGroup};
use measured::metric::gauge::GaugeState;
use measured::metric::MetricEncoding;
use measured::metric;
use crate::BLCKSZ;
use tokio::task::spawn_blocking;
@@ -22,15 +27,22 @@ pub type CacheBlock = u64;
pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
#[derive(Debug)]
pub struct FileCache {
file: Arc<File>,
free_list: Mutex<FreeList>,
// metrics
max_blocks_gauge: metrics::IntGauge,
num_free_blocks_gauge: metrics::IntGauge,
metrics: FileCacheMetricGroup,
}
#[derive(MetricGroup)]
#[metric(new())]
struct FileCacheMetricGroup {
/// Local File Cache size in 8KiB blocks
max_blocks: Gauge,
/// Number of free 8KiB blocks in Local File Cache
num_free_blocks: Gauge,
}
// TODO: We keep track of all free blocks in this vec. That doesn't really scale.
@@ -61,17 +73,6 @@ impl FileCache {
.create(true)
.open(file_cache_path)?;
let max_blocks_gauge = metrics::IntGauge::new(
"file_cache_max_blocks",
"Local File Cache size in 8KiB blocks",
)
.unwrap();
let num_free_blocks_gauge = metrics::IntGauge::new(
"file_cache_num_free_blocks",
"Number of free 8KiB blocks in Local File Cache",
)
.unwrap();
tracing::info!("initialized file cache with {} blocks", initial_size);
Ok(FileCache {
@@ -81,8 +82,7 @@ impl FileCache {
max_blocks: initial_size,
free_blocks: Vec::new(),
}),
max_blocks_gauge,
num_free_blocks_gauge,
metrics: FileCacheMetricGroup::new(),
})
}
@@ -136,27 +136,21 @@ impl FileCache {
}
}
impl metrics::core::Collector for FileCache {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.max_blocks_gauge.desc());
descs.append(&mut self.num_free_blocks_gauge.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
impl <T: metric::group::Encoding> MetricGroup<T> for FileCache
where
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as metric::group::Encoding>::Err> {
// Update the gauges with fresh values first
{
let free_list = self.free_list.lock().unwrap();
self.max_blocks_gauge.set(free_list.max_blocks as i64);
self.metrics.max_blocks.set(free_list.max_blocks as i64);
let total_free_blocks: i64 = free_list.free_blocks.len() as i64
+ (free_list.max_blocks as i64 - free_list.next_free_block as i64);
self.num_free_blocks_gauge.set(total_free_blocks);
self.metrics.num_free_blocks.set(total_free_blocks);
}
let mut values = Vec::new();
values.append(&mut self.max_blocks_gauge.collect());
values.append(&mut self.num_free_blocks_gauge.collect());
values
self.metrics.collect_group_into(enc)
}
}

View File

@@ -18,9 +18,12 @@
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use metrics::IntGauge;
use measured::{Gauge, MetricGroup};
use measured::metric::gauge::GaugeState;
use measured::metric::MetricEncoding;
use measured::metric;
struct MyAllocator {
pub(crate) struct MyAllocator {
allocations: AtomicU64,
deallocations: AtomicU64,
@@ -28,6 +31,22 @@ struct MyAllocator {
high: AtomicUsize,
}
#[derive(MetricGroup)]
#[metric(new())]
struct MyAllocatorMetricGroup {
/// Number of allocations in Rust code
communicator_mem_allocations: Gauge,
/// Number of deallocations in Rust code
communicator_mem_deallocations: Gauge,
/// Bytes currently allocated
communicator_mem_allocated: Gauge,
/// High watermark of allocated bytes
communicator_mem_high: Gauge,
}
unsafe impl GlobalAlloc for MyAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
self.allocations.fetch_add(1, Ordering::Relaxed);
@@ -52,58 +71,32 @@ static GLOBAL: MyAllocator = MyAllocator {
high: AtomicUsize::new(0),
};
pub struct MyAllocatorCollector {
allocations: IntGauge,
deallocations: IntGauge,
allocated: IntGauge,
high: IntGauge,
pub(crate) struct MyAllocatorCollector {
metrics: MyAllocatorMetricGroup,
}
impl MyAllocatorCollector {
pub fn new() -> MyAllocatorCollector {
MyAllocatorCollector {
allocations: IntGauge::new("allocations_total", "Number of allocations in Rust code")
.unwrap(),
deallocations: IntGauge::new(
"deallocations_total",
"Number of deallocations in Rust code",
)
.unwrap(),
allocated: IntGauge::new("allocated_total", "Bytes currently allocated").unwrap(),
high: IntGauge::new("allocated_high", "High watermark of allocated bytes").unwrap(),
pub(crate) fn new() -> Self {
Self {
metrics: MyAllocatorMetricGroup::new(),
}
}
}
impl metrics::core::Collector for MyAllocatorCollector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.allocations.desc());
descs.append(&mut self.deallocations.desc());
descs.append(&mut self.allocated.desc());
descs.append(&mut self.high.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
// update the gauges
self.allocations
impl <T: metric::group::Encoding> MetricGroup<T> for MyAllocatorCollector
where
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as metric::group::Encoding>::Err> {
// Update the gauges with fresh values first
self.metrics.communicator_mem_allocations
.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.deallocations
self.metrics.communicator_mem_deallocations
.set(GLOBAL.allocations.load(Ordering::Relaxed) as i64);
self.allocated
self.metrics.communicator_mem_allocated
.set(GLOBAL.allocated.load(Ordering::Relaxed) as i64);
self.high.set(GLOBAL.high.load(Ordering::Relaxed) as i64);
self.metrics.communicator_mem_high.set(GLOBAL.high.load(Ordering::Relaxed) as i64);
values.append(&mut self.allocations.collect());
values.append(&mut self.deallocations.collect());
values.append(&mut self.allocated.collect());
values.append(&mut self.high.collect());
values
self.metrics.collect_group_into(enc)
}
}

View File

@@ -32,7 +32,11 @@ use crate::file_cache::{CacheBlock, FileCache};
use crate::init::alloc_from_slice;
use pageserver_page_api::RelTag;
use metrics::{IntCounter, IntGauge};
use measured::{Counter, Gauge, MetricGroup};
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::MetricEncoding;
use measured::metric;
use neon_shmem::hash::{HashMapInit, entry::Entry};
use neon_shmem::shmem::ShmemHandle;
@@ -55,7 +59,6 @@ pub struct IntegratedCacheShared {
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
#[derive(Debug)]
pub struct IntegratedCacheWriteAccess<'t> {
shared: &'t IntegratedCacheShared,
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
@@ -66,17 +69,34 @@ pub struct IntegratedCacheWriteAccess<'t> {
// Fields for eviction
clock_hand: AtomicUsize,
// Metrics
cache_page_evictions_counter: IntCounter,
block_entry_evictions_counter: IntCounter,
clock_iterations_counter: IntCounter,
metrics: IntegratedCacheMetricGroup,
}
#[derive(MetricGroup)]
#[metric(new())]
struct IntegratedCacheMetricGroup {
/// Page evictions from the Local File Cache
cache_page_evictions_counter: Counter,
/// Block entry evictions from the integrated cache
block_entry_evictions_counter: Counter,
/// Number of times the clock hand has moved
clock_iterations_counter: Counter,
// metrics from the hash map
block_map_num_buckets: IntGauge,
block_map_num_buckets_in_use: IntGauge,
relsize_cache_num_buckets: IntGauge,
relsize_cache_num_buckets_in_use: IntGauge,
/// Allocated size of the block cache hash map
block_map_num_buckets: Gauge,
/// Number of buckets in use in the block cache hash map
block_map_num_buckets_in_use: Gauge,
/// Allocated size of the relsize cache hash map
relsize_cache_num_buckets: Gauge,
/// Number of buckets in use in the relsize cache hash map
relsize_cache_num_buckets_in_use: Gauge,
}
/// Represents read-only access to the integrated cache. Backend processes have this.
@@ -151,46 +171,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
block_map: block_map_handle.attach_writer(),
file_cache,
clock_hand: AtomicUsize::new(0),
cache_page_evictions_counter: metrics::IntCounter::new(
"integrated_cache_page_evictions",
"Page evictions from the Local File Cache",
)
.unwrap(),
block_entry_evictions_counter: metrics::IntCounter::new(
"integrated_cache_block_entry_evictions",
"Block entry evictions from the integrated cache",
)
.unwrap(),
clock_iterations_counter: metrics::IntCounter::new(
"clock_iterations",
"Number of times the clock hand has moved",
)
.unwrap(),
block_map_num_buckets: metrics::IntGauge::new(
"block_map_num_buckets",
"Allocated size of the block cache hash map",
)
.unwrap(),
block_map_num_buckets_in_use: metrics::IntGauge::new(
"block_map_num_buckets_in_use",
"Number of buckets in use in the block cache hash map",
)
.unwrap(),
relsize_cache_num_buckets: metrics::IntGauge::new(
"relsize_cache_num_buckets",
"Allocated size of the relsize cache hash map",
)
.unwrap(),
relsize_cache_num_buckets_in_use: metrics::IntGauge::new(
"relsize_cache_num_buckets_in_use",
"Number of buckets in use in the relsize cache hash map",
)
.unwrap(),
metrics: IntegratedCacheMetricGroup::new(),
}
}
@@ -642,7 +623,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
fn try_evict_block_entry(&self) {
let num_buckets = self.block_map.get_num_buckets();
loop {
self.clock_iterations_counter.inc();
self.metrics.clock_iterations_counter.inc();
let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets;
let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() {
@@ -693,7 +674,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
let num_buckets = self.block_map.get_num_buckets();
let mut iterations = 0;
while iterations < 100 {
self.clock_iterations_counter.inc();
self.metrics.clock_iterations_counter.inc();
let victim_bucket = self.clock_hand.fetch_add(1, Ordering::Relaxed) % num_buckets;
let evict_this = match self.block_map.get_at_bucket(victim_bucket).as_deref() {
@@ -759,9 +740,9 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
n => Some(n),
};
if evicted_cache_block.is_some() {
self.cache_page_evictions_counter.inc();
self.metrics.cache_page_evictions_counter.inc();
}
self.block_entry_evictions_counter.inc();
self.metrics.block_entry_evictions_counter.inc();
EvictResult::Evicted(evicted_cache_block)
} else {
EvictResult::Pinned
@@ -794,44 +775,27 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
}
}
impl metrics::core::Collector for IntegratedCacheWriteAccess<'_> {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.cache_page_evictions_counter.desc());
descs.append(&mut self.block_entry_evictions_counter.desc());
descs.append(&mut self.clock_iterations_counter.desc());
descs.append(&mut self.block_map_num_buckets.desc());
descs.append(&mut self.block_map_num_buckets_in_use.desc());
descs.append(&mut self.relsize_cache_num_buckets.desc());
descs.append(&mut self.relsize_cache_num_buckets_in_use.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
impl <T: metric::group::Encoding> MetricGroup<T> for IntegratedCacheWriteAccess<'_>
where
CounterState: MetricEncoding<T>,
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as metric::group::Encoding>::Err> {
// Update gauges
self.block_map_num_buckets
self.metrics.block_map_num_buckets
.set(self.block_map.get_num_buckets() as i64);
self.block_map_num_buckets_in_use
self.metrics.block_map_num_buckets_in_use
.set(self.block_map.get_num_buckets_in_use() as i64);
self.relsize_cache_num_buckets
self.metrics.relsize_cache_num_buckets
.set(self.relsize_cache.get_num_buckets() as i64);
self.relsize_cache_num_buckets_in_use
self.metrics.relsize_cache_num_buckets_in_use
.set(self.relsize_cache.get_num_buckets_in_use() as i64);
let mut values = Vec::new();
values.append(&mut self.cache_page_evictions_counter.collect());
values.append(&mut self.block_entry_evictions_counter.collect());
values.append(&mut self.clock_iterations_counter.collect());
if let Some(file_cache) = &self.file_cache {
file_cache.collect_group_into(enc)?;
}
values.append(&mut self.block_map_num_buckets.collect());
values.append(&mut self.block_map_num_buckets_in_use.collect());
values.append(&mut self.relsize_cache_num_buckets.collect());
values.append(&mut self.relsize_cache_num_buckets_in_use.collect());
values
self.metrics.collect_group_into(enc)
}
}

View File

@@ -23,6 +23,7 @@ use uring_common::buf::IoBuf;
use measured::MetricGroup;
use measured::metric::MetricEncoding;
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::{Gauge, GaugeVec};
@@ -30,7 +31,7 @@ use utils::id::{TenantId, TimelineId};
use super::callbacks::{get_request_lsn, notify_proc};
use tracing::{debug, error, info, info_span, trace};
use tracing::{error, info, info_span, trace};
use utils::lsn::Lsn;
@@ -65,7 +66,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
// For the requests that affect multiple blocks, have separate counters for the # of blocks affected
request_nblocks_counters: GaugeVec<RequestTypeLabelGroupSet>,
#[allow(dead_code)]
allocator_metrics: MyAllocatorCollector,
}
@@ -145,8 +145,6 @@ pub(super) fn init(
.integrated_cache_init_struct
.worker_process_init(last_lsn, file_cache);
debug!("Initialised integrated cache: {cache:?}");
let client = {
let _guard = runtime.enter();
PageserverClient::new(
@@ -818,6 +816,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct<'_>
where
T: Encoding,
CounterState: MetricEncoding<T>,
GaugeState: MetricEncoding<T>,
{
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
@@ -825,12 +824,12 @@ where
use measured::metric::name::MetricName;
self.lfc_metrics.collect_group_into(enc)?;
self.cache.collect_group_into(enc)?;
self.request_counters
.collect_family_into(MetricName::from_str("request_counters"), enc)?;
self.request_nblocks_counters
.collect_family_into(MetricName::from_str("request_nblocks_counters"), enc)?;
// FIXME: allocator metrics
self.allocator_metrics.collect_group_into(enc)?;
Ok(())
}