Compare commits

...

3 Commits

Author SHA1 Message Date
John Spray
f55021ee76 pageserver: fix noisy logging on tenant NotFound 2024-01-30 15:18:16 +00:00
Joonas Koivunen
1e9a50bca8 disk_usage_eviction_task: cleanup summaries (#6490)
This is the "partial revert" of #6384. The summaries turned out to be
expensive due to naive vec usage, but also inconclusive because of the
additional context required. In addition to removing summary traces,
small refactoring is done.
2024-01-29 10:38:40 +02:00
Conrad Ludgate
511e730cc0 hll experiment (#6312)
## Problem

Measuring cardinality using logs is expensive and slow.

## Summary of changes

Implement a pre-aggregated HyperLogLog-based cardinality estimate.
HyperLogLog estimates the cardinality of a set by using the probability
that the uniform hash of a value will have a run of n 0s at the end is
`1/2^n`, therefore, having observed a run of `n` 0s suggests we have
measured `2^n` distinct values. By using multiple shards, we can use the
harmonic mean to get a more accurate estimate.

We record this into a Prometheus time-series. HyperLogLog counts can be
merged by taking the `max` of each shard. We can apply a `max_over_time`
in order to find the estimate of cardinality of distinct values over
time
2024-01-29 07:26:20 +00:00
10 changed files with 702 additions and 242 deletions

20
Cargo.lock generated
View File

@@ -2736,6 +2736,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "libm"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "linux-raw-sys"
version = "0.1.4"
@@ -2832,6 +2838,9 @@ dependencies = [
"libc",
"once_cell",
"prometheus",
"rand 0.8.5",
"rand_distr",
"twox-hash",
"workspace_hack",
]
@@ -3057,6 +3066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
"libm",
]
[[package]]
@@ -4171,6 +4181,16 @@ dependencies = [
"getrandom 0.2.11",
]
[[package]]
name = "rand_distr"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
dependencies = [
"num-traits",
"rand 0.8.5",
]
[[package]]
name = "rand_hc"
version = "0.2.0"

View File

@@ -165,6 +165,7 @@ tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
twox-hash = { version = "1.6.3", default-features = false }
url = "2.2"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"

View File

@@ -9,5 +9,10 @@ prometheus.workspace = true
libc.workspace = true
once_cell.workspace = true
chrono.workspace = true
twox-hash.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
rand = "0.8"
rand_distr = "0.4.3"

523
libs/metrics/src/hll.rs Normal file
View File

@@ -0,0 +1,523 @@
//! HyperLogLog is an algorithm for the count-distinct problem,
//! approximating the number of distinct elements in a multiset.
//! Calculating the exact cardinality of the distinct elements
//! of a multiset requires an amount of memory proportional to
//! the cardinality, which is impractical for very large data sets.
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
use std::{
collections::HashMap,
hash::{BuildHasher, BuildHasherDefault, Hash, Hasher},
sync::{atomic::AtomicU8, Arc, RwLock},
};
use prometheus::{
core::{self, Describer},
proto, Opts,
};
use twox_hash::xxh3;
/// Create an [`HyperLogLogVec`] and registers to default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_hll_vec {
($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
}
/// Create an [`HyperLogLog`] and registers to default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_hll {
($N:literal, $OPTS:expr $(,)?) => {{
let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
$crate::register(Box::new(hll.clone())).map(|_| hll)
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
}
/// HLL is a probabilistic cardinality measure.
///
/// How to use this time-series for a metric name `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
/// sum (
/// 2 ^ -(
/// # HLL merge operation
/// max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
/// )
/// ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
/// sum (
/// 2 ^ -(
/// # HLL merge operation
/// max (
/// max_over_time(my_metrics_total_hll{}[$__rate_interval])
/// ) by (hll_shard, other_labels...)
/// )
/// ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation:
///
/// ```promql
/// # LinearCounting(m, V) = m log (m / V)
/// shards_count * ln(shards_count /
/// # calculate V = how many shards contain a 0
/// count(max (proxy_connecting_endpoints{}) by (hll_shard, protocol) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
#[derive(Clone)]
pub struct HyperLogLogVec<const N: usize> {
core: Arc<HyperLogLogVecCore<N>>,
}
struct HyperLogLogVecCore<const N: usize> {
pub children: RwLock<HashMap<u64, HyperLogLog<N>, BuildHasherDefault<xxh3::Hash64>>>,
pub desc: core::Desc,
pub opts: Opts,
}
impl<const N: usize> core::Collector for HyperLogLogVec<N> {
fn desc(&self) -> Vec<&core::Desc> {
vec![&self.core.desc]
}
fn collect(&self) -> Vec<proto::MetricFamily> {
let mut m = proto::MetricFamily::default();
m.set_name(self.core.desc.fq_name.clone());
m.set_help(self.core.desc.help.clone());
m.set_field_type(proto::MetricType::GAUGE);
let mut metrics = Vec::new();
for child in self.core.children.read().unwrap().values() {
child.core.collect_into(&mut metrics);
}
m.set_metric(metrics);
vec![m]
}
}
impl<const N: usize> HyperLogLogVec<N> {
/// Create a new [`HyperLogLogVec`] based on the provided
/// [`Opts`] and partitioned by the given label names. At least one label name must be
/// provided.
pub fn new(opts: Opts, label_names: &[&str]) -> prometheus::Result<Self> {
assert!(N.is_power_of_two());
let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
let opts = opts.variable_labels(variable_names);
let desc = opts.describe()?;
let v = HyperLogLogVecCore {
children: RwLock::new(HashMap::default()),
desc,
opts,
};
Ok(Self { core: Arc::new(v) })
}
/// `get_metric_with_label_values` returns the [`HyperLogLog<P>`] for the given slice
/// of label values (same order as the VariableLabels in Desc). If that combination of
/// label values is accessed for the first time, a new [`HyperLogLog<P>`] is created.
///
/// An error is returned if the number of label values is not the same as the
/// number of VariableLabels in Desc.
pub fn get_metric_with_label_values(
&self,
vals: &[&str],
) -> prometheus::Result<HyperLogLog<N>> {
self.core.get_metric_with_label_values(vals)
}
/// `with_label_values` works as `get_metric_with_label_values`, but panics if an error
/// occurs.
pub fn with_label_values(&self, vals: &[&str]) -> HyperLogLog<N> {
self.get_metric_with_label_values(vals).unwrap()
}
}
impl<const N: usize> HyperLogLogVecCore<N> {
pub fn get_metric_with_label_values(
&self,
vals: &[&str],
) -> prometheus::Result<HyperLogLog<N>> {
let h = self.hash_label_values(vals)?;
if let Some(metric) = self.children.read().unwrap().get(&h).cloned() {
return Ok(metric);
}
self.get_or_create_metric(h, vals)
}
pub(crate) fn hash_label_values(&self, vals: &[&str]) -> prometheus::Result<u64> {
if vals.len() != self.desc.variable_labels.len() {
return Err(prometheus::Error::InconsistentCardinality {
expect: self.desc.variable_labels.len(),
got: vals.len(),
});
}
let mut h = xxh3::Hash64::default();
for val in vals {
h.write(val.as_bytes());
}
Ok(h.finish())
}
fn get_or_create_metric(
&self,
hash: u64,
label_values: &[&str],
) -> prometheus::Result<HyperLogLog<N>> {
let mut children = self.children.write().unwrap();
// Check exist first.
if let Some(metric) = children.get(&hash).cloned() {
return Ok(metric);
}
let metric = HyperLogLog::with_opts_and_label_values(&self.opts, label_values)?;
children.insert(hash, metric.clone());
Ok(metric)
}
}
/// HLL is a probabilistic cardinality measure.
///
/// How to use this time-series for a metric name `my_metrics_total_hll`:
///
/// ```promql
/// # harmonic mean
/// 1 / (
/// sum (
/// 2 ^ -(
/// # HLL merge operation
/// max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
/// )
/// ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// If you want an estimate over time, you can use the following query:
///
/// ```promql
/// # harmonic mean
/// 1 / (
/// sum (
/// 2 ^ -(
/// # HLL merge operation
/// max (
/// max_over_time(my_metrics_total_hll{}[$__rate_interval])
/// ) by (hll_shard, other_labels...)
/// )
/// ) without (hll_shard)
/// )
/// * alpha
/// * shards_count
/// * shards_count
/// ```
///
/// In the case of low cardinality, you might want to use the linear counting approximation:
///
/// ```promql
/// # LinearCounting(m, V) = m log (m / V)
/// shards_count * ln(shards_count /
/// # calculate V = how many shards contain a 0
/// count(max (proxy_connecting_endpoints{}) by (hll_shard, protocol) == 0) without (hll_shard)
/// )
/// ```
///
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
#[derive(Clone)]
pub struct HyperLogLog<const N: usize> {
core: Arc<HyperLogLogCore<N>>,
}
impl<const N: usize> HyperLogLog<N> {
/// Create a [`HyperLogLog`] with the `name` and `help` arguments.
pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> prometheus::Result<Self> {
assert!(N.is_power_of_two());
let opts = Opts::new(name, help);
Self::with_opts(opts)
}
/// Create a [`HyperLogLog`] with the `opts` options.
pub fn with_opts(opts: Opts) -> prometheus::Result<Self> {
Self::with_opts_and_label_values(&opts, &[])
}
fn with_opts_and_label_values(opts: &Opts, label_values: &[&str]) -> prometheus::Result<Self> {
let desc = opts.describe()?;
let labels = make_label_pairs(&desc, label_values)?;
let v = HyperLogLogCore {
shards: [0; N].map(AtomicU8::new),
desc,
labels,
};
Ok(Self { core: Arc::new(v) })
}
pub fn measure(&self, item: &impl Hash) {
// changing the hasher will break compatibility with previous measurements.
self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
}
fn record(&self, hash: u64) {
let p = N.ilog2() as u8;
let j = hash & (N as u64 - 1);
let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
self.core.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
}
}
struct HyperLogLogCore<const N: usize> {
shards: [AtomicU8; N],
desc: core::Desc,
labels: Vec<proto::LabelPair>,
}
impl<const N: usize> core::Collector for HyperLogLog<N> {
fn desc(&self) -> Vec<&core::Desc> {
vec![&self.core.desc]
}
fn collect(&self) -> Vec<proto::MetricFamily> {
let mut m = proto::MetricFamily::default();
m.set_name(self.core.desc.fq_name.clone());
m.set_help(self.core.desc.help.clone());
m.set_field_type(proto::MetricType::GAUGE);
let mut metrics = Vec::new();
self.core.collect_into(&mut metrics);
m.set_metric(metrics);
vec![m]
}
}
impl<const N: usize> HyperLogLogCore<N> {
fn collect_into(&self, metrics: &mut Vec<proto::Metric>) {
self.shards.iter().enumerate().for_each(|(i, x)| {
let mut shard_label = proto::LabelPair::default();
shard_label.set_name("hll_shard".to_owned());
shard_label.set_value(format!("{i}"));
// We reset the counter to 0 so we can perform a cardinality measure over any time slice in prometheus.
// This seems like it would be a race condition,
// but HLL is not impacted by a write in one shard happening in between.
// This is because in PromQL we will be implementing a harmonic mean of all buckets.
// we will also merge samples in a time series using `max by (hll_shard)`.
// TODO: maybe we shouldn't reset this on every collect, instead, only after a time window.
// this would mean that a dev port-forwarding the metrics url won't break the sampling.
let v = x.swap(0, std::sync::atomic::Ordering::Relaxed);
let mut m = proto::Metric::default();
let mut c = proto::Gauge::default();
c.set_value(v as f64);
m.set_gauge(c);
let mut labels = Vec::with_capacity(self.labels.len() + 1);
labels.extend_from_slice(&self.labels);
labels.push(shard_label);
m.set_label(labels);
metrics.push(m);
})
}
}
fn make_label_pairs(
desc: &core::Desc,
label_values: &[&str],
) -> prometheus::Result<Vec<proto::LabelPair>> {
if desc.variable_labels.len() != label_values.len() {
return Err(prometheus::Error::InconsistentCardinality {
expect: desc.variable_labels.len(),
got: label_values.len(),
});
}
let total_len = desc.variable_labels.len() + desc.const_label_pairs.len();
if total_len == 0 {
return Ok(vec![]);
}
if desc.variable_labels.is_empty() {
return Ok(desc.const_label_pairs.clone());
}
let mut label_pairs = Vec::with_capacity(total_len);
for (i, n) in desc.variable_labels.iter().enumerate() {
let mut label_pair = proto::LabelPair::default();
label_pair.set_name(n.clone());
label_pair.set_value(label_values[i].to_owned());
label_pairs.push(label_pair);
}
for label_pair in &desc.const_label_pairs {
label_pairs.push(label_pair.clone());
}
label_pairs.sort();
Ok(label_pairs)
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use prometheus::{proto, Opts};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand_distr::{Distribution, Zipf};
use crate::HyperLogLogVec;
fn collect(hll: &HyperLogLogVec<32>) -> Vec<proto::Metric> {
let mut metrics = vec![];
hll.core
.children
.read()
.unwrap()
.values()
.for_each(|c| c.core.collect_into(&mut metrics));
metrics
}
fn get_cardinality(metrics: &[proto::Metric], filter: impl Fn(&proto::Metric) -> bool) -> f64 {
let mut buckets = [0.0; 32];
for metric in metrics.chunks_exact(32) {
if filter(&metric[0]) {
for (i, m) in metric.iter().enumerate() {
buckets[i] = f64::max(buckets[i], m.get_gauge().get_value());
}
}
}
buckets
.into_iter()
.map(|f| 2.0f64.powf(-f))
.sum::<f64>()
.recip()
* 0.697
* 32.0
* 32.0
}
fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
let hll = HyperLogLogVec::<32>::new(Opts::new("foo", "bar"), &["x"]).unwrap();
let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
let mut set_a = HashSet::new();
let mut set_b = HashSet::new();
for x in iter.by_ref().take(n) {
set_a.insert(x.to_bits());
hll.with_label_values(&["a"]).measure(&x.to_bits());
}
for x in iter.by_ref().take(n) {
set_b.insert(x.to_bits());
hll.with_label_values(&["b"]).measure(&x.to_bits());
}
let merge = &set_a | &set_b;
let metrics = collect(&hll);
let len = get_cardinality(&metrics, |_| true);
let len_a = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "a");
let len_b = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "b");
([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
}
#[test]
fn test_cardinality_small() {
let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
assert_eq!(actual, [46, 30, 32]);
assert!(51.3 < estimate[0] && estimate[0] < 51.4);
assert!(44.0 < estimate[1] && estimate[1] < 44.1);
assert!(39.0 < estimate[2] && estimate[2] < 39.1);
}
#[test]
fn test_cardinality_medium() {
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
assert_eq!(actual, [2529, 1618, 1629]);
assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
}
#[test]
fn test_cardinality_large() {
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
assert_eq!(actual, [129077, 79579, 79630]);
assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
}
#[test]
fn test_cardinality_small2() {
let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
assert_eq!(actual, [92, 58, 60]);
assert!(116.1 < estimate[0] && estimate[0] < 116.2);
assert!(81.7 < estimate[1] && estimate[1] < 81.8);
assert!(69.3 < estimate[2] && estimate[2] < 69.4);
}
#[test]
fn test_cardinality_medium2() {
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
assert_eq!(actual, [8201, 5131, 5051]);
assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
}
#[test]
fn test_cardinality_large2() {
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
assert_eq!(actual, [777847, 482069, 482246]);
assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
}
}

View File

@@ -28,7 +28,9 @@ use prometheus::{Registry, Result};
pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
mod hll;
pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec};
pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

