From 1c5477619fa3269526bcad49b5f14e12becf3f85 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sun, 20 Jul 2025 19:37:37 +0100 Subject: [PATCH] focus on optimisations --- Cargo.lock | 2 + libs/alloc-metrics/Cargo.toml | 8 + libs/alloc-metrics/benches/alloc.rs | 107 +++++++++++ libs/alloc-metrics/src/lib.rs | 91 ++++++---- libs/alloc-metrics/src/metric_vec.rs | 260 +++++++++++++++++++++++++++ libs/metrics/src/lib.rs | 8 + 6 files changed, 437 insertions(+), 39 deletions(-) create mode 100644 libs/alloc-metrics/benches/alloc.rs create mode 100644 libs/alloc-metrics/src/metric_vec.rs diff --git a/Cargo.lock b/Cargo.lock index 8d5b288f67..f22732e4d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,8 +65,10 @@ dependencies = [ name = "alloc-metrics" version = "0.1.0" dependencies = [ + "criterion", "measured", "metrics", + "tikv-jemallocator", ] [[package]] diff --git a/libs/alloc-metrics/Cargo.toml b/libs/alloc-metrics/Cargo.toml index 96e0ded8d8..c682ec4e43 100644 --- a/libs/alloc-metrics/Cargo.toml +++ b/libs/alloc-metrics/Cargo.toml @@ -7,3 +7,11 @@ license.workspace = true [dependencies] metrics.workspace = true measured.workspace = true + +[dev-dependencies] +criterion.workspace = true +tikv-jemallocator.workspace = true + +[[bench]] +harness = false +name = "alloc" diff --git a/libs/alloc-metrics/benches/alloc.rs b/libs/alloc-metrics/benches/alloc.rs new file mode 100644 index 0000000000..75e7822be1 --- /dev/null +++ b/libs/alloc-metrics/benches/alloc.rs @@ -0,0 +1,107 @@ +use std::alloc::{GlobalAlloc, Layout, System, handle_alloc_error}; + +use alloc_metrics::TrackedAllocator; +use criterion::{ + AxisScale, BatchSize, BenchmarkId as Id, Criterion, PlotConfiguration, Throughput, + criterion_group, criterion_main, +}; +use measured::FixedCardinalityLabel; +use tikv_jemallocator::Jemalloc; + +criterion_group!(benches, bench_alloc); +criterion_main!(benches); + +struct Alloc<'a, A: GlobalAlloc> { + alloc: &'a A, + ptr: *mut u8, + layout: Layout, +} + +impl<'a, A: GlobalAlloc> Alloc<'a, A> { + fn new(alloc: &'a A, layout: Layout) -> Self { + let ptr = unsafe { alloc.alloc(layout) }; + if ptr.is_null() { + handle_alloc_error(layout); + } + + // actually make the page resident. + unsafe { ptr.cast::().write(1) }; + + Self { alloc, ptr, layout } + } +} + +impl<'a, A: GlobalAlloc> Drop for Alloc<'a, A> { + fn drop(&mut self) { + unsafe { self.alloc.dealloc(self.ptr, self.layout) }; + } +} + +#[derive(FixedCardinalityLabel, Clone, Copy, Debug)] +#[label(singleton = "memory_context")] +pub enum MemoryContext { + Root, + Test, +} + +static ALLOC_SYSTEM: TrackedAllocator = + unsafe { TrackedAllocator::new(System, MemoryContext::Root) }; +static ALLOC_JEMALLOC: TrackedAllocator = + unsafe { TrackedAllocator::new(Jemalloc, MemoryContext::Root) }; + +fn bench_alloc(c: &mut Criterion) { + const KB: u64 = 1024; + let sizes = [64, 256, KB, 4 * KB, 16 * KB, KB * KB]; + + let mut g = c.benchmark_group("alloc"); + g.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for size in sizes { + g.throughput(Throughput::Bytes(size)); + + let layout = Layout::from_size_align(size as usize, 8).unwrap(); + + let bs = BatchSize::NumBatches(10 + size.ilog2() as u64); + + g.bench_with_input(Id::new("system", size), &layout, |b, layout| { + b.iter_batched(|| {}, |()| Alloc::new(&System, *layout), bs); + }); + g.bench_with_input(Id::new("tracked[system]", size), &layout, |b, layout| { + let _scope = ALLOC_SYSTEM.scope(MemoryContext::Test); + b.iter_batched(|| {}, |()| Alloc::new(&ALLOC_SYSTEM, *layout), bs); + }); + g.bench_with_input(Id::new("jemalloc", size), &layout, |b, layout| { + b.iter_batched(|| {}, |()| Alloc::new(&Jemalloc, *layout), bs); + }); + g.bench_with_input(Id::new("tracked[jemalloc]", size), &layout, |b, layout| { + let _scope = ALLOC_JEMALLOC.scope(MemoryContext::Test); + b.iter_batched(|| {}, |()| Alloc::new(&ALLOC_JEMALLOC, *layout), bs); + }); + } + g.finish(); + + let mut g = c.benchmark_group("dealloc"); + g.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + for size in sizes { + g.throughput(Throughput::Bytes(size)); + + let layout = Layout::from_size_align(size as usize, 8).unwrap(); + + let bs = BatchSize::NumBatches(10 + size.ilog2() as u64); + + g.bench_with_input(Id::new("system", size), &layout, |b, layout| { + b.iter_batched(|| Alloc::new(&System, *layout), drop, bs) + }); + g.bench_with_input(Id::new("tracked[system]", size), &layout, |b, layout| { + let _scope = ALLOC_SYSTEM.scope(MemoryContext::Test); + b.iter_batched(|| Alloc::new(&ALLOC_SYSTEM, *layout), drop, bs) + }); + g.bench_with_input(Id::new("jemalloc", size), &layout, |b, layout| { + b.iter_batched(|| Alloc::new(&Jemalloc, *layout), drop, bs) + }); + g.bench_with_input(Id::new("tracked[jemalloc]", size), &layout, |b, layout| { + let _scope = ALLOC_JEMALLOC.scope(MemoryContext::Test); + b.iter_batched(|| Alloc::new(&ALLOC_JEMALLOC, *layout), drop, bs) + }); + } + g.finish(); +} diff --git a/libs/alloc-metrics/src/lib.rs b/libs/alloc-metrics/src/lib.rs index 21725f7816..3dde4a11a1 100644 --- a/libs/alloc-metrics/src/lib.rs +++ b/libs/alloc-metrics/src/lib.rs @@ -1,5 +1,6 @@ //! Tagged allocator measurements. +mod metric_vec; mod thread_local; use std::{ @@ -17,10 +18,12 @@ use measured::{ label::StaticLabelSet, metric::{MetricEncoding, counter::CounterState, group::Encoding, name::MetricName}, }; -use metrics::{CounterPairAssoc, CounterPairVec, MeasuredCounterPairState}; +use metrics::{CounterPairAssoc, MeasuredCounterPairState}; use thread_local::ThreadLocal; -type AllocCounter = CounterPairVec>; +use crate::metric_vec::DenseCounterPairVec; + +type AllocCounter = DenseCounterPairVec, T>; pub struct TrackedAllocator { inner: A, @@ -79,8 +82,8 @@ where self.thread_state .get_or_init(ThreadLocal::new) .get_or(|| ThreadState { - counters: CounterPairVec::dense(), - global: self.global.get_or_init(CounterPairVec::dense), + counters: DenseCounterPairVec::default(), + global: self.global.get_or_init(DenseCounterPairVec::default), }); self.thread_scope @@ -130,12 +133,16 @@ where // Safety: tag_offset is inbounds of the ptr unsafe { ptr.add(tag_offset).cast::().write(tag) } - if let Some(counters) = self.current_counters_alloc_safe() { - // During `Self::new`, the caller has guaranteed that tag encoding will not panic. - counters.inc_by(tag, layout.size() as u64); + let metric = if let Some(counters) = self.current_counters_alloc_safe() { + // safety: caller ensured that is implemented correctly. + let id = unsafe { counters.vec.try_with_labels(tag).unwrap_unchecked() }; + counters.vec.get_metric(id) } else { - self.default_counters.inc_by(layout.size() as u64); - } + // if tag is not default, then global would have been registered, therefore tag must be default. + &self.default_counters + }; + + metric.inc_by(layout.size() as u64); ptr } @@ -203,27 +210,30 @@ where // Safety: new_tag_offset is inbounds of the ptr unsafe { new_ptr.add(new_tag_offset).cast::().write(new_tag) } - if let Some(counters) = self.current_counters_alloc_safe() { - if tag.encode() == new_tag.encode() { - let diff = usize::abs_diff(new_layout.size(), layout.size()) as u64; - if new_layout.size() > layout.size() { - counters.inc_by(tag, diff); - } else { - counters.dec_by(tag, diff); - } - } else { - counters.inc_by(new_tag, new_layout.size() as u64); - counters.dec_by(tag, layout.size() as u64); - } + let (new_metric, old_metric) = if let Some(counters) = self.current_counters_alloc_safe() { + // safety: caller ensured that is implemented correctly. + let new_id = unsafe { counters.vec.try_with_labels(new_tag).unwrap_unchecked() }; + // safety: caller ensured that is implemented correctly. + let old_id = unsafe { counters.vec.try_with_labels(tag).unwrap_unchecked() }; + let new_metric = counters.vec.get_metric(new_id); + let old_metric = counters.vec.get_metric(old_id); + + (new_metric, old_metric) } else { // no tag was registered at all, therefore both tags must be default. - let diff = usize::abs_diff(new_layout.size(), layout.size()) as u64; - if new_layout.size() > layout.size() { - self.default_counters.inc_by(diff); - } else { - self.default_counters.dec_by(diff); - } - } + (&self.default_counters, &self.default_counters) + }; + + let (inc, dec) = if tag.encode() != new_tag.encode() { + (new_layout.size() as u64, layout.size() as u64) + } else if new_layout.size() > layout.size() { + ((new_layout.size() - layout.size()) as u64, 0) + } else { + (0, (layout.size() - new_layout.size()) as u64) + }; + + new_metric.inc.inc_by(inc); + old_metric.dec.inc_by(dec); new_ptr } @@ -240,16 +250,19 @@ where // Safety: tag_offset is inbounds of the ptr let tag = unsafe { ptr.add(tag_offset).cast::().read() }; - if let Some(counters) = self.current_counters_alloc_safe() { - counters.dec_by(tag, layout.size() as u64); - } else { - // if tag is not default, then global would have been registered, - // therefore tag must be default. - self.default_counters.dec_by(layout.size() as u64); - } - // Safety: caller upholds contract for us unsafe { self.inner.dealloc(ptr, tagged_layout) } + + let metric = if let Some(counters) = self.current_counters_alloc_safe() { + // safety: caller ensured that is implemented correctly. + let id = unsafe { counters.vec.try_with_labels(tag).unwrap_unchecked() }; + counters.vec.get_metric(id) + } else { + // if tag is not default, then global would have been registered, therefore tag must be default. + &self.default_counters + }; + + metric.dec_by(layout.size() as u64); } } @@ -288,7 +301,7 @@ impl Drop for ThreadState { for tag in (0..T::cardinality()).map(T::decode) { // load and reset the counts in the thread-local counters. let id = self.counters.vec.with_labels(tag); - let mut m = self.counters.vec.get_metric_mut(id); + let m = self.counters.vec.get_metric_mut(id); let inc = *m.inc.count.get_mut(); let dec = *m.dec.count.get_mut(); @@ -308,14 +321,14 @@ where CounterState: MetricEncoding, { fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> { - let global = self.global.get_or_init(CounterPairVec::dense); + let global = self.global.get_or_init(DenseCounterPairVec::default); // iterate over all counter threads for s in self.thread_state.get().into_iter().flat_map(|s| s.iter()) { // iterate over all labels for tag in (0..T::cardinality()).map(T::decode) { let id = s.counters.vec.with_labels(tag); - sample(global, &s.counters.vec.get_metric(id), tag); + sample(global, s.counters.vec.get_metric(id), tag); } } diff --git a/libs/alloc-metrics/src/metric_vec.rs b/libs/alloc-metrics/src/metric_vec.rs new file mode 100644 index 0000000000..84fd928ce4 --- /dev/null +++ b/libs/alloc-metrics/src/metric_vec.rs @@ -0,0 +1,260 @@ +//! Dense metric vec + +use std::marker::PhantomData; + +use measured::{ + FixedCardinalityLabel, LabelGroup, + label::{LabelGroupSet, StaticLabelSet}, + metric::{ + MetricEncoding, MetricFamilyEncoding, MetricType, counter::CounterState, group::Encoding, + name::MetricNameEncoder, + }, +}; +use metrics::{CounterPairAssoc, MeasuredCounterPairState}; + +pub struct DenseMetricVec { + metrics: VecInner, + metadata: M::Metadata, + label_set: StaticLabelSet, +} + +enum VecInner { + Dense(Box<[M]>), +} + +fn new_dense(c: usize) -> Box<[M]> { + let mut vec = Vec::with_capacity(c); + vec.resize_with(c, M::default); + vec.into_boxed_slice() +} + +impl DenseMetricVec +where + M::Metadata: Default, +{ + /// Create a new metric vec with the given label set and metric metadata + pub fn new() -> Self { + Self::with_metadata(::default()) + } +} + +impl Default for DenseMetricVec +where + M::Metadata: Default, +{ + fn default() -> Self { + Self::new() + } +} + +impl VecInner { + fn get_metric(&self, id: usize) -> &M { + match self { + VecInner::Dense(metrics) => &metrics[id], + } + } + + fn get_metric_mut(&mut self, id: usize) -> &mut M { + match self { + VecInner::Dense(metrics) => &mut metrics[id], + } + } +} + +impl DenseMetricVec { + /// Create a new metric vec with the given label set and metric metadata + pub fn with_metadata(metadata: M::Metadata) -> Self { + let metrics = VecInner::Dense(new_dense(L::cardinality())); + + Self { + metrics, + metadata, + label_set: StaticLabelSet::new(), + } + } + + // /// View the metric metadata + // pub fn metadata(&self) -> &M::Metadata { + // &self.metadata + // } + + /// Get an identifier for the specific metric identified by this label group + /// + /// # Panics + /// Panics if the label group is not contained within the label set. + pub fn with_labels(&self, label: L) -> usize { + self.try_with_labels(label) + .expect("label group was not contained within this label set") + } + + /// Get an identifier for the specific metric identified by this label group + /// + /// # Errors + /// Returns None if the label group is not contained within the label set. + pub fn try_with_labels(&self, label: L) -> Option { + self.label_set.encode(label) + } + + /// Get the individual metric at the given identifier. + /// + /// # Panics + /// Can panic or cause strange behaviour if the label ID comes from a different metric family. + pub fn get_metric(&self, id: usize) -> &M { + self.metrics.get_metric(id) + } + + /// Get the individual metric at the given identifier. + /// + /// # Panics + /// Can panic or cause strange behaviour if the label ID comes from a different metric family. + pub fn get_metric_mut(&mut self, id: usize) -> &mut M { + self.metrics.get_metric_mut(id) + } +} + +impl, L: FixedCardinalityLabel + LabelGroup, T: Encoding> + MetricFamilyEncoding for DenseMetricVec +{ + fn collect_family_into(&self, name: impl MetricNameEncoder, enc: &mut T) -> Result<(), T::Err> { + M::write_type(&name, enc)?; + match &self.metrics { + VecInner::Dense(m) => { + for (index, value) in m.iter().enumerate() { + value.collect_into( + &self.metadata, + self.label_set.decode_dense(index), + &name, + enc, + )?; + } + } + } + Ok(()) + } +} + +pub struct DenseCounterPairVec< + A: CounterPairAssoc>, + L: FixedCardinalityLabel + LabelGroup, +> { + pub vec: DenseMetricVec, + pub _marker: PhantomData, +} + +impl>, L: FixedCardinalityLabel + LabelGroup> + Default for DenseCounterPairVec +{ + fn default() -> Self { + Self { + vec: DenseMetricVec::new(), + _marker: PhantomData, + } + } +} + +// impl>, L: FixedCardinalityLabel + LabelGroup> +// DenseCounterPairVec +// { +// #[inline] +// pub fn inc(&self, labels: ::Group<'_>) { +// let id = self.vec.with_labels(labels); +// self.vec.get_metric(id).inc.inc(); +// } + +// #[inline] +// pub fn dec(&self, labels: ::Group<'_>) { +// let id = self.vec.with_labels(labels); +// self.vec.get_metric(id).dec.inc(); +// } + +// #[inline] +// pub fn inc_by(&self, labels: ::Group<'_>, x: u64) { +// let id = self.vec.with_labels(labels); +// self.vec.get_metric(id).inc.inc_by(x); +// } + +// #[inline] +// pub fn dec_by(&self, labels: ::Group<'_>, x: u64) { +// let id = self.vec.with_labels(labels); +// self.vec.get_metric(id).dec.inc_by(x); +// } +// } + +impl ::measured::metric::group::MetricGroup for DenseCounterPairVec +where + T: ::measured::metric::group::Encoding, + ::measured::metric::counter::CounterState: ::measured::metric::MetricEncoding, + A: CounterPairAssoc>, + L: FixedCardinalityLabel + LabelGroup, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> { + // write decrement first to avoid a race condition where inc - dec < 0 + T::write_help(enc, A::DEC_NAME, A::DEC_HELP)?; + self.vec + .collect_family_into(A::DEC_NAME, &mut Dec(&mut *enc))?; + + T::write_help(enc, A::INC_NAME, A::INC_HELP)?; + self.vec + .collect_family_into(A::INC_NAME, &mut Inc(&mut *enc))?; + + Ok(()) + } +} + +/// [`MetricEncoding`] for [`MeasuredCounterPairState`] that only writes the inc counter to the inner encoder. +struct Inc(T); +/// [`MetricEncoding`] for [`MeasuredCounterPairState`] that only writes the dec counter to the inner encoder. +struct Dec(T); + +impl Encoding for Inc { + type Err = T::Err; + + fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> { + self.0.write_help(name, help) + } +} + +impl MetricEncoding> for MeasuredCounterPairState +where + CounterState: MetricEncoding, +{ + fn write_type(name: impl MetricNameEncoder, enc: &mut Inc) -> Result<(), T::Err> { + CounterState::write_type(name, &mut enc.0) + } + fn collect_into( + &self, + metadata: &(), + labels: impl LabelGroup, + name: impl MetricNameEncoder, + enc: &mut Inc, + ) -> Result<(), T::Err> { + self.inc.collect_into(metadata, labels, name, &mut enc.0) + } +} + +impl Encoding for Dec { + type Err = T::Err; + + fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> { + self.0.write_help(name, help) + } +} + +/// Write the dec counter to the encoder +impl MetricEncoding> for MeasuredCounterPairState +where + CounterState: MetricEncoding, +{ + fn write_type(name: impl MetricNameEncoder, enc: &mut Dec) -> Result<(), T::Err> { + CounterState::write_type(name, &mut enc.0) + } + fn collect_into( + &self, + metadata: &(), + labels: impl LabelGroup, + name: impl MetricNameEncoder, + enc: &mut Dec, + ) -> Result<(), T::Err> { + self.dec.collect_into(metadata, labels, name, &mut enc.0) + } +} diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 517577a6c3..621d1c6e0a 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -513,21 +513,25 @@ impl CounterPairVec { MeasuredCounterPairGuard { vec: &self.vec, id } } + #[inline] pub fn inc(&self, labels: ::Group<'_>) { let id = self.vec.with_labels(labels); self.vec.get_metric(id).inc.inc(); } + #[inline] pub fn dec(&self, labels: ::Group<'_>) { let id = self.vec.with_labels(labels); self.vec.get_metric(id).dec.inc(); } + #[inline] pub fn inc_by(&self, labels: ::Group<'_>, x: u64) { let id = self.vec.with_labels(labels); self.vec.get_metric(id).inc.inc_by(x); } + #[inline] pub fn dec_by(&self, labels: ::Group<'_>, x: u64) { let id = self.vec.with_labels(labels); self.vec.get_metric(id).dec.inc_by(x); @@ -578,18 +582,22 @@ pub struct MeasuredCounterPairState { } impl MeasuredCounterPairState { + #[inline] pub fn inc(&self) { self.inc.inc(); } + #[inline] pub fn dec(&self) { self.dec.inc(); } + #[inline] pub fn inc_by(&self, x: u64) { self.inc.inc_by(x); } + #[inline] pub fn dec_by(&self, x: u64) { self.dec.inc_by(x); }