mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-25 06:10:37 +00:00
Compare commits
5 Commits
release-pr
...
conrad/mem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfb2e3c178 | ||
|
|
0a34084ba5 | ||
|
|
b33047df7e | ||
|
|
1c5477619f | ||
|
|
40f5b3e8df |
@@ -30,6 +30,7 @@ workspace-members = [
|
||||
"vm_monitor",
|
||||
# All of these exist in libs and are not usually built independently.
|
||||
# Putting workspace hack there adds a bottleneck for cargo builds.
|
||||
"alloc-metrics",
|
||||
"compute_api",
|
||||
"consumption_metrics",
|
||||
"desim",
|
||||
|
||||
18
Cargo.lock
generated
18
Cargo.lock
generated
@@ -61,6 +61,17 @@ dependencies = [
|
||||
"equator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "alloc-metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"criterion",
|
||||
"measured",
|
||||
"metrics",
|
||||
"thread_local",
|
||||
"tikv-jemallocator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.16"
|
||||
@@ -5301,6 +5312,7 @@ name = "proxy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"alloc-metrics",
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"assert-json-diff",
|
||||
@@ -7332,12 +7344,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
|
||||
version = "1.1.9"
|
||||
source = "git+https://github.com/conradludgate/thread_local-rs?branch=no-tls-destructor-get#f9ca3d375745c14a632ae3ffe6a7a646dc8421a0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -195,6 +195,7 @@ sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
thread_local = "1.1.9"
|
||||
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tokio = { version = "1.43.1", features = ["macros"] }
|
||||
@@ -253,6 +254,7 @@ azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git"
|
||||
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
|
||||
|
||||
## Local libraries
|
||||
alloc-metrics = { version = "0.1", path = "./libs/alloc-metrics/" }
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
|
||||
desim = { version = "0.1", path = "./libs/desim" }
|
||||
@@ -302,6 +304,9 @@ tonic-build = "0.13.1"
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
|
||||
|
||||
# Needed to fix a bug in alloc-metrics
|
||||
thread_local = { git = "https://github.com/conradludgate/thread_local-rs", branch = "no-tls-destructor-get" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
[profile.release]
|
||||
|
||||
18
libs/alloc-metrics/Cargo.toml
Normal file
18
libs/alloc-metrics/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "alloc-metrics"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
metrics.workspace = true
|
||||
measured.workspace = true
|
||||
thread_local.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
|
||||
[[bench]]
|
||||
harness = false
|
||||
name = "alloc"
|
||||
110
libs/alloc-metrics/benches/alloc.rs
Normal file
110
libs/alloc-metrics/benches/alloc.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
use std::alloc::{GlobalAlloc, Layout, System, handle_alloc_error};
|
||||
|
||||
use alloc_metrics::TrackedAllocator;
|
||||
use criterion::{
|
||||
AxisScale, BenchmarkGroup, BenchmarkId, Criterion, PlotConfiguration, measurement::Measurement,
|
||||
};
|
||||
use measured::FixedCardinalityLabel;
|
||||
use tikv_jemallocator::Jemalloc;
|
||||
|
||||
fn main() {
|
||||
let mut c = Criterion::default().configure_from_args();
|
||||
bench(&mut c);
|
||||
c.final_summary();
|
||||
}
|
||||
|
||||
#[rustfmt::skip]
|
||||
fn bench(c: &mut Criterion) {
|
||||
bench_alloc(c.benchmark_group("alloc/system"), &System, &ALLOC_SYSTEM);
|
||||
bench_alloc(c.benchmark_group("alloc/jemalloc"), &Jemalloc, &ALLOC_JEMALLOC);
|
||||
|
||||
bench_dealloc(c.benchmark_group("dealloc/system"), &System, &ALLOC_SYSTEM);
|
||||
bench_dealloc(c.benchmark_group("dealloc/jemalloc"), &Jemalloc, &ALLOC_JEMALLOC);
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
#[label(singleton = "memory_context")]
|
||||
pub enum MemoryContext {
|
||||
Root,
|
||||
Test,
|
||||
}
|
||||
|
||||
static ALLOC_SYSTEM: TrackedAllocator<System, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(System, MemoryContext::Root) };
|
||||
static ALLOC_JEMALLOC: TrackedAllocator<Jemalloc, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(Jemalloc, MemoryContext::Root) };
|
||||
|
||||
const KB: u64 = 1024;
|
||||
const SIZES: [u64; 6] = [64, 256, KB, 4 * KB, 16 * KB, KB * KB];
|
||||
|
||||
fn bench_alloc<A: GlobalAlloc>(
|
||||
mut g: BenchmarkGroup<'_, impl Measurement>,
|
||||
alloc1: &'static A,
|
||||
alloc2: &'static TrackedAllocator<A, MemoryContext>,
|
||||
) {
|
||||
g.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
|
||||
for size in SIZES {
|
||||
let layout = Layout::from_size_align(size as usize, 8).unwrap();
|
||||
|
||||
g.throughput(criterion::Throughput::Bytes(size));
|
||||
g.bench_with_input(BenchmarkId::new("default", size), &layout, |b, &layout| {
|
||||
let bs = criterion::BatchSize::NumBatches(10 + size.ilog2() as u64);
|
||||
b.iter_batched(|| {}, |()| Alloc::new(alloc1, layout), bs);
|
||||
});
|
||||
g.bench_with_input(BenchmarkId::new("tracked", size), &layout, |b, &layout| {
|
||||
let _scope = alloc2.scope(MemoryContext::Test);
|
||||
|
||||
let bs = criterion::BatchSize::NumBatches(10 + size.ilog2() as u64);
|
||||
b.iter_batched(|| {}, |()| Alloc::new(alloc2, layout), bs);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn bench_dealloc<A: GlobalAlloc>(
|
||||
mut g: BenchmarkGroup<'_, impl Measurement>,
|
||||
alloc1: &'static A,
|
||||
alloc2: &'static TrackedAllocator<A, MemoryContext>,
|
||||
) {
|
||||
g.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic));
|
||||
for size in SIZES {
|
||||
let layout = Layout::from_size_align(size as usize, 8).unwrap();
|
||||
|
||||
g.throughput(criterion::Throughput::Bytes(size));
|
||||
g.bench_with_input(BenchmarkId::new("default", size), &layout, |b, &layout| {
|
||||
let bs = criterion::BatchSize::NumBatches(10 + size.ilog2() as u64);
|
||||
b.iter_batched(|| Alloc::new(alloc1, layout), drop, bs);
|
||||
});
|
||||
g.bench_with_input(BenchmarkId::new("tracked", size), &layout, |b, &layout| {
|
||||
let _scope = alloc2.scope(MemoryContext::Test);
|
||||
|
||||
let bs = criterion::BatchSize::NumBatches(10 + size.ilog2() as u64);
|
||||
b.iter_batched(|| Alloc::new(alloc2, layout), drop, bs);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
struct Alloc<'a, A: GlobalAlloc> {
|
||||
alloc: &'a A,
|
||||
ptr: *mut u8,
|
||||
layout: Layout,
|
||||
}
|
||||
|
||||
impl<'a, A: GlobalAlloc> Alloc<'a, A> {
|
||||
fn new(alloc: &'a A, layout: Layout) -> Self {
|
||||
let ptr = unsafe { alloc.alloc(layout) };
|
||||
if ptr.is_null() {
|
||||
handle_alloc_error(layout);
|
||||
}
|
||||
|
||||
// actually make the page resident.
|
||||
unsafe { ptr.cast::<u8>().write(1) };
|
||||
|
||||
Self { alloc, ptr, layout }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, A: GlobalAlloc> Drop for Alloc<'a, A> {
|
||||
fn drop(&mut self) {
|
||||
unsafe { self.alloc.dealloc(self.ptr, self.layout) };
|
||||
}
|
||||
}
|
||||
48
libs/alloc-metrics/src/counters.rs
Normal file
48
libs/alloc-metrics/src/counters.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use measured::{
|
||||
FixedCardinalityLabel, LabelGroup, label::StaticLabelSet, metric::MetricFamilyEncoding,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, Dec, Inc, MeasuredCounterPairState};
|
||||
|
||||
use crate::metric_vec::DenseMetricVec;
|
||||
|
||||
pub struct DenseCounterPairVec<
|
||||
A: CounterPairAssoc<LabelGroupSet = StaticLabelSet<L>>,
|
||||
L: FixedCardinalityLabel + LabelGroup,
|
||||
> {
|
||||
pub vec: DenseMetricVec<MeasuredCounterPairState, L>,
|
||||
pub _marker: PhantomData<A>,
|
||||
}
|
||||
|
||||
impl<A: CounterPairAssoc<LabelGroupSet = StaticLabelSet<L>>, L: FixedCardinalityLabel + LabelGroup>
|
||||
DenseCounterPairVec<A, L>
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
vec: DenseMetricVec::new(),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, A, L> ::measured::metric::group::MetricGroup<T> for DenseCounterPairVec<A, L>
|
||||
where
|
||||
T: ::measured::metric::group::Encoding,
|
||||
::measured::metric::counter::CounterState: ::measured::metric::MetricEncoding<T>,
|
||||
A: CounterPairAssoc<LabelGroupSet = StaticLabelSet<L>>,
|
||||
L: FixedCardinalityLabel + LabelGroup,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
|
||||
// write decrement first to avoid a race condition where inc - dec < 0
|
||||
T::write_help(enc, A::DEC_NAME, A::DEC_HELP)?;
|
||||
self.vec
|
||||
.collect_family_into(A::DEC_NAME, &mut Dec(&mut *enc))?;
|
||||
|
||||
T::write_help(enc, A::INC_NAME, A::INC_HELP)?;
|
||||
self.vec
|
||||
.collect_family_into(A::INC_NAME, &mut Inc(&mut *enc))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
441
libs/alloc-metrics/src/lib.rs
Normal file
441
libs/alloc-metrics/src/lib.rs
Normal file
@@ -0,0 +1,441 @@
|
||||
//! Tagged allocator measurements.
|
||||
|
||||
mod counters;
|
||||
mod metric_vec;
|
||||
|
||||
use std::{
|
||||
alloc::{GlobalAlloc, Layout},
|
||||
cell::Cell,
|
||||
marker::PhantomData,
|
||||
sync::{
|
||||
OnceLock,
|
||||
atomic::{AtomicU64, Ordering::Relaxed},
|
||||
},
|
||||
};
|
||||
|
||||
use measured::{
|
||||
FixedCardinalityLabel, LabelGroup, MetricGroup,
|
||||
label::StaticLabelSet,
|
||||
metric::{MetricEncoding, counter::CounterState, group::Encoding, name::MetricName},
|
||||
};
|
||||
use metrics::{CounterPairAssoc, MeasuredCounterPairState};
|
||||
use thread_local::ThreadLocal;
|
||||
|
||||
type AllocCounter<T> = counters::DenseCounterPairVec<AllocPair<T>, T>;
|
||||
|
||||
pub struct TrackedAllocator<A, T: 'static + Send + Sync + FixedCardinalityLabel + LabelGroup> {
|
||||
inner: A,
|
||||
|
||||
/// potentially high-content fallback if the thread was not registered.
|
||||
default_counters: MeasuredCounterPairState,
|
||||
/// Default tag to use if this thread is not registered.
|
||||
default_tag: T,
|
||||
|
||||
thread: OnceLock<RegisteredThread<T>>,
|
||||
|
||||
/// where thread alloc data is eventually saved to, even if threads are shutdown.
|
||||
global: OnceLock<AllocCounter<T>>,
|
||||
}
|
||||
|
||||
impl<A, T> TrackedAllocator<A, T>
|
||||
where
|
||||
T: 'static + Send + Sync + FixedCardinalityLabel + LabelGroup,
|
||||
{
|
||||
/// # Safety
|
||||
///
|
||||
/// [`FixedCardinalityLabel`] must be implemented correctly, fully dense, and must not panic.
|
||||
pub const unsafe fn new(alloc: A, default: T) -> Self {
|
||||
TrackedAllocator {
|
||||
inner: alloc,
|
||||
default_tag: default,
|
||||
default_counters: MeasuredCounterPairState {
|
||||
inc: CounterState {
|
||||
count: AtomicU64::new(0),
|
||||
},
|
||||
dec: CounterState {
|
||||
count: AtomicU64::new(0),
|
||||
},
|
||||
},
|
||||
thread: OnceLock::new(),
|
||||
global: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Allocations
|
||||
pub fn register_thread(&'static self) {
|
||||
self.register_thread_inner();
|
||||
}
|
||||
|
||||
pub fn scope(&'static self, tag: T) -> AllocScope<'static, T> {
|
||||
let cell = self.register_thread_inner();
|
||||
let last = cell.replace(tag);
|
||||
AllocScope { cell, last }
|
||||
}
|
||||
|
||||
fn register_thread_inner(&'static self) -> &'static Cell<T> {
|
||||
let thread = self.thread.get_or_init(|| RegisteredThread {
|
||||
scope: ThreadLocal::new(),
|
||||
state: ThreadLocal::new(),
|
||||
});
|
||||
|
||||
thread.state.get_or(|| ThreadState {
|
||||
counters: AllocCounter::new(),
|
||||
global: self.global.get_or_init(AllocCounter::new),
|
||||
});
|
||||
|
||||
thread.scope.get_or(|| Cell::new(self.default_tag))
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! alloc {
|
||||
($alloc_fn:ident) => {
|
||||
unsafe fn $alloc_fn(&self, layout: Layout) -> *mut u8 {
|
||||
let Ok((tagged_layout, tag_offset)) = layout.extend(Layout::new::<T>()) else {
|
||||
return std::ptr::null_mut();
|
||||
};
|
||||
let tagged_layout = tagged_layout.pad_to_align();
|
||||
|
||||
// Safety: The layout is not zero-sized.
|
||||
let ptr = unsafe { self.inner.$alloc_fn(tagged_layout) };
|
||||
|
||||
// allocation failed.
|
||||
if ptr.is_null() {
|
||||
return ptr;
|
||||
}
|
||||
|
||||
// We are being very careful here to not allocate or panic.
|
||||
let thread = self.thread.get().map(|s| (s.scope.get(), s.state.get()));
|
||||
let tag = thread.and_then(|t| t.0).map_or(self.default_tag, Cell::get);
|
||||
|
||||
// Allocation successful. Write our tag
|
||||
// Safety: tag_offset is inbounds of the ptr
|
||||
unsafe { ptr.add(tag_offset).cast::<T>().write(tag) }
|
||||
|
||||
let counters = thread.and_then(|t| t.1).map(|s| &s.counters);
|
||||
let metric = if let Some(counters) = counters {
|
||||
counters.vec.get_metric(tag)
|
||||
} else {
|
||||
// if tag is not default, then the thread state would have been registered, therefore tag must be default.
|
||||
&self.default_counters
|
||||
};
|
||||
|
||||
metric.inc.count.fetch_add(layout.size() as u64, Relaxed);
|
||||
|
||||
ptr
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// We will tag our allocation by adding `T` to the end of the layout.
|
||||
// This is ok only as long as it does not overflow. If it does, we will
|
||||
// just fail the allocation by returning null.
|
||||
//
|
||||
// Safety: we will not unwind during alloc, and we will ensure layouts are handled correctly.
|
||||
unsafe impl<A, T> GlobalAlloc for TrackedAllocator<A, T>
|
||||
where
|
||||
A: GlobalAlloc,
|
||||
T: 'static + Send + Sync + FixedCardinalityLabel + LabelGroup,
|
||||
{
|
||||
alloc!(alloc);
|
||||
alloc!(alloc_zeroed);
|
||||
|
||||
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
|
||||
// SAFETY: the caller must ensure that the `new_size` does not overflow.
|
||||
// `layout.align()` comes from a `Layout` and is thus guaranteed to be valid.
|
||||
let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) };
|
||||
|
||||
let Ok((new_tagged_layout, new_tag_offset)) = new_layout.extend(Layout::new::<T>()) else {
|
||||
return std::ptr::null_mut();
|
||||
};
|
||||
let new_tagged_layout = new_tagged_layout.pad_to_align();
|
||||
|
||||
let Ok((tagged_layout, tag_offset)) = layout.extend(Layout::new::<T>()) else {
|
||||
// Safety: This layout clearly did not match what was originally allocated,
|
||||
// otherwise alloc() would have caught this error and returned null.
|
||||
unsafe { std::hint::unreachable_unchecked() }
|
||||
};
|
||||
let tagged_layout = tagged_layout.pad_to_align();
|
||||
|
||||
// get the tag set during alloc
|
||||
// Safety: tag_offset is inbounds of the ptr
|
||||
let tag = unsafe { ptr.add(tag_offset).cast::<T>().read() };
|
||||
|
||||
// Safety: layout sizes are correct
|
||||
let new_ptr = unsafe {
|
||||
self.inner
|
||||
.realloc(ptr, tagged_layout, new_tagged_layout.size())
|
||||
};
|
||||
|
||||
// allocation failed.
|
||||
if new_ptr.is_null() {
|
||||
return new_ptr;
|
||||
}
|
||||
|
||||
// We are being very careful here to not allocate or panic.
|
||||
let thread = self.thread.get().map(|s| (s.scope.get(), s.state.get()));
|
||||
let new_tag = thread.and_then(|t| t.0).map_or(self.default_tag, Cell::get);
|
||||
|
||||
// Allocation successful. Write our tag
|
||||
// Safety: new_tag_offset is inbounds of the ptr
|
||||
unsafe { new_ptr.add(new_tag_offset).cast::<T>().write(new_tag) }
|
||||
|
||||
let counters = thread.and_then(|t| t.1).map(|s| &s.counters);
|
||||
let counters = counters.or_else(|| self.global.get());
|
||||
let (new_metric, old_metric) = if let Some(counters) = counters {
|
||||
let new_metric = counters.vec.get_metric(new_tag);
|
||||
let old_metric = counters.vec.get_metric(tag);
|
||||
|
||||
(new_metric, old_metric)
|
||||
} else {
|
||||
// no tag was registered at all, therefore both tags must be default.
|
||||
(&self.default_counters, &self.default_counters)
|
||||
};
|
||||
|
||||
let (inc, dec) = if tag.encode() != new_tag.encode() {
|
||||
(new_layout.size() as u64, layout.size() as u64)
|
||||
} else if new_layout.size() > layout.size() {
|
||||
((new_layout.size() - layout.size()) as u64, 0)
|
||||
} else {
|
||||
(0, (layout.size() - new_layout.size()) as u64)
|
||||
};
|
||||
|
||||
new_metric.inc.count.fetch_add(inc, Relaxed);
|
||||
old_metric.dec.count.fetch_add(dec, Relaxed);
|
||||
|
||||
new_ptr
|
||||
}
|
||||
|
||||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
|
||||
let Ok((tagged_layout, tag_offset)) = layout.extend(Layout::new::<T>()) else {
|
||||
// Safety: This layout clearly did not match what was originally allocated,
|
||||
// otherwise alloc() would have caught this error and returned null.
|
||||
unsafe { std::hint::unreachable_unchecked() }
|
||||
};
|
||||
let tagged_layout = tagged_layout.pad_to_align();
|
||||
|
||||
// get the tag set during alloc
|
||||
// Safety: tag_offset is inbounds of the ptr
|
||||
let tag = unsafe { ptr.add(tag_offset).cast::<T>().read() };
|
||||
|
||||
// Safety: caller upholds contract for us
|
||||
unsafe { self.inner.dealloc(ptr, tagged_layout) }
|
||||
|
||||
// We are being very careful here to not allocate or panic.
|
||||
let thread = self.thread.get().map(|s| (s.scope.get(), s.state.get()));
|
||||
let counters = thread.and_then(|t| t.1).map(|s| &s.counters);
|
||||
let counters = counters.or_else(|| self.global.get());
|
||||
|
||||
let metric = if let Some(counters) = counters {
|
||||
counters.vec.get_metric(tag)
|
||||
} else {
|
||||
// if tag is not default, then global would have been registered, therefore tag must be default.
|
||||
&self.default_counters
|
||||
};
|
||||
|
||||
metric.dec.count.fetch_add(layout.size() as u64, Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AllocScope<'a, T: FixedCardinalityLabel> {
|
||||
cell: &'a Cell<T>,
|
||||
last: T,
|
||||
}
|
||||
|
||||
impl<'a, T: FixedCardinalityLabel> Drop for AllocScope<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
self.cell.set(self.last);
|
||||
}
|
||||
}
|
||||
|
||||
struct AllocPair<T>(PhantomData<T>);
|
||||
|
||||
impl<T: FixedCardinalityLabel + LabelGroup> CounterPairAssoc for AllocPair<T> {
|
||||
const INC_NAME: &'static MetricName = MetricName::from_str("allocated_bytes");
|
||||
const DEC_NAME: &'static MetricName = MetricName::from_str("deallocated_bytes");
|
||||
|
||||
const INC_HELP: &'static str = "total number of bytes allocated";
|
||||
const DEC_HELP: &'static str = "total number of bytes deallocated";
|
||||
|
||||
type LabelGroupSet = StaticLabelSet<T>;
|
||||
}
|
||||
|
||||
struct RegisteredThread<T: 'static + Send + Sync + FixedCardinalityLabel + LabelGroup> {
|
||||
/// Current memory context for this thread.
|
||||
scope: ThreadLocal<Cell<T>>,
|
||||
/// per thread state containing low contention counters for faster allocations.
|
||||
state: ThreadLocal<ThreadState<T>>,
|
||||
}
|
||||
|
||||
struct ThreadState<T: 'static + FixedCardinalityLabel + LabelGroup> {
|
||||
counters: AllocCounter<T>,
|
||||
global: &'static AllocCounter<T>,
|
||||
}
|
||||
|
||||
// Ensure the counters are measured on thread destruction.
|
||||
impl<T: 'static + FixedCardinalityLabel + LabelGroup> Drop for ThreadState<T> {
|
||||
fn drop(&mut self) {
|
||||
// iterate over all labels
|
||||
for tag in (0..T::cardinality()).map(T::decode) {
|
||||
// load and reset the counts in the thread-local counters.
|
||||
let m = self.counters.vec.get_metric_mut(tag);
|
||||
let inc = *m.inc.count.get_mut();
|
||||
let dec = *m.dec.count.get_mut();
|
||||
|
||||
// add the counts into the global counters.
|
||||
let m = self.global.vec.get_metric(tag);
|
||||
m.inc.count.fetch_add(inc, Relaxed);
|
||||
m.dec.count.fetch_add(dec, Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, T, Enc> MetricGroup<Enc> for TrackedAllocator<A, T>
|
||||
where
|
||||
T: 'static + Send + Sync + FixedCardinalityLabel + LabelGroup,
|
||||
Enc: Encoding,
|
||||
CounterState: MetricEncoding<Enc>,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> {
|
||||
let global = self.global.get_or_init(AllocCounter::new);
|
||||
|
||||
// iterate over all counter threads
|
||||
for s in self.thread.get().into_iter().flat_map(|s| s.state.iter()) {
|
||||
// iterate over all labels
|
||||
for tag in (0..T::cardinality()).map(T::decode) {
|
||||
sample(global, s.counters.vec.get_metric(tag), tag);
|
||||
}
|
||||
}
|
||||
|
||||
sample(global, &self.default_counters, self.default_tag);
|
||||
|
||||
global.collect_group_into(enc)
|
||||
}
|
||||
}
|
||||
|
||||
fn sample<T: FixedCardinalityLabel + LabelGroup>(
|
||||
global: &AllocCounter<T>,
|
||||
local: &MeasuredCounterPairState,
|
||||
tag: T,
|
||||
) {
|
||||
// load and reset the counts in the thread-local counters.
|
||||
let inc = local.inc.count.swap(0, Relaxed);
|
||||
let dec = local.dec.count.swap(0, Relaxed);
|
||||
|
||||
// add the counts into the global counters.
|
||||
let m = global.vec.get_metric(tag);
|
||||
m.inc.count.fetch_add(inc, Relaxed);
|
||||
m.dec.count.fetch_add(dec, Relaxed);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::alloc::{GlobalAlloc, Layout, System};
|
||||
|
||||
use measured::{FixedCardinalityLabel, MetricGroup, text::BufferedTextEncoder};
|
||||
|
||||
use crate::TrackedAllocator;
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
#[label(singleton = "memory_context")]
|
||||
pub enum MemoryContext {
|
||||
Root,
|
||||
Test,
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn alloc() {
|
||||
// Safety: `MemoryContext` upholds the safety requirements.
|
||||
static GLOBAL: TrackedAllocator<System, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(System, MemoryContext::Root) };
|
||||
|
||||
GLOBAL.register_thread();
|
||||
|
||||
let _test = GLOBAL.scope(MemoryContext::Test);
|
||||
|
||||
let ptr = unsafe { GLOBAL.alloc(Layout::for_value(&[0_i32])) };
|
||||
let ptr = unsafe { GLOBAL.realloc(ptr, Layout::for_value(&[0_i32]), 8) };
|
||||
|
||||
drop(_test);
|
||||
|
||||
let ptr = unsafe { GLOBAL.realloc(ptr, Layout::for_value(&[0_i32, 1_i32]), 4) };
|
||||
unsafe { GLOBAL.dealloc(ptr, Layout::for_value(&[0_i32])) };
|
||||
|
||||
let mut text = BufferedTextEncoder::new();
|
||||
GLOBAL.collect_group_into(&mut text).unwrap();
|
||||
let text = String::from_utf8(text.finish().into()).unwrap();
|
||||
assert_eq!(
|
||||
text,
|
||||
r#"# HELP deallocated_bytes total number of bytes deallocated
|
||||
# TYPE deallocated_bytes counter
|
||||
deallocated_bytes{memory_context="root"} 4
|
||||
deallocated_bytes{memory_context="test"} 8
|
||||
|
||||
# HELP allocated_bytes total number of bytes allocated
|
||||
# TYPE allocated_bytes counter
|
||||
allocated_bytes{memory_context="root"} 4
|
||||
allocated_bytes{memory_context="test"} 8
|
||||
"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unregistered_thread() {
|
||||
// Safety: `MemoryContext` upholds the safety requirements.
|
||||
static GLOBAL: TrackedAllocator<System, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(System, MemoryContext::Root) };
|
||||
|
||||
GLOBAL.register_thread();
|
||||
|
||||
// unregistered thread
|
||||
std::thread::spawn(|| {
|
||||
let ptr = unsafe { GLOBAL.alloc(Layout::for_value(&[0_i32])) };
|
||||
unsafe { GLOBAL.dealloc(ptr, Layout::for_value(&[0_i32])) };
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
|
||||
let mut text = BufferedTextEncoder::new();
|
||||
GLOBAL.collect_group_into(&mut text).unwrap();
|
||||
let text = String::from_utf8(text.finish().into()).unwrap();
|
||||
assert_eq!(
|
||||
text,
|
||||
r#"# HELP deallocated_bytes total number of bytes deallocated
|
||||
# TYPE deallocated_bytes counter
|
||||
deallocated_bytes{memory_context="root"} 4
|
||||
deallocated_bytes{memory_context="test"} 0
|
||||
|
||||
# HELP allocated_bytes total number of bytes allocated
|
||||
# TYPE allocated_bytes counter
|
||||
allocated_bytes{memory_context="root"} 4
|
||||
allocated_bytes{memory_context="test"} 0
|
||||
"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fully_unregistered() {
|
||||
// Safety: `MemoryContext` upholds the safety requirements.
|
||||
static GLOBAL: TrackedAllocator<System, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(System, MemoryContext::Root) };
|
||||
|
||||
let ptr = unsafe { GLOBAL.alloc(Layout::for_value(&[0_i32])) };
|
||||
unsafe { GLOBAL.dealloc(ptr, Layout::for_value(&[0_i32])) };
|
||||
|
||||
let mut text = BufferedTextEncoder::new();
|
||||
GLOBAL.collect_group_into(&mut text).unwrap();
|
||||
let text = String::from_utf8(text.finish().into()).unwrap();
|
||||
assert_eq!(
|
||||
text,
|
||||
r#"# HELP deallocated_bytes total number of bytes deallocated
|
||||
# TYPE deallocated_bytes counter
|
||||
deallocated_bytes{memory_context="root"} 4
|
||||
deallocated_bytes{memory_context="test"} 0
|
||||
|
||||
# HELP allocated_bytes total number of bytes allocated
|
||||
# TYPE allocated_bytes counter
|
||||
allocated_bytes{memory_context="root"} 4
|
||||
allocated_bytes{memory_context="test"} 0
|
||||
"#
|
||||
);
|
||||
}
|
||||
}
|
||||
72
libs/alloc-metrics/src/metric_vec.rs
Normal file
72
libs/alloc-metrics/src/metric_vec.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
//! Dense metric vec
|
||||
|
||||
use measured::{
|
||||
FixedCardinalityLabel, LabelGroup,
|
||||
label::StaticLabelSet,
|
||||
metric::{
|
||||
MetricEncoding, MetricFamilyEncoding, MetricType, group::Encoding, name::MetricNameEncoder,
|
||||
},
|
||||
};
|
||||
|
||||
pub struct DenseMetricVec<M: MetricType, L: FixedCardinalityLabel + LabelGroup> {
|
||||
metrics: Box<[M]>,
|
||||
metadata: M::Metadata,
|
||||
_label_set: StaticLabelSet<L>,
|
||||
}
|
||||
|
||||
fn new_dense<M: MetricType>(c: usize) -> Box<[M]> {
|
||||
let mut vec = Vec::with_capacity(c);
|
||||
vec.resize_with(c, M::default);
|
||||
vec.into_boxed_slice()
|
||||
}
|
||||
|
||||
impl<M: MetricType, L: FixedCardinalityLabel + LabelGroup> DenseMetricVec<M, L>
|
||||
where
|
||||
M::Metadata: Default,
|
||||
{
|
||||
/// Create a new metric vec with the given label set and metric metadata
|
||||
pub fn new() -> Self {
|
||||
Self::with_metadata(<M::Metadata>::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MetricType, L: FixedCardinalityLabel + LabelGroup> DenseMetricVec<M, L> {
|
||||
/// Create a new metric vec with the given label set and metric metadata
|
||||
pub fn with_metadata(metadata: M::Metadata) -> Self {
|
||||
Self {
|
||||
metrics: new_dense(L::cardinality()),
|
||||
metadata,
|
||||
_label_set: StaticLabelSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the individual metric at the given identifier.
|
||||
///
|
||||
/// # Panics
|
||||
/// Can panic or cause strange behaviour if the label ID comes from a different metric family.
|
||||
pub fn get_metric(&self, label: L) -> &M {
|
||||
// safety: The caller has guarantees that the label encoding is valid.
|
||||
unsafe { self.metrics.get_unchecked(label.encode()) }
|
||||
}
|
||||
|
||||
/// Get the individual metric at the given identifier.
|
||||
///
|
||||
/// # Panics
|
||||
/// Can panic or cause strange behaviour if the label ID comes from a different metric family.
|
||||
pub fn get_metric_mut(&mut self, label: L) -> &mut M {
|
||||
// safety: The caller has guarantees that the label encoding is valid.
|
||||
unsafe { self.metrics.get_unchecked_mut(label.encode()) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MetricEncoding<T>, L: FixedCardinalityLabel + LabelGroup, T: Encoding>
|
||||
MetricFamilyEncoding<T> for DenseMetricVec<M, L>
|
||||
{
|
||||
fn collect_family_into(&self, name: impl MetricNameEncoder, enc: &mut T) -> Result<(), T::Err> {
|
||||
M::write_type(&name, enc)?;
|
||||
for (index, value) in self.metrics.iter().enumerate() {
|
||||
value.collect_into(&self.metadata, L::decode(index), &name, enc)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -478,7 +478,7 @@ pub trait CounterPairAssoc {
|
||||
}
|
||||
|
||||
pub struct CounterPairVec<A: CounterPairAssoc> {
|
||||
vec: measured::metric::MetricVec<MeasuredCounterPairState, A::LabelGroupSet>,
|
||||
pub vec: measured::metric::MetricVec<MeasuredCounterPairState, A::LabelGroupSet>,
|
||||
}
|
||||
|
||||
impl<A: CounterPairAssoc> Default for CounterPairVec<A>
|
||||
@@ -492,6 +492,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: CounterPairAssoc> CounterPairVec<A>
|
||||
where
|
||||
A::LabelGroupSet: Default,
|
||||
{
|
||||
pub fn dense() -> Self {
|
||||
Self {
|
||||
vec: measured::metric::MetricVec::dense(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: CounterPairAssoc> CounterPairVec<A> {
|
||||
pub fn guard(
|
||||
&self,
|
||||
@@ -501,14 +512,31 @@ impl<A: CounterPairAssoc> CounterPairVec<A> {
|
||||
self.vec.get_metric(id).inc.inc();
|
||||
MeasuredCounterPairGuard { vec: &self.vec, id }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn inc(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>) {
|
||||
let id = self.vec.with_labels(labels);
|
||||
self.vec.get_metric(id).inc.inc();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn dec(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>) {
|
||||
let id = self.vec.with_labels(labels);
|
||||
self.vec.get_metric(id).dec.inc();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn inc_by(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>, x: u64) {
|
||||
let id = self.vec.with_labels(labels);
|
||||
self.vec.get_metric(id).inc.inc_by(x);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn dec_by(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>, x: u64) {
|
||||
let id = self.vec.with_labels(labels);
|
||||
self.vec.get_metric(id).dec.inc_by(x);
|
||||
}
|
||||
|
||||
pub fn remove_metric(
|
||||
&self,
|
||||
labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>,
|
||||
@@ -553,6 +581,28 @@ pub struct MeasuredCounterPairState {
|
||||
pub dec: CounterState,
|
||||
}
|
||||
|
||||
impl MeasuredCounterPairState {
|
||||
#[inline]
|
||||
pub fn inc(&self) {
|
||||
self.inc.inc();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn dec(&self) {
|
||||
self.dec.inc();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn inc_by(&self, x: u64) {
|
||||
self.inc.inc_by(x);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn dec_by(&self, x: u64) {
|
||||
self.dec.inc_by(x);
|
||||
}
|
||||
}
|
||||
|
||||
impl measured::metric::MetricType for MeasuredCounterPairState {
|
||||
type Metadata = ();
|
||||
}
|
||||
@@ -569,9 +619,9 @@ impl<A: CounterPairAssoc> Drop for MeasuredCounterPairGuard<'_, A> {
|
||||
}
|
||||
|
||||
/// [`MetricEncoding`] for [`MeasuredCounterPairState`] that only writes the inc counter to the inner encoder.
|
||||
struct Inc<T>(T);
|
||||
pub struct Inc<T>(pub T);
|
||||
/// [`MetricEncoding`] for [`MeasuredCounterPairState`] that only writes the dec counter to the inner encoder.
|
||||
struct Dec<T>(T);
|
||||
pub struct Dec<T>(pub T);
|
||||
|
||||
impl<T: Encoding> Encoding for Inc<T> {
|
||||
type Err = T::Err;
|
||||
|
||||
@@ -10,6 +10,7 @@ testing = ["dep:tokio-postgres"]
|
||||
|
||||
[dependencies]
|
||||
ahash.workspace = true
|
||||
alloc-metrics.workspace = true
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
use alloc_metrics::TrackedAllocator;
|
||||
use proxy::binary::proxy::MemoryContext;
|
||||
use tikv_jemallocator::Jemalloc;
|
||||
|
||||
#[global_allocator]
|
||||
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
||||
// Safety: `MemoryContext` upholds the safety requirements.
|
||||
static GLOBAL: TrackedAllocator<Jemalloc, MemoryContext> =
|
||||
unsafe { TrackedAllocator::new(Jemalloc, MemoryContext::Root) };
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
#[unsafe(export_name = "malloc_conf")]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
proxy::binary::proxy::run().await
|
||||
fn main() -> anyhow::Result<()> {
|
||||
GLOBAL.register_thread();
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.on_thread_start(|| GLOBAL.register_thread())
|
||||
.build()
|
||||
.expect("Failed building the Runtime")
|
||||
.block_on(proxy::binary::proxy::run(&GLOBAL))
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
|
||||
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)), None);
|
||||
|
||||
// TODO: refactor these to use labels
|
||||
debug!("Version: {GIT_VERSION}");
|
||||
|
||||
@@ -80,7 +80,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
|
||||
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)), None);
|
||||
|
||||
let args = cli().get_matches();
|
||||
let destination: String = args
|
||||
|
||||
@@ -39,7 +39,8 @@ use crate::config::{
|
||||
};
|
||||
use crate::context::parquet::ParquetUploadArgs;
|
||||
use crate::http::health_server::AppMetrics;
|
||||
use crate::metrics::Metrics;
|
||||
pub use crate::metrics::MemoryContext;
|
||||
use crate::metrics::{Alloc, Metrics};
|
||||
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
|
||||
use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
use crate::redis::kv_ops::RedisKVClient;
|
||||
@@ -318,7 +319,7 @@ struct PgSniRouterArgs {
|
||||
dest: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
pub async fn run(alloc: &'static Alloc) -> anyhow::Result<()> {
|
||||
let _logging_guard = crate::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
@@ -340,7 +341,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let args = ProxyCliArgs::parse();
|
||||
let config = build_config(&args)?;
|
||||
let config = build_config(&args, alloc)?;
|
||||
let auth_backend = build_auth_backend(&args)?;
|
||||
|
||||
match auth_backend {
|
||||
@@ -589,9 +590,12 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
/// ProxyConfig is created at proxy startup, and lives forever.
|
||||
fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
fn build_config(
|
||||
args: &ProxyCliArgs,
|
||||
alloc: &'static Alloc,
|
||||
) -> anyhow::Result<&'static ProxyConfig> {
|
||||
let thread_pool = ThreadPool::new(args.scram_thread_pool_size);
|
||||
Metrics::install(thread_pool.metrics.clone());
|
||||
Metrics::install(thread_pool.metrics.clone(), Some(alloc));
|
||||
|
||||
let tls_config = match (&args.tls_key, &args.tls_cert) {
|
||||
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
#![expect(
|
||||
clippy::ref_option_ref,
|
||||
reason = "generated from measured derived output"
|
||||
)]
|
||||
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use alloc_metrics::TrackedAllocator;
|
||||
use lasso::ThreadedRodeo;
|
||||
use measured::label::{
|
||||
FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
|
||||
@@ -11,26 +17,33 @@ use measured::{
|
||||
MetricGroup,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
|
||||
use tikv_jemallocator::Jemalloc;
|
||||
use tokio::time::{self, Instant};
|
||||
|
||||
use crate::control_plane::messages::ColdStartInfo;
|
||||
use crate::error::ErrorKind;
|
||||
|
||||
pub type Alloc = TrackedAllocator<Jemalloc, MemoryContext>;
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
|
||||
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>, alloc: Option<&'static Alloc>))]
|
||||
pub struct Metrics {
|
||||
#[metric(namespace = "proxy")]
|
||||
#[metric(init = ProxyMetrics::new(thread_pool))]
|
||||
pub proxy: ProxyMetrics,
|
||||
|
||||
#[metric(namespace = "alloc")]
|
||||
#[metric(init = alloc)]
|
||||
pub alloc: Option<&'static Alloc>,
|
||||
|
||||
#[metric(namespace = "wake_compute_lock")]
|
||||
pub wake_compute_lock: ApiLockMetrics,
|
||||
}
|
||||
|
||||
static SELF: OnceLock<Metrics> = OnceLock::new();
|
||||
impl Metrics {
|
||||
pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
|
||||
let mut metrics = Metrics::new(thread_pool);
|
||||
pub fn install(thread_pool: Arc<ThreadPoolMetrics>, alloc: Option<&'static Alloc>) {
|
||||
let mut metrics = Metrics::new(thread_pool, alloc);
|
||||
|
||||
metrics.proxy.errors_total.init_all_dense();
|
||||
metrics.proxy.redis_errors_total.init_all_dense();
|
||||
@@ -45,7 +58,7 @@ impl Metrics {
|
||||
|
||||
pub fn get() -> &'static Self {
|
||||
#[cfg(test)]
|
||||
return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
|
||||
return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0)), None));
|
||||
|
||||
#[cfg(not(test))]
|
||||
SELF.get()
|
||||
@@ -660,3 +673,9 @@ pub struct ThreadPoolMetrics {
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
#[label(singleton = "context")]
|
||||
pub enum MemoryContext {
|
||||
Root,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user