View File

@@ -97,23 +97,86 @@ pub enum EvictionOrder {
/// Order the layers to be evicted by how recently they have been accessed relatively within
/// the set of resident layers of a tenant.
///
/// This strategy will evict layers more fairly but is untested.
RelativeAccessed {
#[serde(default)]
/// Determines if the tenant with most layers should lose first.
///
/// Having this enabled is currently the only reasonable option, because the order in which
/// we read tenants is deterministic. If we find the need to use this as `false`, we need
/// to ensure nondeterminism by adding in a random number to break the
/// `relative_last_activity==0.0` ties.
#[serde(default = "default_highest_layer_count_loses_first")]
highest_layer_count_loses_first: bool,
},
}
fn default_highest_layer_count_loses_first() -> bool {
true
}
impl EvictionOrder {
/// Return true, if with [`Self::RelativeAccessed`] order the tenants with the highest layer
/// counts should be the first ones to have their layers evicted.
fn highest_layer_count_loses_first(&self) -> bool {
fn sort(&self, candidates: &mut [(MinResidentSizePartition, EvictionCandidate)]) {
use EvictionOrder::*;
match self {
EvictionOrder::AbsoluteAccessed => false,
EvictionOrder::RelativeAccessed {
AbsoluteAccessed => {
candidates.sort_unstable_by_key(|(partition, candidate)| {
(*partition, candidate.last_activity_ts)
});
}
RelativeAccessed { .. } => candidates.sort_unstable_by_key(|(partition, candidate)| {
(*partition, candidate.relative_last_activity)
}),
}
}
/// Called to fill in the [`EvictionCandidate::relative_last_activity`] while iterating tenants
/// layers in **most** recently used order.
fn relative_last_activity(&self, total: usize, index: usize) -> finite_f32::FiniteF32 {
use EvictionOrder::*;
match self {
AbsoluteAccessed => finite_f32::FiniteF32::ZERO,
RelativeAccessed {
highest_layer_count_loses_first,
} => *highest_layer_count_loses_first,
} => {
// keeping the -1 or not decides if every tenant should lose their least recently accessed
// layer OR if this should happen in the order of having highest layer count:
let fudge = if *highest_layer_count_loses_first {
// relative_last_activity vs. tenant layer count:
// - 0.1..=1.0 (10 layers)
// - 0.01..=1.0 (100 layers)
// - 0.001..=1.0 (1000 layers)
//
// leading to evicting less of the smallest tenants.
0
} else {
// use full 0.0..=1.0 range, which means even the smallest tenants could always lose a
// layer. the actual ordering is unspecified: for 10k tenants on a pageserver it could
// be that less than 10k layer evictions is enough, so we would not need to evict from
// all tenants.
//
// as the tenant ordering is now deterministic this could hit the same tenants
// disproportionetly on multiple invocations. alternative could be to remember how many
// layers did we evict last time from this tenant, and inject that as an additional
// fudge here.
1
};
let total = total.checked_sub(fudge).filter(|&x| x > 1).unwrap_or(1);
let divider = total as f32;
// most recently used is always (total - 0) / divider == 1.0
// least recently used depends on the fudge:
// - (total - 1) - (total - 1) / total => 0 / total
// - total - (total - 1) / total => 1 / total
let distance = (total - index) as f32;
finite_f32::FiniteF32::try_from_normalized(distance / divider)
.unwrap_or_else(|val| {
tracing::warn!(%fudge, "calculated invalid relative_last_activity for i={index}, total={total}: {val}");
finite_f32::FiniteF32::ZERO
})
}
}
}
}
@@ -389,52 +452,6 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
let selection = select_victims(&candidates, usage_pre);
let mut candidates = candidates;
let selection = if matches!(eviction_order, EvictionOrder::RelativeAccessed { .. }) {
// we currently have the layers ordered by AbsoluteAccessed so that we can get the summary
// for comparison here. this is a temporary measure to develop alternatives.
use std::fmt::Write;
let mut summary_buf = String::with_capacity(256);
{
let absolute_summary = candidates
.iter()
.take(selection.amount)
.map(|(_, candidate)| candidate)
.collect::<summary::EvictionSummary>();
write!(summary_buf, "{absolute_summary}").expect("string grows");
info!("absolute accessed selection summary: {summary_buf}");
}
candidates.sort_unstable_by_key(|(partition, candidate)| {
(*partition, candidate.relative_last_activity)
});
let selection = select_victims(&candidates, usage_pre);
{
summary_buf.clear();
let relative_summary = candidates
.iter()
.take(selection.amount)
.map(|(_, candidate)| candidate)
.collect::<summary::EvictionSummary>();
write!(summary_buf, "{relative_summary}").expect("string grows");
info!("relative accessed selection summary: {summary_buf}");
}
selection
} else {
selection
};
let (evicted_amount, usage_planned) = selection.into_amount_and_planned();
// phase2: evict layers
@@ -835,54 +852,12 @@ async fn collect_eviction_candidates(
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
let mut cumsum: i128 = 0;
// keeping the -1 or not decides if every tenant should lose their least recently accessed
// layer OR if this should happen in the order of having highest layer count:
let fudge = if eviction_order.highest_layer_count_loses_first() {
// relative_age vs. tenant layer count:
// - 0.1..=1.0 (10 layers)
// - 0.01..=1.0 (100 layers)
// - 0.001..=1.0 (1000 layers)
//
// leading to evicting less of the smallest tenants.
0
} else {
// use full 0.0..=1.0 range, which means even the smallest tenants could always lose a
// layer. the actual ordering is unspecified: for 10k tenants on a pageserver it could
// be that less than 10k layer evictions is enough, so we would not need to evict from
// all tenants.
//
// as the tenant ordering is now deterministic this could hit the same tenants
// disproportionetly on multiple invocations. alternative could be to remember how many
// layers did we evict last time from this tenant, and inject that as an additional
// fudge here.
1
};
let total = tenant_candidates
.len()
.checked_sub(fudge)
.filter(|&x| x > 0)
// support 0 or 1 resident layer tenants as well
.unwrap_or(1);
let divider = total as f32;
let total = tenant_candidates.len();
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
// as we iterate this reverse sorted list, the most recently accessed layer will always
// be 1.0; this is for us to evict it last.
candidate.relative_last_activity = if matches!(
eviction_order,
EvictionOrder::RelativeAccessed { .. }
) {
// another possibility: use buckets, like (256.0 * relative_last_activity) as u8 or
// similarly for u16. unsure how it would help.
finite_f32::FiniteF32::try_from_normalized((total - i) as f32 / divider)
.unwrap_or_else(|val| {
tracing::warn!(%fudge, "calculated invalid relative_last_activity for i={i}, total={total}: {val}");
finite_f32::FiniteF32::ZERO
})
} else {
finite_f32::FiniteF32::ZERO
};
candidate.relative_last_activity = eviction_order.relative_last_activity(total, i);
let partition = if cumsum > min_resident_size as i128 {
MinResidentSizePartition::Above
@@ -927,10 +902,7 @@ async fn collect_eviction_candidates(
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
// always behave as if AbsoluteAccessed was selected. if RelativeAccessed is in use, we
// will sort later by candidate.relative_last_activity to get compare evictions.
candidates
.sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts));
eviction_order.sort(&mut candidates);
Ok(EvictionCandidates::Finished(candidates))
}
@@ -1070,6 +1042,12 @@ pub(crate) mod finite_f32 {
}
}
impl From<FiniteF32> for f32 {
fn from(value: FiniteF32) -> f32 {
value.0
}
}
impl FiniteF32 {
pub const ZERO: FiniteF32 = FiniteF32(0.0);
@@ -1082,136 +1060,9 @@ pub(crate) mod finite_f32 {
Err(value)
}
}
}
}
mod summary {
use super::finite_f32::FiniteF32;
use super::{EvictionCandidate, LayerCount};
use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, HashMap};
use std::time::SystemTime;
#[derive(Debug, Default)]
pub(super) struct EvictionSummary {
evicted_per_tenant: HashMap<TenantShardId, LayerCount>,
total: LayerCount,
last_absolute: Option<SystemTime>,
last_relative: Option<FiniteF32>,
}
impl<'a> FromIterator<&'a EvictionCandidate> for EvictionSummary {
fn from_iter<T: IntoIterator<Item = &'a EvictionCandidate>>(iter: T) -> Self {
let mut summary = EvictionSummary::default();
for item in iter {
let counts = summary
.evicted_per_tenant
.entry(*item.layer.get_tenant_shard_id())
.or_default();
let sz = item.layer.get_file_size();
counts.file_sizes += sz;
counts.count += 1;
summary.total.file_sizes += sz;
summary.total.count += 1;
summary.last_absolute = Some(item.last_activity_ts);
summary.last_relative = Some(item.relative_last_activity);
}
summary
}
}
struct SiBytesAmount(u64);
impl std::fmt::Display for SiBytesAmount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.0 < 1024 {
return write!(f, "{}B", self.0);
}
let mut tmp = self.0;
let mut ch = 0;
let suffixes = b"KMGTPE";
while tmp > 1024 * 1024 && ch < suffixes.len() - 1 {
tmp /= 1024;
ch += 1;
}
let ch = suffixes[ch] as char;
write!(f, "{:.1}{ch}iB", tmp as f64 / 1024.0)
}
}
impl std::fmt::Display for EvictionSummary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// wasteful, but it's for testing
let mut sorted: BTreeMap<usize, Vec<(TenantShardId, u64)>> = BTreeMap::new();
for (tenant_shard_id, count) in &self.evicted_per_tenant {
sorted
.entry(count.count)
.or_default()
.push((*tenant_shard_id, count.file_sizes));
}
let total_file_sizes = SiBytesAmount(self.total.file_sizes);
writeln!(
f,
"selected {} layers of {total_file_sizes} up to ({:?}, {:.2?}):",
self.total.count, self.last_absolute, self.last_relative,
)?;
for (count, per_tenant) in sorted.iter().rev().take(10) {
write!(f, "- {count} layers: ")?;
if per_tenant.len() < 3 {
for (i, (tenant_shard_id, bytes)) in per_tenant.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
let bytes = SiBytesAmount(*bytes);
write!(f, "{tenant_shard_id} ({bytes})")?;
}
} else {
let num_tenants = per_tenant.len();
let total_bytes = per_tenant.iter().map(|(_id, bytes)| bytes).sum::<u64>();
let total_bytes = SiBytesAmount(total_bytes);
let layers = num_tenants * count;
write!(
f,
"{num_tenants} tenants {total_bytes} in total {layers} layers",
)?;
}
writeln!(f)?;
}
if sorted.len() > 10 {
let (rem_count, rem_bytes) = sorted
.iter()
.rev()
.map(|(count, per_tenant)| {
(
count,
per_tenant.iter().map(|(_id, bytes)| bytes).sum::<u64>(),
)
})
.fold((0, 0), |acc, next| (acc.0 + next.0, acc.1 + next.1));
let rem_bytes = SiBytesAmount(rem_bytes);
writeln!(f, "- rest of tenants ({}) not shown ({rem_count} layers or {:.1}%, {rem_bytes} or {:.1}% bytes)", sorted.len() - 10, 100.0 * rem_count as f64 / self.total.count as f64, 100.0 * rem_bytes.0 as f64 / self.total.file_sizes as f64)?;
}
Ok(())
pub fn into_inner(self) -> f32 {
self.into()
}
}
}
@@ -1336,3 +1187,40 @@ mod filesystem_level_usage {
assert!(!usage.has_pressure());
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn relative_equal_bounds() {
let order = EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: false,
};
let len = 10;
let v = (0..len)
.map(|i| order.relative_last_activity(len, i).into_inner())
.collect::<Vec<_>>();
assert_eq!(v.first(), Some(&1.0));
assert_eq!(v.last(), Some(&0.0));
assert!(v.windows(2).all(|slice| slice[0] > slice[1]));
}
#[test]
fn relative_spare_bounds() {
let order = EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: true,
};
let len = 10;
let v = (0..len)
.map(|i| order.relative_last_activity(len, i).into_inner())
.collect::<Vec<_>>();
assert_eq!(v.first(), Some(&1.0));
assert_eq!(v.last(), Some(&0.1));
assert!(v.windows(2).all(|slice| slice[0] > slice[1]));
}
}

