mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Compare commits
1 Commits
jcsp/page-
...
sk-basic-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfd78950e2 |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -508,7 +508,7 @@ jobs:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -2736,12 +2736,6 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libm"
|
||||
version = "0.2.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.1.4"
|
||||
@@ -2838,9 +2832,6 @@ dependencies = [
|
||||
"libc",
|
||||
"once_cell",
|
||||
"prometheus",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"twox-hash",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -3066,7 +3057,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4181,16 +4171,6 @@ dependencies = [
|
||||
"getrandom 0.2.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
|
||||
@@ -165,7 +165,6 @@ tracing = "0.1"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-opentelemetry = "0.20.0"
|
||||
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
url = "2.2"
|
||||
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
|
||||
@@ -9,10 +9,5 @@ prometheus.workspace = true
|
||||
libc.workspace = true
|
||||
once_cell.workspace = true
|
||||
chrono.workspace = true
|
||||
twox-hash.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.8"
|
||||
rand_distr = "0.4.3"
|
||||
|
||||
@@ -1,523 +0,0 @@
|
||||
//! HyperLogLog is an algorithm for the count-distinct problem,
|
||||
//! approximating the number of distinct elements in a multiset.
|
||||
//! Calculating the exact cardinality of the distinct elements
|
||||
//! of a multiset requires an amount of memory proportional to
|
||||
//! the cardinality, which is impractical for very large data sets.
|
||||
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
|
||||
//! use significantly less memory than this, but can only approximate the cardinality.
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
hash::{BuildHasher, BuildHasherDefault, Hash, Hasher},
|
||||
sync::{atomic::AtomicU8, Arc, RwLock},
|
||||
};
|
||||
|
||||
use prometheus::{
|
||||
core::{self, Describer},
|
||||
proto, Opts,
|
||||
};
|
||||
use twox_hash::xxh3;
|
||||
|
||||
/// Create an [`HyperLogLogVec`] and registers to default registry.
|
||||
#[macro_export(local_inner_macros)]
|
||||
macro_rules! register_hll_vec {
|
||||
($N:literal, $OPTS:expr, $LABELS_NAMES:expr $(,)?) => {{
|
||||
let hll_vec = $crate::HyperLogLogVec::<$N>::new($OPTS, $LABELS_NAMES).unwrap();
|
||||
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
|
||||
}};
|
||||
|
||||
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
|
||||
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
|
||||
}};
|
||||
}
|
||||
|
||||
/// Create an [`HyperLogLog`] and registers to default registry.
|
||||
#[macro_export(local_inner_macros)]
|
||||
macro_rules! register_hll {
|
||||
($N:literal, $OPTS:expr $(,)?) => {{
|
||||
let hll = $crate::HyperLogLog::<$N>::with_opts($OPTS).unwrap();
|
||||
$crate::register(Box::new(hll.clone())).map(|_| hll)
|
||||
}};
|
||||
|
||||
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
|
||||
$crate::register_hll!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
|
||||
}};
|
||||
}
|
||||
|
||||
/// HLL is a probabilistic cardinality measure.
|
||||
///
|
||||
/// How to use this time-series for a metric name `my_metrics_total_hll`:
|
||||
///
|
||||
/// ```promql
|
||||
/// # harmonic mean
|
||||
/// 1 / (
|
||||
/// sum (
|
||||
/// 2 ^ -(
|
||||
/// # HLL merge operation
|
||||
/// max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
|
||||
/// )
|
||||
/// ) without (hll_shard)
|
||||
/// )
|
||||
/// * alpha
|
||||
/// * shards_count
|
||||
/// * shards_count
|
||||
/// ```
|
||||
///
|
||||
/// If you want an estimate over time, you can use the following query:
|
||||
///
|
||||
/// ```promql
|
||||
/// # harmonic mean
|
||||
/// 1 / (
|
||||
/// sum (
|
||||
/// 2 ^ -(
|
||||
/// # HLL merge operation
|
||||
/// max (
|
||||
/// max_over_time(my_metrics_total_hll{}[$__rate_interval])
|
||||
/// ) by (hll_shard, other_labels...)
|
||||
/// )
|
||||
/// ) without (hll_shard)
|
||||
/// )
|
||||
/// * alpha
|
||||
/// * shards_count
|
||||
/// * shards_count
|
||||
/// ```
|
||||
///
|
||||
/// In the case of low cardinality, you might want to use the linear counting approximation:
|
||||
///
|
||||
/// ```promql
|
||||
/// # LinearCounting(m, V) = m log (m / V)
|
||||
/// shards_count * ln(shards_count /
|
||||
/// # calculate V = how many shards contain a 0
|
||||
/// count(max (proxy_connecting_endpoints{}) by (hll_shard, protocol) == 0) without (hll_shard)
|
||||
/// )
|
||||
/// ```
|
||||
///
|
||||
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
|
||||
#[derive(Clone)]
|
||||
pub struct HyperLogLogVec<const N: usize> {
|
||||
core: Arc<HyperLogLogVecCore<N>>,
|
||||
}
|
||||
|
||||
struct HyperLogLogVecCore<const N: usize> {
|
||||
pub children: RwLock<HashMap<u64, HyperLogLog<N>, BuildHasherDefault<xxh3::Hash64>>>,
|
||||
pub desc: core::Desc,
|
||||
pub opts: Opts,
|
||||
}
|
||||
|
||||
impl<const N: usize> core::Collector for HyperLogLogVec<N> {
|
||||
fn desc(&self) -> Vec<&core::Desc> {
|
||||
vec![&self.core.desc]
|
||||
}
|
||||
|
||||
fn collect(&self) -> Vec<proto::MetricFamily> {
|
||||
let mut m = proto::MetricFamily::default();
|
||||
m.set_name(self.core.desc.fq_name.clone());
|
||||
m.set_help(self.core.desc.help.clone());
|
||||
m.set_field_type(proto::MetricType::GAUGE);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
for child in self.core.children.read().unwrap().values() {
|
||||
child.core.collect_into(&mut metrics);
|
||||
}
|
||||
m.set_metric(metrics);
|
||||
|
||||
vec![m]
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> HyperLogLogVec<N> {
|
||||
/// Create a new [`HyperLogLogVec`] based on the provided
|
||||
/// [`Opts`] and partitioned by the given label names. At least one label name must be
|
||||
/// provided.
|
||||
pub fn new(opts: Opts, label_names: &[&str]) -> prometheus::Result<Self> {
|
||||
assert!(N.is_power_of_two());
|
||||
let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
|
||||
let opts = opts.variable_labels(variable_names);
|
||||
|
||||
let desc = opts.describe()?;
|
||||
let v = HyperLogLogVecCore {
|
||||
children: RwLock::new(HashMap::default()),
|
||||
desc,
|
||||
opts,
|
||||
};
|
||||
|
||||
Ok(Self { core: Arc::new(v) })
|
||||
}
|
||||
|
||||
/// `get_metric_with_label_values` returns the [`HyperLogLog<P>`] for the given slice
|
||||
/// of label values (same order as the VariableLabels in Desc). If that combination of
|
||||
/// label values is accessed for the first time, a new [`HyperLogLog<P>`] is created.
|
||||
///
|
||||
/// An error is returned if the number of label values is not the same as the
|
||||
/// number of VariableLabels in Desc.
|
||||
pub fn get_metric_with_label_values(
|
||||
&self,
|
||||
vals: &[&str],
|
||||
) -> prometheus::Result<HyperLogLog<N>> {
|
||||
self.core.get_metric_with_label_values(vals)
|
||||
}
|
||||
|
||||
/// `with_label_values` works as `get_metric_with_label_values`, but panics if an error
|
||||
/// occurs.
|
||||
pub fn with_label_values(&self, vals: &[&str]) -> HyperLogLog<N> {
|
||||
self.get_metric_with_label_values(vals).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> HyperLogLogVecCore<N> {
|
||||
pub fn get_metric_with_label_values(
|
||||
&self,
|
||||
vals: &[&str],
|
||||
) -> prometheus::Result<HyperLogLog<N>> {
|
||||
let h = self.hash_label_values(vals)?;
|
||||
|
||||
if let Some(metric) = self.children.read().unwrap().get(&h).cloned() {
|
||||
return Ok(metric);
|
||||
}
|
||||
|
||||
self.get_or_create_metric(h, vals)
|
||||
}
|
||||
|
||||
pub(crate) fn hash_label_values(&self, vals: &[&str]) -> prometheus::Result<u64> {
|
||||
if vals.len() != self.desc.variable_labels.len() {
|
||||
return Err(prometheus::Error::InconsistentCardinality {
|
||||
expect: self.desc.variable_labels.len(),
|
||||
got: vals.len(),
|
||||
});
|
||||
}
|
||||
|
||||
let mut h = xxh3::Hash64::default();
|
||||
for val in vals {
|
||||
h.write(val.as_bytes());
|
||||
}
|
||||
|
||||
Ok(h.finish())
|
||||
}
|
||||
|
||||
fn get_or_create_metric(
|
||||
&self,
|
||||
hash: u64,
|
||||
label_values: &[&str],
|
||||
) -> prometheus::Result<HyperLogLog<N>> {
|
||||
let mut children = self.children.write().unwrap();
|
||||
// Check exist first.
|
||||
if let Some(metric) = children.get(&hash).cloned() {
|
||||
return Ok(metric);
|
||||
}
|
||||
|
||||
let metric = HyperLogLog::with_opts_and_label_values(&self.opts, label_values)?;
|
||||
children.insert(hash, metric.clone());
|
||||
Ok(metric)
|
||||
}
|
||||
}
|
||||
|
||||
/// HLL is a probabilistic cardinality measure.
|
||||
///
|
||||
/// How to use this time-series for a metric name `my_metrics_total_hll`:
|
||||
///
|
||||
/// ```promql
|
||||
/// # harmonic mean
|
||||
/// 1 / (
|
||||
/// sum (
|
||||
/// 2 ^ -(
|
||||
/// # HLL merge operation
|
||||
/// max (my_metrics_total_hll{}) by (hll_shard, other_labels...)
|
||||
/// )
|
||||
/// ) without (hll_shard)
|
||||
/// )
|
||||
/// * alpha
|
||||
/// * shards_count
|
||||
/// * shards_count
|
||||
/// ```
|
||||
///
|
||||
/// If you want an estimate over time, you can use the following query:
|
||||
///
|
||||
/// ```promql
|
||||
/// # harmonic mean
|
||||
/// 1 / (
|
||||
/// sum (
|
||||
/// 2 ^ -(
|
||||
/// # HLL merge operation
|
||||
/// max (
|
||||
/// max_over_time(my_metrics_total_hll{}[$__rate_interval])
|
||||
/// ) by (hll_shard, other_labels...)
|
||||
/// )
|
||||
/// ) without (hll_shard)
|
||||
/// )
|
||||
/// * alpha
|
||||
/// * shards_count
|
||||
/// * shards_count
|
||||
/// ```
|
||||
///
|
||||
/// In the case of low cardinality, you might want to use the linear counting approximation:
|
||||
///
|
||||
/// ```promql
|
||||
/// # LinearCounting(m, V) = m log (m / V)
|
||||
/// shards_count * ln(shards_count /
|
||||
/// # calculate V = how many shards contain a 0
|
||||
/// count(max (proxy_connecting_endpoints{}) by (hll_shard, protocol) == 0) without (hll_shard)
|
||||
/// )
|
||||
/// ```
|
||||
///
|
||||
/// See <https://en.wikipedia.org/wiki/HyperLogLog#Practical_considerations> for estimates on alpha
|
||||
#[derive(Clone)]
|
||||
pub struct HyperLogLog<const N: usize> {
|
||||
core: Arc<HyperLogLogCore<N>>,
|
||||
}
|
||||
|
||||
impl<const N: usize> HyperLogLog<N> {
|
||||
/// Create a [`HyperLogLog`] with the `name` and `help` arguments.
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> prometheus::Result<Self> {
|
||||
assert!(N.is_power_of_two());
|
||||
let opts = Opts::new(name, help);
|
||||
Self::with_opts(opts)
|
||||
}
|
||||
|
||||
/// Create a [`HyperLogLog`] with the `opts` options.
|
||||
pub fn with_opts(opts: Opts) -> prometheus::Result<Self> {
|
||||
Self::with_opts_and_label_values(&opts, &[])
|
||||
}
|
||||
|
||||
fn with_opts_and_label_values(opts: &Opts, label_values: &[&str]) -> prometheus::Result<Self> {
|
||||
let desc = opts.describe()?;
|
||||
let labels = make_label_pairs(&desc, label_values)?;
|
||||
|
||||
let v = HyperLogLogCore {
|
||||
shards: [0; N].map(AtomicU8::new),
|
||||
desc,
|
||||
labels,
|
||||
};
|
||||
Ok(Self { core: Arc::new(v) })
|
||||
}
|
||||
|
||||
pub fn measure(&self, item: &impl Hash) {
|
||||
// changing the hasher will break compatibility with previous measurements.
|
||||
self.record(BuildHasherDefault::<xxh3::Hash64>::default().hash_one(item));
|
||||
}
|
||||
|
||||
fn record(&self, hash: u64) {
|
||||
let p = N.ilog2() as u8;
|
||||
let j = hash & (N as u64 - 1);
|
||||
let rho = (hash >> p).leading_zeros() as u8 + 1 - p;
|
||||
self.core.shards[j as usize].fetch_max(rho, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
struct HyperLogLogCore<const N: usize> {
|
||||
shards: [AtomicU8; N],
|
||||
desc: core::Desc,
|
||||
labels: Vec<proto::LabelPair>,
|
||||
}
|
||||
|
||||
impl<const N: usize> core::Collector for HyperLogLog<N> {
|
||||
fn desc(&self) -> Vec<&core::Desc> {
|
||||
vec![&self.core.desc]
|
||||
}
|
||||
|
||||
fn collect(&self) -> Vec<proto::MetricFamily> {
|
||||
let mut m = proto::MetricFamily::default();
|
||||
m.set_name(self.core.desc.fq_name.clone());
|
||||
m.set_help(self.core.desc.help.clone());
|
||||
m.set_field_type(proto::MetricType::GAUGE);
|
||||
|
||||
let mut metrics = Vec::new();
|
||||
self.core.collect_into(&mut metrics);
|
||||
m.set_metric(metrics);
|
||||
|
||||
vec![m]
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> HyperLogLogCore<N> {
|
||||
fn collect_into(&self, metrics: &mut Vec<proto::Metric>) {
|
||||
self.shards.iter().enumerate().for_each(|(i, x)| {
|
||||
let mut shard_label = proto::LabelPair::default();
|
||||
shard_label.set_name("hll_shard".to_owned());
|
||||
shard_label.set_value(format!("{i}"));
|
||||
|
||||
// We reset the counter to 0 so we can perform a cardinality measure over any time slice in prometheus.
|
||||
|
||||
// This seems like it would be a race condition,
|
||||
// but HLL is not impacted by a write in one shard happening in between.
|
||||
// This is because in PromQL we will be implementing a harmonic mean of all buckets.
|
||||
// we will also merge samples in a time series using `max by (hll_shard)`.
|
||||
|
||||
// TODO: maybe we shouldn't reset this on every collect, instead, only after a time window.
|
||||
// this would mean that a dev port-forwarding the metrics url won't break the sampling.
|
||||
let v = x.swap(0, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let mut m = proto::Metric::default();
|
||||
let mut c = proto::Gauge::default();
|
||||
c.set_value(v as f64);
|
||||
m.set_gauge(c);
|
||||
|
||||
let mut labels = Vec::with_capacity(self.labels.len() + 1);
|
||||
labels.extend_from_slice(&self.labels);
|
||||
labels.push(shard_label);
|
||||
|
||||
m.set_label(labels);
|
||||
metrics.push(m);
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn make_label_pairs(
|
||||
desc: &core::Desc,
|
||||
label_values: &[&str],
|
||||
) -> prometheus::Result<Vec<proto::LabelPair>> {
|
||||
if desc.variable_labels.len() != label_values.len() {
|
||||
return Err(prometheus::Error::InconsistentCardinality {
|
||||
expect: desc.variable_labels.len(),
|
||||
got: label_values.len(),
|
||||
});
|
||||
}
|
||||
|
||||
let total_len = desc.variable_labels.len() + desc.const_label_pairs.len();
|
||||
if total_len == 0 {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
if desc.variable_labels.is_empty() {
|
||||
return Ok(desc.const_label_pairs.clone());
|
||||
}
|
||||
|
||||
let mut label_pairs = Vec::with_capacity(total_len);
|
||||
for (i, n) in desc.variable_labels.iter().enumerate() {
|
||||
let mut label_pair = proto::LabelPair::default();
|
||||
label_pair.set_name(n.clone());
|
||||
label_pair.set_value(label_values[i].to_owned());
|
||||
label_pairs.push(label_pair);
|
||||
}
|
||||
|
||||
for label_pair in &desc.const_label_pairs {
|
||||
label_pairs.push(label_pair.clone());
|
||||
}
|
||||
label_pairs.sort();
|
||||
Ok(label_pairs)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use prometheus::{proto, Opts};
|
||||
use rand::{rngs::StdRng, Rng, SeedableRng};
|
||||
use rand_distr::{Distribution, Zipf};
|
||||
|
||||
use crate::HyperLogLogVec;
|
||||
|
||||
fn collect(hll: &HyperLogLogVec<32>) -> Vec<proto::Metric> {
|
||||
let mut metrics = vec![];
|
||||
hll.core
|
||||
.children
|
||||
.read()
|
||||
.unwrap()
|
||||
.values()
|
||||
.for_each(|c| c.core.collect_into(&mut metrics));
|
||||
metrics
|
||||
}
|
||||
fn get_cardinality(metrics: &[proto::Metric], filter: impl Fn(&proto::Metric) -> bool) -> f64 {
|
||||
let mut buckets = [0.0; 32];
|
||||
for metric in metrics.chunks_exact(32) {
|
||||
if filter(&metric[0]) {
|
||||
for (i, m) in metric.iter().enumerate() {
|
||||
buckets[i] = f64::max(buckets[i], m.get_gauge().get_value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buckets
|
||||
.into_iter()
|
||||
.map(|f| 2.0f64.powf(-f))
|
||||
.sum::<f64>()
|
||||
.recip()
|
||||
* 0.697
|
||||
* 32.0
|
||||
* 32.0
|
||||
}
|
||||
|
||||
fn test_cardinality(n: usize, dist: impl Distribution<f64>) -> ([usize; 3], [f64; 3]) {
|
||||
let hll = HyperLogLogVec::<32>::new(Opts::new("foo", "bar"), &["x"]).unwrap();
|
||||
|
||||
let mut iter = StdRng::seed_from_u64(0x2024_0112).sample_iter(dist);
|
||||
let mut set_a = HashSet::new();
|
||||
let mut set_b = HashSet::new();
|
||||
|
||||
for x in iter.by_ref().take(n) {
|
||||
set_a.insert(x.to_bits());
|
||||
hll.with_label_values(&["a"]).measure(&x.to_bits());
|
||||
}
|
||||
for x in iter.by_ref().take(n) {
|
||||
set_b.insert(x.to_bits());
|
||||
hll.with_label_values(&["b"]).measure(&x.to_bits());
|
||||
}
|
||||
let merge = &set_a | &set_b;
|
||||
|
||||
let metrics = collect(&hll);
|
||||
let len = get_cardinality(&metrics, |_| true);
|
||||
let len_a = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "a");
|
||||
let len_b = get_cardinality(&metrics, |l| l.get_label()[0].get_value() == "b");
|
||||
|
||||
([merge.len(), set_a.len(), set_b.len()], [len, len_a, len_b])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_small() {
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(100, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [46, 30, 32]);
|
||||
assert!(51.3 < estimate[0] && estimate[0] < 51.4);
|
||||
assert!(44.0 < estimate[1] && estimate[1] < 44.1);
|
||||
assert!(39.0 < estimate[2] && estimate[2] < 39.1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_medium() {
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(10000, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [2529, 1618, 1629]);
|
||||
assert!(2309.1 < estimate[0] && estimate[0] < 2309.2);
|
||||
assert!(1566.6 < estimate[1] && estimate[1] < 1566.7);
|
||||
assert!(1629.5 < estimate[2] && estimate[2] < 1629.6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_large() {
|
||||
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(1_000_000, 1.2f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [129077, 79579, 79630]);
|
||||
assert!(126067.2 < estimate[0] && estimate[0] < 126067.3);
|
||||
assert!(83076.8 < estimate[1] && estimate[1] < 83076.9);
|
||||
assert!(64251.2 < estimate[2] && estimate[2] < 64251.3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_small2() {
|
||||
let (actual, estimate) = test_cardinality(100, Zipf::new(200, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [92, 58, 60]);
|
||||
assert!(116.1 < estimate[0] && estimate[0] < 116.2);
|
||||
assert!(81.7 < estimate[1] && estimate[1] < 81.8);
|
||||
assert!(69.3 < estimate[2] && estimate[2] < 69.4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_medium2() {
|
||||
let (actual, estimate) = test_cardinality(10000, Zipf::new(20000, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [8201, 5131, 5051]);
|
||||
assert!(6846.4 < estimate[0] && estimate[0] < 6846.5);
|
||||
assert!(5239.1 < estimate[1] && estimate[1] < 5239.2);
|
||||
assert!(4292.8 < estimate[2] && estimate[2] < 4292.9);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_large2() {
|
||||
let (actual, estimate) = test_cardinality(1_000_000, Zipf::new(2_000_000, 0.8f64).unwrap());
|
||||
|
||||
assert_eq!(actual, [777847, 482069, 482246]);
|
||||
assert!(699437.4 < estimate[0] && estimate[0] < 699437.5);
|
||||
assert!(374948.9 < estimate[1] && estimate[1] < 374949.0);
|
||||
assert!(434609.7 < estimate[2] && estimate[2] < 434609.8);
|
||||
}
|
||||
}
|
||||
@@ -28,9 +28,7 @@ use prometheus::{Registry, Result};
|
||||
pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
mod hll;
|
||||
pub mod metric_vec_duration;
|
||||
pub use hll::{HyperLogLog, HyperLogLogVec};
|
||||
|
||||
pub type UIntGauge = GenericGauge<AtomicU64>;
|
||||
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
|
||||
|
||||
@@ -97,86 +97,23 @@ pub enum EvictionOrder {
|
||||
|
||||
/// Order the layers to be evicted by how recently they have been accessed relatively within
|
||||
/// the set of resident layers of a tenant.
|
||||
///
|
||||
/// This strategy will evict layers more fairly but is untested.
|
||||
RelativeAccessed {
|
||||
/// Determines if the tenant with most layers should lose first.
|
||||
///
|
||||
/// Having this enabled is currently the only reasonable option, because the order in which
|
||||
/// we read tenants is deterministic. If we find the need to use this as `false`, we need
|
||||
/// to ensure nondeterminism by adding in a random number to break the
|
||||
/// `relative_last_activity==0.0` ties.
|
||||
#[serde(default = "default_highest_layer_count_loses_first")]
|
||||
#[serde(default)]
|
||||
highest_layer_count_loses_first: bool,
|
||||
},
|
||||
}
|
||||
|
||||
fn default_highest_layer_count_loses_first() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl EvictionOrder {
|
||||
fn sort(&self, candidates: &mut [(MinResidentSizePartition, EvictionCandidate)]) {
|
||||
use EvictionOrder::*;
|
||||
|
||||
/// Return true, if with [`Self::RelativeAccessed`] order the tenants with the highest layer
|
||||
/// counts should be the first ones to have their layers evicted.
|
||||
fn highest_layer_count_loses_first(&self) -> bool {
|
||||
match self {
|
||||
AbsoluteAccessed => {
|
||||
candidates.sort_unstable_by_key(|(partition, candidate)| {
|
||||
(*partition, candidate.last_activity_ts)
|
||||
});
|
||||
}
|
||||
RelativeAccessed { .. } => candidates.sort_unstable_by_key(|(partition, candidate)| {
|
||||
(*partition, candidate.relative_last_activity)
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Called to fill in the [`EvictionCandidate::relative_last_activity`] while iterating tenants
|
||||
/// layers in **most** recently used order.
|
||||
fn relative_last_activity(&self, total: usize, index: usize) -> finite_f32::FiniteF32 {
|
||||
use EvictionOrder::*;
|
||||
|
||||
match self {
|
||||
AbsoluteAccessed => finite_f32::FiniteF32::ZERO,
|
||||
RelativeAccessed {
|
||||
EvictionOrder::AbsoluteAccessed => false,
|
||||
EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first,
|
||||
} => {
|
||||
// keeping the -1 or not decides if every tenant should lose their least recently accessed
|
||||
// layer OR if this should happen in the order of having highest layer count:
|
||||
let fudge = if *highest_layer_count_loses_first {
|
||||
// relative_last_activity vs. tenant layer count:
|
||||
// - 0.1..=1.0 (10 layers)
|
||||
// - 0.01..=1.0 (100 layers)
|
||||
// - 0.001..=1.0 (1000 layers)
|
||||
//
|
||||
// leading to evicting less of the smallest tenants.
|
||||
0
|
||||
} else {
|
||||
// use full 0.0..=1.0 range, which means even the smallest tenants could always lose a
|
||||
// layer. the actual ordering is unspecified: for 10k tenants on a pageserver it could
|
||||
// be that less than 10k layer evictions is enough, so we would not need to evict from
|
||||
// all tenants.
|
||||
//
|
||||
// as the tenant ordering is now deterministic this could hit the same tenants
|
||||
// disproportionetly on multiple invocations. alternative could be to remember how many
|
||||
// layers did we evict last time from this tenant, and inject that as an additional
|
||||
// fudge here.
|
||||
1
|
||||
};
|
||||
|
||||
let total = total.checked_sub(fudge).filter(|&x| x > 1).unwrap_or(1);
|
||||
let divider = total as f32;
|
||||
|
||||
// most recently used is always (total - 0) / divider == 1.0
|
||||
// least recently used depends on the fudge:
|
||||
// - (total - 1) - (total - 1) / total => 0 / total
|
||||
// - total - (total - 1) / total => 1 / total
|
||||
let distance = (total - index) as f32;
|
||||
|
||||
finite_f32::FiniteF32::try_from_normalized(distance / divider)
|
||||
.unwrap_or_else(|val| {
|
||||
tracing::warn!(%fudge, "calculated invalid relative_last_activity for i={index}, total={total}: {val}");
|
||||
finite_f32::FiniteF32::ZERO
|
||||
})
|
||||
}
|
||||
} => *highest_layer_count_loses_first,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -452,6 +389,52 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
|
||||
let selection = select_victims(&candidates, usage_pre);
|
||||
|
||||
let mut candidates = candidates;
|
||||
|
||||
let selection = if matches!(eviction_order, EvictionOrder::RelativeAccessed { .. }) {
|
||||
// we currently have the layers ordered by AbsoluteAccessed so that we can get the summary
|
||||
// for comparison here. this is a temporary measure to develop alternatives.
|
||||
use std::fmt::Write;
|
||||
|
||||
let mut summary_buf = String::with_capacity(256);
|
||||
|
||||
{
|
||||
let absolute_summary = candidates
|
||||
.iter()
|
||||
.take(selection.amount)
|
||||
.map(|(_, candidate)| candidate)
|
||||
.collect::<summary::EvictionSummary>();
|
||||
|
||||
write!(summary_buf, "{absolute_summary}").expect("string grows");
|
||||
|
||||
info!("absolute accessed selection summary: {summary_buf}");
|
||||
}
|
||||
|
||||
candidates.sort_unstable_by_key(|(partition, candidate)| {
|
||||
(*partition, candidate.relative_last_activity)
|
||||
});
|
||||
|
||||
let selection = select_victims(&candidates, usage_pre);
|
||||
|
||||
{
|
||||
summary_buf.clear();
|
||||
|
||||
let relative_summary = candidates
|
||||
.iter()
|
||||
.take(selection.amount)
|
||||
.map(|(_, candidate)| candidate)
|
||||
.collect::<summary::EvictionSummary>();
|
||||
|
||||
write!(summary_buf, "{relative_summary}").expect("string grows");
|
||||
|
||||
info!("relative accessed selection summary: {summary_buf}");
|
||||
}
|
||||
|
||||
selection
|
||||
} else {
|
||||
selection
|
||||
};
|
||||
|
||||
let (evicted_amount, usage_planned) = selection.into_amount_and_planned();
|
||||
|
||||
// phase2: evict layers
|
||||
@@ -852,12 +835,54 @@ async fn collect_eviction_candidates(
|
||||
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
|
||||
let mut cumsum: i128 = 0;
|
||||
|
||||
let total = tenant_candidates.len();
|
||||
// keeping the -1 or not decides if every tenant should lose their least recently accessed
|
||||
// layer OR if this should happen in the order of having highest layer count:
|
||||
let fudge = if eviction_order.highest_layer_count_loses_first() {
|
||||
// relative_age vs. tenant layer count:
|
||||
// - 0.1..=1.0 (10 layers)
|
||||
// - 0.01..=1.0 (100 layers)
|
||||
// - 0.001..=1.0 (1000 layers)
|
||||
//
|
||||
// leading to evicting less of the smallest tenants.
|
||||
0
|
||||
} else {
|
||||
// use full 0.0..=1.0 range, which means even the smallest tenants could always lose a
|
||||
// layer. the actual ordering is unspecified: for 10k tenants on a pageserver it could
|
||||
// be that less than 10k layer evictions is enough, so we would not need to evict from
|
||||
// all tenants.
|
||||
//
|
||||
// as the tenant ordering is now deterministic this could hit the same tenants
|
||||
// disproportionetly on multiple invocations. alternative could be to remember how many
|
||||
// layers did we evict last time from this tenant, and inject that as an additional
|
||||
// fudge here.
|
||||
1
|
||||
};
|
||||
|
||||
let total = tenant_candidates
|
||||
.len()
|
||||
.checked_sub(fudge)
|
||||
.filter(|&x| x > 0)
|
||||
// support 0 or 1 resident layer tenants as well
|
||||
.unwrap_or(1);
|
||||
let divider = total as f32;
|
||||
|
||||
for (i, mut candidate) in tenant_candidates.into_iter().enumerate() {
|
||||
// as we iterate this reverse sorted list, the most recently accessed layer will always
|
||||
// be 1.0; this is for us to evict it last.
|
||||
candidate.relative_last_activity = eviction_order.relative_last_activity(total, i);
|
||||
candidate.relative_last_activity = if matches!(
|
||||
eviction_order,
|
||||
EvictionOrder::RelativeAccessed { .. }
|
||||
) {
|
||||
// another possibility: use buckets, like (256.0 * relative_last_activity) as u8 or
|
||||
// similarly for u16. unsure how it would help.
|
||||
finite_f32::FiniteF32::try_from_normalized((total - i) as f32 / divider)
|
||||
.unwrap_or_else(|val| {
|
||||
tracing::warn!(%fudge, "calculated invalid relative_last_activity for i={i}, total={total}: {val}");
|
||||
finite_f32::FiniteF32::ZERO
|
||||
})
|
||||
} else {
|
||||
finite_f32::FiniteF32::ZERO
|
||||
};
|
||||
|
||||
let partition = if cumsum > min_resident_size as i128 {
|
||||
MinResidentSizePartition::Above
|
||||
@@ -902,7 +927,10 @@ async fn collect_eviction_candidates(
|
||||
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
|
||||
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
|
||||
|
||||
eviction_order.sort(&mut candidates);
|
||||
// always behave as if AbsoluteAccessed was selected. if RelativeAccessed is in use, we
|
||||
// will sort later by candidate.relative_last_activity to get compare evictions.
|
||||
candidates
|
||||
.sort_unstable_by_key(|(partition, candidate)| (*partition, candidate.last_activity_ts));
|
||||
|
||||
Ok(EvictionCandidates::Finished(candidates))
|
||||
}
|
||||
@@ -1042,12 +1070,6 @@ pub(crate) mod finite_f32 {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FiniteF32> for f32 {
|
||||
fn from(value: FiniteF32) -> f32 {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
impl FiniteF32 {
|
||||
pub const ZERO: FiniteF32 = FiniteF32(0.0);
|
||||
|
||||
@@ -1060,9 +1082,136 @@ pub(crate) mod finite_f32 {
|
||||
Err(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> f32 {
|
||||
self.into()
|
||||
mod summary {
|
||||
use super::finite_f32::FiniteF32;
|
||||
use super::{EvictionCandidate, LayerCount};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::time::SystemTime;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct EvictionSummary {
|
||||
evicted_per_tenant: HashMap<TenantShardId, LayerCount>,
|
||||
total: LayerCount,
|
||||
|
||||
last_absolute: Option<SystemTime>,
|
||||
last_relative: Option<FiniteF32>,
|
||||
}
|
||||
|
||||
impl<'a> FromIterator<&'a EvictionCandidate> for EvictionSummary {
|
||||
fn from_iter<T: IntoIterator<Item = &'a EvictionCandidate>>(iter: T) -> Self {
|
||||
let mut summary = EvictionSummary::default();
|
||||
for item in iter {
|
||||
let counts = summary
|
||||
.evicted_per_tenant
|
||||
.entry(*item.layer.get_tenant_shard_id())
|
||||
.or_default();
|
||||
|
||||
let sz = item.layer.get_file_size();
|
||||
|
||||
counts.file_sizes += sz;
|
||||
counts.count += 1;
|
||||
|
||||
summary.total.file_sizes += sz;
|
||||
summary.total.count += 1;
|
||||
|
||||
summary.last_absolute = Some(item.last_activity_ts);
|
||||
summary.last_relative = Some(item.relative_last_activity);
|
||||
}
|
||||
|
||||
summary
|
||||
}
|
||||
}
|
||||
|
||||
struct SiBytesAmount(u64);
|
||||
|
||||
impl std::fmt::Display for SiBytesAmount {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
if self.0 < 1024 {
|
||||
return write!(f, "{}B", self.0);
|
||||
}
|
||||
|
||||
let mut tmp = self.0;
|
||||
let mut ch = 0;
|
||||
let suffixes = b"KMGTPE";
|
||||
|
||||
while tmp > 1024 * 1024 && ch < suffixes.len() - 1 {
|
||||
tmp /= 1024;
|
||||
ch += 1;
|
||||
}
|
||||
|
||||
let ch = suffixes[ch] as char;
|
||||
|
||||
write!(f, "{:.1}{ch}iB", tmp as f64 / 1024.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EvictionSummary {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// wasteful, but it's for testing
|
||||
|
||||
let mut sorted: BTreeMap<usize, Vec<(TenantShardId, u64)>> = BTreeMap::new();
|
||||
|
||||
for (tenant_shard_id, count) in &self.evicted_per_tenant {
|
||||
sorted
|
||||
.entry(count.count)
|
||||
.or_default()
|
||||
.push((*tenant_shard_id, count.file_sizes));
|
||||
}
|
||||
|
||||
let total_file_sizes = SiBytesAmount(self.total.file_sizes);
|
||||
|
||||
writeln!(
|
||||
f,
|
||||
"selected {} layers of {total_file_sizes} up to ({:?}, {:.2?}):",
|
||||
self.total.count, self.last_absolute, self.last_relative,
|
||||
)?;
|
||||
|
||||
for (count, per_tenant) in sorted.iter().rev().take(10) {
|
||||
write!(f, "- {count} layers: ")?;
|
||||
|
||||
if per_tenant.len() < 3 {
|
||||
for (i, (tenant_shard_id, bytes)) in per_tenant.iter().enumerate() {
|
||||
if i > 0 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
let bytes = SiBytesAmount(*bytes);
|
||||
write!(f, "{tenant_shard_id} ({bytes})")?;
|
||||
}
|
||||
} else {
|
||||
let num_tenants = per_tenant.len();
|
||||
let total_bytes = per_tenant.iter().map(|(_id, bytes)| bytes).sum::<u64>();
|
||||
let total_bytes = SiBytesAmount(total_bytes);
|
||||
let layers = num_tenants * count;
|
||||
|
||||
write!(
|
||||
f,
|
||||
"{num_tenants} tenants {total_bytes} in total {layers} layers",
|
||||
)?;
|
||||
}
|
||||
|
||||
writeln!(f)?;
|
||||
}
|
||||
|
||||
if sorted.len() > 10 {
|
||||
let (rem_count, rem_bytes) = sorted
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|(count, per_tenant)| {
|
||||
(
|
||||
count,
|
||||
per_tenant.iter().map(|(_id, bytes)| bytes).sum::<u64>(),
|
||||
)
|
||||
})
|
||||
.fold((0, 0), |acc, next| (acc.0 + next.0, acc.1 + next.1));
|
||||
let rem_bytes = SiBytesAmount(rem_bytes);
|
||||
writeln!(f, "- rest of tenants ({}) not shown ({rem_count} layers or {:.1}%, {rem_bytes} or {:.1}% bytes)", sorted.len() - 10, 100.0 * rem_count as f64 / self.total.count as f64, 100.0 * rem_bytes.0 as f64 / self.total.file_sizes as f64)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1187,40 +1336,3 @@ mod filesystem_level_usage {
|
||||
assert!(!usage.has_pressure());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn relative_equal_bounds() {
|
||||
let order = EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first: false,
|
||||
};
|
||||
|
||||
let len = 10;
|
||||
let v = (0..len)
|
||||
.map(|i| order.relative_last_activity(len, i).into_inner())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(v.first(), Some(&1.0));
|
||||
assert_eq!(v.last(), Some(&0.0));
|
||||
assert!(v.windows(2).all(|slice| slice[0] > slice[1]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn relative_spare_bounds() {
|
||||
let order = EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
};
|
||||
|
||||
let len = 10;
|
||||
let v = (0..len)
|
||||
.map(|i| order.relative_last_activity(len, i).into_inner())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(v.first(), Some(&1.0));
|
||||
assert_eq!(v.last(), Some(&0.1));
|
||||
assert!(v.windows(2).all(|slice| slice[0] > slice[1]));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +68,6 @@ use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::GetTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
@@ -1676,16 +1675,8 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
|
||||
),
|
||||
GetActiveTenantError::Cancelled
|
||||
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. })
|
||||
| GetActiveTenantError::NotFound(GetTenantError::MapState(_)) => QueryError::Shutdown,
|
||||
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)) => {
|
||||
QueryError::NotFound(format!("Tenant {tenant_id} not attached").into())
|
||||
}
|
||||
GetActiveTenantError::NotFound(
|
||||
GetTenantError::NotActive(_) | GetTenantError::Broken(_),
|
||||
) => {
|
||||
// If the tenant is present but not in a state where it can serve I/O, prompt client to backoff/retry by reconnecting
|
||||
QueryError::Reconnect
|
||||
| GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => {
|
||||
QueryError::Shutdown
|
||||
}
|
||||
e => QueryError::Other(anyhow::anyhow!(e)),
|
||||
}
|
||||
|
||||
@@ -1363,22 +1363,16 @@ impl WalIngest {
|
||||
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
|
||||
if let Some(max_xid) = acc {
|
||||
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
|
||||
Some(mbr.xid)
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
|
||||
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
|
||||
mbr.xid
|
||||
} else {
|
||||
Some(mbr.xid)
|
||||
acc
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(max_xid) = max_mbr_xid {
|
||||
if self.checkpoint.update_next_xid(max_xid) {
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
if self.checkpoint.update_next_xid(max_mbr_xid) {
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
|
||||
use crate::{
|
||||
auth::password_hack::parse_endpoint_param, context::RequestMonitoring, error::UserFacingError,
|
||||
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, serverless::SERVERLESS_DRIVER_SNI,
|
||||
EndpointId, RoleName,
|
||||
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, EndpointId, RoleName,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
@@ -55,10 +54,10 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn endpoint_sni(
|
||||
sni: &str,
|
||||
pub fn endpoint_sni<'a>(
|
||||
sni: &'a str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
) -> Result<&'a str, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
@@ -67,10 +66,7 @@ pub fn endpoint_sni(
|
||||
cn: common_name.into(),
|
||||
});
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
Ok(subdomain)
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -89,6 +85,7 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
// record the values if we have them
|
||||
ctx.set_application(params.get("application_name").map(SmolStr::from));
|
||||
ctx.set_user(user.clone());
|
||||
ctx.set_endpoint_id(sni.map(EndpointId::from));
|
||||
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let endpoint_option = params
|
||||
@@ -106,7 +103,7 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
Some(EndpointId::from(endpoint_sni(sni_str, cn)?))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -120,13 +117,12 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
Some(Err(InconsistentProjectNames { domain, option }))
|
||||
}
|
||||
// Invariant: project name may not contain certain characters.
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
|
||||
false => Err(MalformedProjectName(name)),
|
||||
true => Ok(name),
|
||||
}),
|
||||
}
|
||||
.transpose()?;
|
||||
ctx.set_endpoint_id(endpoint.clone());
|
||||
|
||||
info!(%user, project = endpoint.as_deref(), "credentials");
|
||||
if sni.is_some() {
|
||||
|
||||
@@ -91,11 +91,6 @@ impl RequestMonitoring {
|
||||
|
||||
pub fn set_endpoint_id(&mut self, endpoint_id: Option<EndpointId>) {
|
||||
self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
|
||||
if let Some(ep) = &self.endpoint_id {
|
||||
crate::metrics::CONNECTING_ENDPOINTS
|
||||
.with_label_values(&[self.protocol])
|
||||
.measure(&ep);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_application(&mut self, app: Option<SmolStr>) {
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use ::metrics::{
|
||||
exponential_buckets, register_histogram, register_histogram_vec, register_hll_vec,
|
||||
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge_vec, Histogram,
|
||||
HistogramVec, HyperLogLogVec, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec,
|
||||
IntCounterPairVec, IntCounterVec,
|
||||
};
|
||||
use prometheus::{
|
||||
register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec,
|
||||
IntGaugeVec,
|
||||
};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -233,13 +236,3 @@ pub const fn bool_to_str(x: bool) -> &'static str {
|
||||
"false"
|
||||
}
|
||||
}
|
||||
|
||||
pub static CONNECTING_ENDPOINTS: Lazy<HyperLogLogVec<32>> = Lazy::new(|| {
|
||||
register_hll_vec!(
|
||||
32,
|
||||
"proxy_connecting_endpoints",
|
||||
"HLL approximate cardinality of endpoints that are connecting",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
@@ -41,8 +41,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
pub const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
ws_listener: TcpListener,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::Context;
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use hyper::body::HttpBody;
|
||||
@@ -36,11 +35,11 @@ use crate::config::TlsConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::EndpointId;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
use super::SERVERLESS_DRIVER_SNI;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct QueryData {
|
||||
@@ -62,6 +61,7 @@ enum Payload {
|
||||
|
||||
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
|
||||
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
|
||||
const SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART: &str = "api";
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -188,7 +188,9 @@ fn get_conn_info(
|
||||
}
|
||||
}
|
||||
|
||||
let endpoint = endpoint_sni(hostname, &tls.common_names)?.context("malformed endpoint")?;
|
||||
let endpoint = endpoint_sni(hostname, &tls.common_names)?;
|
||||
|
||||
let endpoint: EndpointId = endpoint.into();
|
||||
ctx.set_endpoint_id(Some(endpoint.clone()));
|
||||
|
||||
let pairs = connection_url.query_pairs();
|
||||
@@ -225,7 +227,8 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
|
||||
let (_, hostname_rest) = hostname
|
||||
.split_once('.')
|
||||
.ok_or_else(|| anyhow::anyhow!("Unexpected hostname format."))?;
|
||||
Ok(sni_hostname_rest == hostname_rest && sni_hostname_first == SERVERLESS_DRIVER_SNI)
|
||||
Ok(sni_hostname_rest == hostname_rest
|
||||
&& sni_hostname_first == SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART)
|
||||
}
|
||||
|
||||
// TODO: return different http error codes
|
||||
|
||||
@@ -117,7 +117,10 @@ class NeonCompare(PgCompare):
|
||||
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||
|
||||
# Start pg
|
||||
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
|
||||
config_lines = ["max_replication_write_lag=-1", "max_replication_flush_lag=-1"]
|
||||
self._pg = self.env.endpoints.create_start(
|
||||
branch_name, "main", self.tenant, config_lines=config_lines
|
||||
)
|
||||
|
||||
@property
|
||||
def pg(self) -> PgProtocol:
|
||||
@@ -294,7 +297,7 @@ def remote_compare(zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres) ->
|
||||
return RemoteCompare(zenbenchmark, remote_pg)
|
||||
|
||||
|
||||
@pytest.fixture(params=["vanilla_compare", "neon_compare"], ids=["vanilla", "neon"])
|
||||
@pytest.fixture(params=["neon_compare"], ids=["neon"])
|
||||
def neon_with_baseline(request: FixtureRequest) -> PgCompare:
|
||||
"""Parameterized fixture that helps compare neon against vanilla postgres.
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ def init_pgbench(env: PgCompare, cmdline, password: None):
|
||||
t0 = timeit.default_timer()
|
||||
with env.record_pageserver_writes("init.pageserver_writes"):
|
||||
out = env.pg_bin.run_capture(cmdline, env=environ)
|
||||
env.flush()
|
||||
# env.flush()
|
||||
|
||||
duration = timeit.default_timer() - t0
|
||||
end_timestamp = utc_now_timestamp()
|
||||
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
|
||||
if workload_type == PgBenchLoadType.INIT:
|
||||
# Run initialize
|
||||
init_pgbench(
|
||||
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
|
||||
)
|
||||
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", "-I", "dtG", connstr], password=password)
|
||||
|
||||
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
|
||||
# Run simple-update workload
|
||||
@@ -151,7 +149,7 @@ def get_durations_matrix(default: int = 45) -> List[int]:
|
||||
return rv
|
||||
|
||||
|
||||
def get_scales_matrix(default: int = 10) -> List[int]:
|
||||
def get_scales_matrix(default: int = 100) -> List[int]:
|
||||
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
|
||||
rv = []
|
||||
for s in scales.split(","):
|
||||
@@ -172,8 +170,8 @@ def get_scales_matrix(default: int = 10) -> List[int]:
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following 3 tests run on an existing database as it was set up by previous tests,
|
||||
|
||||
@@ -203,16 +203,6 @@ def test_import_at_2bil(
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# Also create a multi-XID with members past the 2 billion mark
|
||||
conn2 = endpoint.connect()
|
||||
cur2 = conn2.cursor()
|
||||
cur.execute("INSERT INTO t VALUES ('x')")
|
||||
cur.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
|
||||
cur2.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
|
||||
cur.execute("COMMIT")
|
||||
cur2.execute("COMMIT")
|
||||
|
||||
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
|
||||
# records would have the same effect.
|
||||
cur.execute("checkpoint")
|
||||
@@ -227,4 +217,4 @@ def test_import_at_2bil(
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (10000 + 1 + 1,)
|
||||
assert cur.fetchone() == (10000 + 1,)
|
||||
|
||||
@@ -51,7 +51,7 @@ memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
num-traits = { version = "0.2", features = ["i128"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.11" }
|
||||
@@ -100,7 +100,7 @@ memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
num-traits = { version = "0.2", features = ["i128"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.11" }
|
||||
|
||||
Reference in New Issue
Block a user