mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
refactor(consumption_metrics): post-split cleanup (#5327)
Split off from #5297. Builds upon #5326. Handles original review comments which I did not move to earlier split PRs. Completes test support for verifying events by notifying of the last batch of events. Adds cleaning up of tempfiles left because of an unlucky shutdown or SIGKILL. Finally closes #5175. Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
//!
|
||||
use chrono::{DateTime, Utc};
|
||||
use rand::Rng;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -54,8 +54,8 @@ impl EventType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra, Metric: Serialize> {
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra, Metric> {
|
||||
#[serde(flatten)]
|
||||
#[serde(rename = "type")]
|
||||
pub kind: EventType,
|
||||
|
||||
@@ -14,7 +14,7 @@ use tracing::*;
|
||||
use utils::id::NodeId;
|
||||
|
||||
mod metrics;
|
||||
use metrics::{Ids, MetricsKey};
|
||||
use metrics::MetricsKey;
|
||||
mod disk_cache;
|
||||
mod upload;
|
||||
|
||||
@@ -68,10 +68,11 @@ pub async fn collect_metrics(
|
||||
},
|
||||
);
|
||||
|
||||
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
|
||||
let path: Arc<PathBuf> = Arc::new(local_disk_storage);
|
||||
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
|
||||
|
||||
let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
|
||||
|
||||
let mut cached_metrics = tokio::select! {
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
@@ -108,14 +109,14 @@ pub async fn collect_metrics(
|
||||
// already here, better to try to flush the new values.
|
||||
|
||||
let flush = async {
|
||||
match disk_cache::flush_metrics_to_disk(&metrics, &final_path).await {
|
||||
match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
|
||||
Ok(()) => {
|
||||
tracing::debug!("flushed metrics to disk");
|
||||
}
|
||||
Err(e) => {
|
||||
// idea here is that if someone creates a directory as our final_path, then they
|
||||
// idea here is that if someone creates a directory as our path, then they
|
||||
// might notice it from the logs before shutdown and remove it
|
||||
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
|
||||
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -152,12 +153,10 @@ pub async fn collect_metrics(
|
||||
///
|
||||
/// Cancellation safe.
|
||||
async fn restore_and_reschedule(
|
||||
final_path: &Arc<PathBuf>,
|
||||
path: &Arc<PathBuf>,
|
||||
metric_collection_interval: Duration,
|
||||
) -> Cache {
|
||||
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(final_path.clone())
|
||||
.await
|
||||
{
|
||||
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
|
||||
Ok(found_some) => {
|
||||
// there is no min needed because we write these sequentially in
|
||||
// collect_all_metrics
|
||||
@@ -175,12 +174,11 @@ async fn restore_and_reschedule(
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
let root = e.root_cause();
|
||||
|
||||
let maybe_ioerr = root.downcast_ref::<Error>();
|
||||
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
|
||||
|
||||
if !is_not_found {
|
||||
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
|
||||
tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
|
||||
}
|
||||
|
||||
(HashMap::new(), None)
|
||||
|
||||
@@ -9,6 +9,13 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _e = span.entered();
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
if let Err(e) = scan_and_delete_with_same_prefix(&path) {
|
||||
tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
let mut file = std::fs::File::open(&*path)?;
|
||||
let reader = std::io::BufReader::new(&mut file);
|
||||
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
|
||||
@@ -18,26 +25,68 @@ pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result
|
||||
.and_then(|x| x)
|
||||
}
|
||||
|
||||
fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
|
||||
let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
|
||||
|
||||
let prefix = path.file_name().expect("caller checked").to_string_lossy();
|
||||
|
||||
for entry in it {
|
||||
let entry = entry?;
|
||||
if !entry.metadata()?.is_file() {
|
||||
continue;
|
||||
}
|
||||
let file_name = entry.file_name();
|
||||
|
||||
if path.file_name().unwrap() == file_name {
|
||||
// do not remove our actual file
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = file_name.to_string_lossy();
|
||||
|
||||
if !file_name.starts_with(&*prefix) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
if let Err(e) = std::fs::remove_file(&path) {
|
||||
tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
|
||||
} else {
|
||||
tracing::info!("cleaned up old tempfile {file_name:?}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn flush_metrics_to_disk(
|
||||
current_metrics: &Arc<Vec<RawMetric>>,
|
||||
final_path: &Arc<PathBuf>,
|
||||
path: &Arc<PathBuf>,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::io::Write;
|
||||
|
||||
anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
|
||||
anyhow::ensure!(
|
||||
final_path.parent().is_some(),
|
||||
"path must have parent: {final_path:?}"
|
||||
path.file_name().is_some(),
|
||||
"path must have filename: {path:?}"
|
||||
);
|
||||
|
||||
let span = tracing::Span::current();
|
||||
tokio::task::spawn_blocking({
|
||||
let current_metrics = current_metrics.clone();
|
||||
let final_path = final_path.clone();
|
||||
let path = path.clone();
|
||||
move || {
|
||||
let _e = span.entered();
|
||||
|
||||
let mut tempfile =
|
||||
tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
|
||||
let parent = path.parent().expect("existence checked");
|
||||
let file_name = path.file_name().expect("existence checked");
|
||||
let mut tempfile = tempfile::Builder::new()
|
||||
.prefix(file_name)
|
||||
.suffix(".tmp")
|
||||
.tempfile_in(parent)?;
|
||||
|
||||
tracing::debug!("using tempfile {:?}", tempfile.path());
|
||||
|
||||
// write out all of the raw metrics, to be read out later on restart as cached values
|
||||
{
|
||||
@@ -52,15 +101,17 @@ pub(super) async fn flush_metrics_to_disk(
|
||||
tempfile.flush()?;
|
||||
tempfile.as_file().sync_all()?;
|
||||
|
||||
drop(tempfile.persist(&*final_path)?);
|
||||
fail::fail_point!("before-persist-last-metrics-collected");
|
||||
|
||||
let f = std::fs::File::open(final_path.parent().unwrap())?;
|
||||
drop(tempfile.persist(&*path).map_err(|e| e.error)?);
|
||||
|
||||
let f = std::fs::File::open(path.parent().unwrap())?;
|
||||
f.sync_all()?;
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
.with_context(|| format!("write metrics to {final_path:?} join error"))
|
||||
.and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
|
||||
.with_context(|| format!("write metrics to {path:?} join error"))
|
||||
.and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
|
||||
}
|
||||
|
||||
@@ -1,36 +1,21 @@
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::mgr;
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::EventType;
|
||||
use futures::stream::StreamExt;
|
||||
use pageserver_api::models::TenantState;
|
||||
use serde::Serialize;
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use anyhow::Context;
|
||||
use serde_with::serde_as;
|
||||
use std::{sync::Arc, time::SystemTime};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{Cache, RawMetric};
|
||||
|
||||
// FIXME: all other consumption_metrics::Event stuff is over at uploading, maybe move?
|
||||
#[serde_as]
|
||||
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
|
||||
pub(super) struct Ids {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) timeline_id: Option<TimelineId>,
|
||||
}
|
||||
|
||||
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
|
||||
/// instead of static str.
|
||||
// Do not rename any of these without first consulting with data team and partner
|
||||
// management.
|
||||
// FIXME: write those tests before refactoring to this!
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
pub(super) enum Name {
|
||||
/// Timeline last_record_lsn, absolute
|
||||
@@ -59,7 +44,7 @@ pub(super) enum Name {
|
||||
/// elsewhere.
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
pub(super) struct MetricsKey {
|
||||
pub(crate) struct MetricsKey {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
|
||||
@@ -83,7 +68,7 @@ impl MetricsKey {
|
||||
struct AbsoluteValueFactory(MetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
let key = self.0;
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
@@ -98,7 +83,7 @@ struct IncrementalValueFactory(MetricsKey);
|
||||
|
||||
impl IncrementalValueFactory {
|
||||
#[allow(clippy::wrong_self_convention)]
|
||||
fn from_previous_up_to(
|
||||
const fn from_until(
|
||||
self,
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
@@ -106,16 +91,11 @@ impl IncrementalValueFactory {
|
||||
) -> RawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
(
|
||||
key,
|
||||
(
|
||||
EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
},
|
||||
val,
|
||||
),
|
||||
)
|
||||
let when = EventType::Incremental {
|
||||
start_time: prev_end,
|
||||
stop_time: up_to,
|
||||
};
|
||||
(key, (when, val))
|
||||
}
|
||||
|
||||
fn key(&self) -> &MetricsKey {
|
||||
@@ -209,9 +189,11 @@ pub(super) async fn collect_all_metrics(
|
||||
cached_metrics: &Cache,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<RawMetric> {
|
||||
use pageserver_api::models::TenantState;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let tenants = match mgr::list_tenants().await {
|
||||
let tenants = match crate::tenant::mgr::list_tenants().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to list tenants: {:?}", err);
|
||||
@@ -223,7 +205,7 @@ pub(super) async fn collect_all_metrics(
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
mgr::get_tenant(id, true)
|
||||
crate::tenant::mgr::get_tenant(id, true)
|
||||
.await
|
||||
.ok()
|
||||
.map(|tenant| (id, tenant))
|
||||
@@ -285,7 +267,7 @@ where
|
||||
current_metrics
|
||||
}
|
||||
|
||||
/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
|
||||
/// In-between abstraction to allow testing metrics without actual Tenants.
|
||||
struct TenantSnapshot {
|
||||
resident_size: u64,
|
||||
remote_size: u64,
|
||||
@@ -441,14 +423,14 @@ impl TimelineSnapshot {
|
||||
let up_to = now;
|
||||
|
||||
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
|
||||
let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
|
||||
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
|
||||
// written_size_delta
|
||||
metrics.push(key_value);
|
||||
// written_size
|
||||
metrics.push((key, written_size_now));
|
||||
} else {
|
||||
// the cached value was ahead of us, report zero until we've caught up
|
||||
metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
|
||||
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
|
||||
// the cached value was ahead of us, report the same until we've caught up
|
||||
metrics.push((key, (written_size_now.0, prev.1)));
|
||||
}
|
||||
@@ -468,3 +450,6 @@ impl TimelineSnapshot {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::metric_examples;
|
||||
|
||||
@@ -1,13 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use std::time::SystemTime;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::collections::HashMap;
|
||||
use std::time::SystemTime;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[test]
|
||||
fn startup_collected_timeline_metrics_before_advancing() {
|
||||
@@ -33,7 +27,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
snap.loaded_at.1.into(),
|
||||
now,
|
||||
0
|
||||
@@ -73,8 +67,7 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(before, now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
@@ -100,11 +93,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
// at t=before was the last time the last_record_lsn changed
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
|
||||
// end time of this event is used for the next ones
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
before,
|
||||
just_before,
|
||||
0,
|
||||
),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
|
||||
]);
|
||||
|
||||
let snap = TimelineSnapshot {
|
||||
@@ -118,81 +107,13 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
just_before,
|
||||
now,
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn metric_image_stability() {
|
||||
// it is important that these strings stay as they are
|
||||
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
|
||||
|
||||
let [now, before] = [DateTime::<Utc>::from(now), DateTime::from(before)];
|
||||
|
||||
let examples = [
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(before, now, 0),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::resident_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
|
||||
|
||||
for (line, (key, (kind, value)), expected) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(expected, actual, "example from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
|
||||
@@ -220,7 +141,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
way_before,
|
||||
before_restart,
|
||||
// not taken into account, but the timestamps are important
|
||||
@@ -234,7 +155,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
|
||||
before_restart,
|
||||
now,
|
||||
0
|
||||
@@ -252,8 +173,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
assert_eq!(
|
||||
metrics,
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(now, later, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
|
||||
]
|
||||
);
|
||||
@@ -359,3 +279,19 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
|
||||
times
|
||||
}
|
||||
|
||||
pub(crate) const fn metric_examples(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
before: DateTime<Utc>,
|
||||
) -> [RawMetric; 6] {
|
||||
[
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
MetricsKey::resident_size(tenant_id).at(now, 0),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,8 +1,21 @@
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, CHUNK_SIZE};
|
||||
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
|
||||
use serde_with::serde_as;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use tracing::Instrument;
|
||||
|
||||
use super::{Cache, Ids, RawMetric};
|
||||
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// How the metrics from pageserver are identified.
|
||||
#[serde_with::serde_as]
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
struct Ids {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub(super) tenant_id: TenantId,
|
||||
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(super) timeline_id: Option<TimelineId>,
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
|
||||
pub(super) async fn upload_metrics(
|
||||
@@ -13,44 +26,21 @@ pub(super) async fn upload_metrics(
|
||||
metrics: &[RawMetric],
|
||||
cached_metrics: &mut Cache,
|
||||
) -> anyhow::Result<()> {
|
||||
use bytes::BufMut;
|
||||
|
||||
let mut uploaded = 0;
|
||||
let mut failed = 0;
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
let mut buffer = bytes::BytesMut::new();
|
||||
let mut chunk_to_send = Vec::new();
|
||||
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
|
||||
|
||||
for chunk in metrics.chunks(CHUNK_SIZE) {
|
||||
chunk_to_send.clear();
|
||||
while let Some(res) = iter.next() {
|
||||
let (chunk, body) = res?;
|
||||
|
||||
// FIXME: this should always overwrite and truncate to chunk.len()
|
||||
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
|
||||
kind: *when,
|
||||
metric: curr_key.metric,
|
||||
// FIXME: finally write! this to the prev allocation
|
||||
idempotency_key: idempotency_key(node_id),
|
||||
value: *curr_val,
|
||||
extra: Ids {
|
||||
tenant_id: curr_key.tenant_id,
|
||||
timeline_id: curr_key.timeline_id,
|
||||
},
|
||||
}));
|
||||
|
||||
serde_json::to_writer(
|
||||
(&mut buffer).writer(),
|
||||
&EventChunk {
|
||||
events: (&chunk_to_send).into(),
|
||||
},
|
||||
)?;
|
||||
|
||||
let body = buffer.split().freeze();
|
||||
let event_bytes = body.len();
|
||||
|
||||
let res = upload(client, metric_collection_endpoint, body, cancel)
|
||||
let is_last = iter.len() == 0;
|
||||
|
||||
let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
|
||||
.instrument(tracing::info_span!(
|
||||
"upload",
|
||||
%event_bytes,
|
||||
@@ -88,6 +78,150 @@ pub(super) async fn upload_metrics(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The return type is quite ugly, but we gain testability in isolation
|
||||
fn serialize_in_chunks<'a, F>(
|
||||
chunk_size: usize,
|
||||
input: &'a [RawMetric],
|
||||
factory: F,
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
where
|
||||
F: KeyGen<'a> + 'a,
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
struct Iter<'a, F> {
|
||||
inner: std::slice::Chunks<'a, RawMetric>,
|
||||
chunk_size: usize,
|
||||
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
buffer: bytes::BytesMut,
|
||||
// chunk amount of events are reused to produce the serialized document
|
||||
scratch: Vec<Event<Ids, Name>>,
|
||||
factory: F,
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
|
||||
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let chunk = self.inner.next()?;
|
||||
|
||||
if self.scratch.is_empty() {
|
||||
// first round: create events with N strings
|
||||
self.scratch.extend(
|
||||
chunk
|
||||
.iter()
|
||||
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
|
||||
);
|
||||
} else {
|
||||
// next rounds: update_in_place to reuse allocations
|
||||
assert_eq!(self.scratch.len(), self.chunk_size);
|
||||
self.scratch
|
||||
.iter_mut()
|
||||
.zip(chunk.iter())
|
||||
.for_each(|(slot, raw_metric)| {
|
||||
raw_metric.update_in_place(slot, &self.factory.generate())
|
||||
});
|
||||
}
|
||||
|
||||
let res = serde_json::to_writer(
|
||||
(&mut self.buffer).writer(),
|
||||
&EventChunk {
|
||||
events: (&self.scratch[..chunk.len()]).into(),
|
||||
},
|
||||
);
|
||||
|
||||
match res {
|
||||
Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
|
||||
Err(e) => Some(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
|
||||
|
||||
let buffer = bytes::BytesMut::new();
|
||||
let inner = input.chunks(chunk_size);
|
||||
let scratch = Vec::new();
|
||||
|
||||
Iter {
|
||||
inner,
|
||||
chunk_size,
|
||||
buffer,
|
||||
scratch,
|
||||
factory,
|
||||
}
|
||||
}
|
||||
|
||||
trait RawMetricExt {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
|
||||
}
|
||||
|
||||
impl RawMetricExt for RawMetric {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.0;
|
||||
|
||||
let (kind, value) = self.1;
|
||||
|
||||
Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
|
||||
use std::fmt::Write;
|
||||
|
||||
let MetricsKey {
|
||||
metric,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} = self.0;
|
||||
|
||||
let (kind, value) = self.1;
|
||||
|
||||
*event = Event {
|
||||
kind,
|
||||
metric,
|
||||
idempotency_key: {
|
||||
event.idempotency_key.clear();
|
||||
write!(event.idempotency_key, "{key}").unwrap();
|
||||
std::mem::take(&mut event.idempotency_key)
|
||||
},
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
trait KeyGen<'a>: Copy {
|
||||
fn generate(&self) -> IdempotencyKey<'a>;
|
||||
}
|
||||
|
||||
impl<'a> KeyGen<'a> for &'a str {
|
||||
fn generate(&self) -> IdempotencyKey<'a> {
|
||||
IdempotencyKey::generate(self)
|
||||
}
|
||||
}
|
||||
|
||||
enum UploadError {
|
||||
Rejected(reqwest::StatusCode),
|
||||
Reqwest(reqwest::Error),
|
||||
@@ -119,11 +253,16 @@ impl UploadError {
|
||||
}
|
||||
}
|
||||
|
||||
// this is consumed by the test verifiers
|
||||
static LAST_IN_BATCH: reqwest::header::HeaderName =
|
||||
reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
|
||||
|
||||
async fn upload(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
body: bytes::Bytes,
|
||||
cancel: &CancellationToken,
|
||||
is_last: bool,
|
||||
) -> Result<(), UploadError> {
|
||||
let warn_after = 3;
|
||||
let max_attempts = 10;
|
||||
@@ -134,17 +273,24 @@ async fn upload(
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.header(reqwest::header::CONTENT_TYPE, "application/json")
|
||||
.header(
|
||||
LAST_IN_BATCH.clone(),
|
||||
if is_last { "true" } else { "false" },
|
||||
)
|
||||
.body(body)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let res = res.and_then(|res| res.error_for_status());
|
||||
|
||||
// 10 redirects are normally allowed, so we don't need worry about 3xx
|
||||
match res {
|
||||
Ok(_response) => Ok(()),
|
||||
Err(e) => {
|
||||
let status = e.status().filter(|s| s.is_client_error());
|
||||
if let Some(status) = status {
|
||||
// rejection used to be a thing when the server could reject a
|
||||
// whole batch of metrics if one metric was bad.
|
||||
Err(UploadError::Rejected(status))
|
||||
} else {
|
||||
Err(UploadError::Reqwest(e))
|
||||
@@ -175,3 +321,123 @@ async fn upload(
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
#[test]
|
||||
fn chunked_serialization() {
|
||||
let examples = metric_samples();
|
||||
assert!(examples.len() > 1);
|
||||
|
||||
let factory = FixedGen::new(Utc::now(), "1", 42);
|
||||
|
||||
// need to use Event here because serde_json::Value uses default hashmap, not linked
|
||||
// hashmap
|
||||
#[derive(serde::Deserialize)]
|
||||
struct EventChunk {
|
||||
events: Vec<Event<Ids, Name>>,
|
||||
}
|
||||
|
||||
let correct = serialize_in_chunks(examples.len(), &examples, factory)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for chunk_size in 1..examples.len() {
|
||||
let actual = serialize_in_chunks(chunk_size, &examples, factory)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// if these are equal, it means that multi-chunking version works as well
|
||||
assert_eq!(correct, actual);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
|
||||
|
||||
impl<'a> FixedGen<'a> {
|
||||
fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
|
||||
FixedGen(now, node_id, nonce)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> KeyGen<'a> for FixedGen<'a> {
|
||||
fn generate(&self) -> IdempotencyKey<'a> {
|
||||
IdempotencyKey::for_tests(self.0, self.1, self.2)
|
||||
}
|
||||
}
|
||||
|
||||
static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
|
||||
DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into()
|
||||
});
|
||||
|
||||
#[test]
|
||||
fn metric_image_stability() {
|
||||
// it is important that these strings stay as they are
|
||||
|
||||
let examples = [
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
|
||||
let examples = examples.into_iter().zip(metric_samples());
|
||||
|
||||
for ((line, expected), (key, (kind, value))) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value,
|
||||
extra: Ids {
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
fn metric_samples() -> [RawMetric; 6] {
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
|
||||
.unwrap()
|
||||
.into();
|
||||
let [now, before] = [*SAMPLES_NOW, before];
|
||||
|
||||
super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, Set
|
||||
|
||||
@@ -28,6 +30,7 @@ def test_metric_collection(
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
@@ -35,7 +38,9 @@ def test_metric_collection(
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
uploads.put(events)
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
@@ -43,15 +48,12 @@ def test_metric_collection(
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"""
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
+ "tenant_config={pitr_interval = '0 sec'}"
|
||||
)
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -63,7 +65,7 @@ def test_metric_collection(
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
tenant_id = env.initial_tenant
|
||||
@@ -124,19 +126,20 @@ def test_metric_collection(
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
break
|
||||
else:
|
||||
v.ingest(events)
|
||||
(events, is_last) = events
|
||||
v.ingest(events, is_last)
|
||||
|
||||
if "synthetic_storage_size" not in v.accepted_event_names():
|
||||
log.info("waiting for synthetic storage size to be calculated and uploaded...")
|
||||
|
||||
rounds = 0
|
||||
while "synthetic_storage_size" not in v.accepted_event_names():
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
rounds += 1
|
||||
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
|
||||
# once we have it in verifiers, it will assert that future batches will contain it
|
||||
@@ -150,17 +153,161 @@ def test_metric_collection(
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
events = uploads.get(timeout=timeout * 3)
|
||||
v.ingest(events)
|
||||
events = uploads.get(timeout=timeout)
|
||||
v.ingest(events)
|
||||
(events, is_last) = uploads.get(timeout=timeout * 3)
|
||||
v.ingest(events, is_last)
|
||||
(events, is_last) = uploads.get(timeout=timeout)
|
||||
v.ingest(events, is_last)
|
||||
break
|
||||
else:
|
||||
v.ingest(events)
|
||||
(events, is_last) = events
|
||||
v.ingest(events, is_last)
|
||||
|
||||
httpserver.check()
|
||||
|
||||
|
||||
def test_metric_collection_cleans_up_tempfile(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT g, 0, 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
"""
|
||||
)
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
|
||||
# so give 5s we only want to get the following upload after "ready" value.
|
||||
timeout = 5
|
||||
|
||||
# these strings in the upload queue allow synchronizing with the uploads
|
||||
# and the main test execution
|
||||
uploads.put("ready")
|
||||
|
||||
while True:
|
||||
events = uploads.get(timeout=timeout)
|
||||
|
||||
if events == "ready":
|
||||
(events, _) = uploads.get(timeout=timeout)
|
||||
break
|
||||
|
||||
# should really configure an env?
|
||||
pageserver_http.configure_failpoints(("before-persist-last-metrics-collected", "exit"))
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
initially = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
|
||||
|
||||
assert (
|
||||
len(initially.matching) == 2
|
||||
), f"expecting actual file and tempfile, but not found: {initially.matching}"
|
||||
|
||||
uploads.put("ready")
|
||||
env.pageserver.start()
|
||||
|
||||
while True:
|
||||
events = uploads.get(timeout=timeout * 3)
|
||||
|
||||
if events == "ready":
|
||||
(events, _) = uploads.get(timeout=timeout)
|
||||
break
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
later = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
|
||||
|
||||
# it is possible we shutdown the pageserver right at the correct time, so the old tempfile
|
||||
# is gone, but we also have a new one.
|
||||
only = set(["last_consumption_metrics.json"])
|
||||
assert (
|
||||
initially.matching.intersection(later.matching) == only
|
||||
), "only initial tempfile should had been removed"
|
||||
assert initially.other.issuperset(later.other), "no other files should had been removed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PrefixPartitionedFiles:
|
||||
matching: Set[str]
|
||||
other: Set[str]
|
||||
|
||||
|
||||
def iterate_pageserver_workdir(path: Path, prefix: str) -> PrefixPartitionedFiles:
|
||||
"""
|
||||
Iterates the files in the workdir, returns two sets:
|
||||
- files with the prefix
|
||||
- files without the prefix
|
||||
"""
|
||||
|
||||
matching = set()
|
||||
other = set()
|
||||
for entry in path.iterdir():
|
||||
if not entry.is_file():
|
||||
continue
|
||||
|
||||
if not entry.name.startswith(prefix):
|
||||
other.add(entry.name)
|
||||
else:
|
||||
matching.add(entry.name)
|
||||
|
||||
return PrefixPartitionedFiles(matching, other)
|
||||
|
||||
|
||||
class MetricsVerifier:
|
||||
"""
|
||||
A graph of per tenant per timeline verifiers, allowing one for each
|
||||
@@ -171,7 +318,7 @@ class MetricsVerifier:
|
||||
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
|
||||
pass
|
||||
|
||||
def ingest(self, events):
|
||||
def ingest(self, events, is_last):
|
||||
stringified = json.dumps(events, indent=2)
|
||||
log.info(f"ingesting: {stringified}")
|
||||
for event in events:
|
||||
@@ -181,8 +328,9 @@ class MetricsVerifier:
|
||||
|
||||
self.tenants[id].ingest(event)
|
||||
|
||||
for t in self.tenants.values():
|
||||
t.post_batch()
|
||||
if is_last:
|
||||
for t in self.tenants.values():
|
||||
t.post_batch()
|
||||
|
||||
def accepted_event_names(self) -> Set[str]:
|
||||
names: Set[str] = set()
|
||||
|
||||
Reference in New Issue
Block a user