View File

@@ -68,6 +68,7 @@ use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
@@ -1675,8 +1676,16 @@ impl From<GetActiveTenantError> for QueryError {
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::Cancelled
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
QueryError::Shutdown
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. })
| GetActiveTenantError::NotFound(GetTenantError::MapState(_)) => QueryError::Shutdown,
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)) => {
QueryError::NotFound(format!("Tenant {tenant_id} not attached").into())
}
GetActiveTenantError::NotFound(
GetTenantError::NotActive(_) | GetTenantError::Broken(_),
) => {
// If the tenant is present but not in a state where it can serve I/O, prompt client to backoff/retry by reconnecting
QueryError::Reconnect
}
e => QueryError::Other(anyhow::anyhow!(e)),
}

View File

@@ -91,6 +91,11 @@ impl RequestMonitoring {
pub fn set_endpoint_id(&mut self, endpoint_id: Option<EndpointId>) {
self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
if let Some(ep) = &self.endpoint_id {
crate::metrics::CONNECTING_ENDPOINTS
.with_label_values(&[self.protocol])
.measure(&ep);
}
}
pub fn set_application(&mut self, app: Option<SmolStr>) {

View File

@@ -1,10 +1,7 @@
use ::metrics::{
exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec,
IntCounterPairVec, IntCounterVec,
};
use prometheus::{
register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec,
IntGaugeVec,
exponential_buckets, register_histogram, register_histogram_vec, register_hll_vec,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge_vec, Histogram,
HistogramVec, HyperLogLogVec, IntCounterPairVec, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -236,3 +233,13 @@ pub const fn bool_to_str(x: bool) -> &'static str {
"false"
}
}
pub static CONNECTING_ENDPOINTS: Lazy<HyperLogLogVec<32>> = Lazy::new(|| {
register_hll_vec!(
32,
"proxy_connecting_endpoints",
"HLL approximate cardinality of endpoints that are connecting",
&["protocol"],
)
.unwrap()
});

View File

@@ -51,7 +51,7 @@ memchr = { version = "2" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
prost = { version = "0.11" }
@@ -100,7 +100,7 @@ memchr = { version = "2" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
prost = { version = "0.11" }