diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs
index 040b047d6f..7b133c61af 100644
--- a/libs/consumption_metrics/src/lib.rs
+++ b/libs/consumption_metrics/src/lib.rs
@@ -3,7 +3,7 @@
 //!
 use chrono::{DateTime, Utc};
 use rand::Rng;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
 #[serde(tag = "type")]
@@ -54,8 +54,8 @@ impl EventType {
     }
 }
 
-#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
-pub struct Event {
+#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
+pub struct Event {
     #[serde(flatten)]
     #[serde(rename = "type")]
     pub kind: EventType,
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index 42af40d02a..5f64bb2b3b 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -14,7 +14,7 @@
 use tracing::*;
 use utils::id::NodeId;
 
 mod metrics;
-use metrics::{Ids, MetricsKey};
+use metrics::MetricsKey;
 mod disk_cache;
 mod upload;
@@ -68,10 +68,11 @@ pub async fn collect_metrics(
         },
     );
 
-    let final_path: Arc = Arc::new(local_disk_storage);
+    let path: Arc = Arc::new(local_disk_storage);
 
     let cancel = task_mgr::shutdown_token();
-    let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
+
+    let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
 
     let mut cached_metrics = tokio::select! {
        _ = cancel.cancelled() => return Ok(()),
@@ -108,14 +109,14 @@ pub async fn collect_metrics(
         // already here, better to try to flush the new values.
 
         let flush = async {
-            match disk_cache::flush_metrics_to_disk(&metrics, &final_path).await {
+            match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
                 Ok(()) => {
                     tracing::debug!("flushed metrics to disk");
                 }
                 Err(e) => {
-                    // idea here is that if someone creates a directory as our final_path, then they
+                    // idea here is that if someone creates a directory as our path, then they
                     // might notice it from the logs before shutdown and remove it
-                    tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
+                    tracing::error!("failed to persist metrics to {path:?}: {e:#}");
                 }
             }
         };
@@ -152,12 +153,10 @@ pub async fn collect_metrics(
 ///
 /// Cancellation safe.
 async fn restore_and_reschedule(
-    final_path: &Arc,
+    path: &Arc,
     metric_collection_interval: Duration,
 ) -> Cache {
-    let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(final_path.clone())
-        .await
-    {
+    let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
         Ok(found_some) => {
             // there is no min needed because we write these sequentially in
             // collect_all_metrics
@@ -175,12 +174,11 @@ async fn restore_and_reschedule(
             use std::io::{Error, ErrorKind};
 
             let root = e.root_cause();
-
             let maybe_ioerr = root.downcast_ref::();
 
             let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
             if !is_not_found {
-                tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
+                tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
             }
 
             (HashMap::new(), None)
diff --git a/pageserver/src/consumption_metrics/disk_cache.rs b/pageserver/src/consumption_metrics/disk_cache.rs
index 1fd1338c95..4b1cd79c6d 100644
--- a/pageserver/src/consumption_metrics/disk_cache.rs
+++ b/pageserver/src/consumption_metrics/disk_cache.rs
@@ -9,6 +9,13 @@ pub(super) async fn read_metrics_from_disk(path: Arc) -> anyhow::Result
     let span = tracing::Span::current();
     tokio::task::spawn_blocking(move || {
         let _e = span.entered();
+
+        if let Some(parent) = path.parent() {
+            if let Err(e) = scan_and_delete_with_same_prefix(&path) {
+                tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
+            }
+        }
+
         let mut file = std::fs::File::open(&*path)?;
         let reader = std::io::BufReader::new(&mut file);
         anyhow::Ok(serde_json::from_reader::<_, Vec>(reader)?)
@@ -18,26 +25,68 @@ pub(super) async fn read_metrics_from_disk(path: Arc) -> anyhow::Result
     .and_then(|x| x)
 }
 
+fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
+    let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
+
+    let prefix = path.file_name().expect("caller checked").to_string_lossy();
+
+    for entry in it {
+        let entry = entry?;
+        if !entry.metadata()?.is_file() {
+            continue;
+        }
+        let file_name = entry.file_name();
+
+        if path.file_name().unwrap() == file_name {
+            // do not remove our actual file
+            continue;
+        }
+
+        let file_name = file_name.to_string_lossy();
+
+        if !file_name.starts_with(&*prefix) {
+            continue;
+        }
+
+        let path = entry.path();
+
+        if let Err(e) = std::fs::remove_file(&path) {
+            tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
+        } else {
+            tracing::info!("cleaned up old tempfile {file_name:?}");
+        }
+    }
+
+    Ok(())
+}
+
 pub(super) async fn flush_metrics_to_disk(
     current_metrics: &Arc>,
-    final_path: &Arc,
+    path: &Arc,
 ) -> anyhow::Result<()> {
     use std::io::Write;
 
+    anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
     anyhow::ensure!(
-        final_path.parent().is_some(),
-        "path must have parent: {final_path:?}"
+        path.file_name().is_some(),
+        "path must have filename: {path:?}"
     );
 
     let span = tracing::Span::current();
     tokio::task::spawn_blocking({
         let current_metrics = current_metrics.clone();
-        let final_path = final_path.clone();
+        let path = path.clone();
         move || {
             let _e = span.entered();
-            let mut tempfile =
-                tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
+            let parent = path.parent().expect("existence checked");
+            let file_name = path.file_name().expect("existence checked");
+            let mut tempfile = tempfile::Builder::new()
+                .prefix(file_name)
+                .suffix(".tmp")
+                .tempfile_in(parent)?;
+
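+            // the tempfile is named after the final file (plus a ".tmp" suffix) so that
+            // stale temporaries left behind by a crash can be matched and removed by
+            // scan_and_delete_with_same_prefix on the next startup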
+            tracing::debug!("using tempfile {:?}", tempfile.path());
 
             // write out all of the raw metrics, to be read out later on restart as cached values
             {
@@ -52,15 +101,17 @@ pub(super) async fn flush_metrics_to_disk(
             tempfile.flush()?;
             tempfile.as_file().sync_all()?;
 
-            drop(tempfile.persist(&*final_path)?);
+            fail::fail_point!("before-persist-last-metrics-collected");
 
-            let f = std::fs::File::open(final_path.parent().unwrap())?;
+            drop(tempfile.persist(&*path).map_err(|e| e.error)?);
+
+            let f = std::fs::File::open(path.parent().unwrap())?;
             f.sync_all()?;
 
             anyhow::Ok(())
         }
     })
     .await
-    .with_context(|| format!("write metrics to {final_path:?} join error"))
-    .and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
+    .with_context(|| format!("write metrics to {path:?} join error"))
+    .and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
 }
diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs
index e03d2dc34f..652dd98683 100644
--- a/pageserver/src/consumption_metrics/metrics.rs
+++ b/pageserver/src/consumption_metrics/metrics.rs
@@ -1,36 +1,21 @@
 use crate::context::RequestContext;
-use crate::tenant::mgr;
+use anyhow::Context;
 use chrono::{DateTime, Utc};
 use consumption_metrics::EventType;
 use futures::stream::StreamExt;
-use pageserver_api::models::TenantState;
-use serde::Serialize;
-use serde_with::{serde_as, DisplayFromStr};
-use std::sync::Arc;
-use std::time::SystemTime;
-use utils::id::{TenantId, TimelineId};
-use utils::lsn::Lsn;
-
-use anyhow::Context;
+use serde_with::serde_as;
+use std::{sync::Arc, time::SystemTime};
+use utils::{
+    id::{TenantId, TimelineId},
+    lsn::Lsn,
+};
 
 use super::{Cache, RawMetric};
 
-// FIXME: all other consumption_metrics::Event stuff is over at uploading, maybe move?
-#[serde_as]
-#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
-pub(super) struct Ids {
-    #[serde_as(as = "DisplayFromStr")]
-    pub(super) tenant_id: TenantId,
-    #[serde_as(as = "Option")]
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub(super) timeline_id: Option,
-}
-
 /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
 /// instead of static str.
 // Do not rename any of these without first consulting with data team and partner
 // management.
-// FIXME: write those tests before refactoring to this!
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
 pub(super) enum Name {
     /// Timeline last_record_lsn, absolute
@@ -59,7 +44,7 @@ pub(super) enum Name {
 /// elsewhere.
 #[serde_with::serde_as]
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
-pub(super) struct MetricsKey {
+pub(crate) struct MetricsKey {
     #[serde_as(as = "serde_with::DisplayFromStr")]
     pub(super) tenant_id: TenantId,
@@ -83,7 +68,7 @@ impl MetricsKey {
 struct AbsoluteValueFactory(MetricsKey);
 
 impl AbsoluteValueFactory {
-    fn at(self, time: DateTime, val: u64) -> RawMetric {
+    const fn at(self, time: DateTime, val: u64) -> RawMetric {
         let key = self.0;
         (key, (EventType::Absolute { time }, val))
     }
@@ -98,7 +83,7 @@ struct IncrementalValueFactory(MetricsKey);
 
 impl IncrementalValueFactory {
     #[allow(clippy::wrong_self_convention)]
-    fn from_previous_up_to(
+    const fn from_until(
         self,
         prev_end: DateTime,
         up_to: DateTime,
@@ -106,16 +91,11 @@ impl IncrementalValueFactory {
     ) -> RawMetric {
         let key = self.0;
         // cannot assert prev_end < up_to because these are realtime clock based
-        (
-            key,
-            (
-                EventType::Incremental {
-                    start_time: prev_end,
-                    stop_time: up_to,
-                },
-                val,
-            ),
-        )
+        let when = EventType::Incremental {
+            start_time: prev_end,
+            stop_time: up_to,
+        };
+        (key, (when, val))
     }
 
     fn key(&self) -> &MetricsKey {
@@ -209,9 +189,11 @@ pub(super) async fn collect_all_metrics(
     cached_metrics: &Cache,
     ctx: &RequestContext,
 ) -> Vec {
+    use pageserver_api::models::TenantState;
+
     let started_at = std::time::Instant::now();
 
-    let tenants = match mgr::list_tenants().await {
+    let tenants = match crate::tenant::mgr::list_tenants().await {
         Ok(tenants) => tenants,
         Err(err) => {
             tracing::error!("failed to list tenants: {:?}", err);
@@ -223,7 +205,7 @@ pub(super) async fn collect_all_metrics(
         if state != TenantState::Active {
             None
         } else {
-            mgr::get_tenant(id, true)
+            crate::tenant::mgr::get_tenant(id, true)
                 .await
                 .ok()
                 .map(|tenant| (id, tenant))
@@ -285,7 +267,7 @@ where
     current_metrics
 }
 
-/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
+/// In-between abstraction to allow testing metrics without actual Tenants.
 struct TenantSnapshot {
     resident_size: u64,
     remote_size: u64,
@@ -441,14 +423,14 @@ impl TimelineSnapshot {
             let up_to = now;
 
             if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
-                let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
+                let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
                 // written_size_delta
                 metrics.push(key_value);
                 // written_size
                 metrics.push((key, written_size_now));
             } else {
                 // the cached value was ahead of us, report zero until we've caught up
-                metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
+                metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
                 // the cached value was ahead of us, report the same until we've caught up
                 metrics.push((key, (written_size_now.0, prev.1)));
             }
@@ -468,3 +450,6 @@ impl TimelineSnapshot {
 
 #[cfg(test)]
 mod tests;
+
+#[cfg(test)]
+pub(crate) use tests::metric_examples;
diff --git a/pageserver/src/consumption_metrics/metrics/tests.rs b/pageserver/src/consumption_metrics/metrics/tests.rs
index f47bccce54..38a4c9eb5d 100644
--- a/pageserver/src/consumption_metrics/metrics/tests.rs
+++ b/pageserver/src/consumption_metrics/metrics/tests.rs
@@ -1,13 +1,7 @@
-use std::collections::HashMap;
-
-use std::time::SystemTime;
-use utils::{
-    id::{TenantId, TimelineId},
-    lsn::Lsn,
-};
-
 use super::*;
-use chrono::{DateTime, Utc};
+use std::collections::HashMap;
+use std::time::SystemTime;
+use utils::lsn::Lsn;
 
 #[test]
 fn startup_collected_timeline_metrics_before_advancing() {
@@ -33,7 +27,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
     assert_eq!(
         metrics,
         &[
-            MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
                 snap.loaded_at.1.into(),
                 now,
                 0
@@ -73,8 +67,7 @@ fn startup_collected_timeline_metrics_second_round() {
     assert_eq!(
         metrics,
         &[
-            MetricsKey::written_size_delta(tenant_id, timeline_id)
-                .from_previous_up_to(before, now, 0),
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
             MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
             MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
         ]
@@ -100,11 +93,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
         // at t=before was the last time the last_record_lsn changed
         MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
         // end time of this event is used for the next ones
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
-            before,
-            just_before,
-            0,
-        ),
+        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
     ]);
 
     let snap = TimelineSnapshot {
@@ -118,81 +107,13 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
     assert_eq!(
         metrics,
         &[
-            MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
-                just_before,
-                now,
-                0
-            ),
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
             MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
             MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
         ]
     );
 }
 
-#[test]
-fn metric_image_stability() {
-    // it is important that these strings stay as they are
-
-    let tenant_id = TenantId::from_array([0; 16]);
-    let timeline_id = TimelineId::from_array([0xff; 16]);
-
-    let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
-    let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
-
-    let [now, before] = [DateTime::::from(now), DateTime::from(before)];
-
-    let examples = [
-        (
-            line!(),
-            MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
-            r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
-        ),
-        (
-            line!(),
-            MetricsKey::written_size_delta(tenant_id, timeline_id)
-                .from_previous_up_to(before, now, 0),
-            r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
-        ),
-        (
-            line!(),
-            MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
-            r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
-        ),
-        (
-            line!(),
-            MetricsKey::remote_storage_size(tenant_id).at(now, 0),
-            r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
-        ),
-        (
-            line!(),
-            MetricsKey::resident_size(tenant_id).at(now, 0),
-            r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
-        ),
-        (
-            line!(),
-            MetricsKey::synthetic_size(tenant_id).at(now, 1),
-            r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
-        ),
-    ];
-
-    let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
-
-    for (line, (key, (kind, value)), expected) in examples {
-        let e = consumption_metrics::Event {
-            kind,
-            metric: key.metric,
-            idempotency_key: idempotency_key.to_string(),
-            value,
-            extra: Ids {
-                tenant_id: key.tenant_id,
-                timeline_id: key.timeline_id,
-            },
-        };
-        let actual = serde_json::to_string(&e).unwrap();
-        assert_eq!(expected, actual, "example from line {line}");
-    }
-}
-
 #[test]
 fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     // it can happen that we lose the inmemorylayer but have previously sent metrics and we
@@ -220,7 +141,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     let mut cache = HashMap::from([
         MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
             way_before,
             before_restart,
             // not taken into account, but the timestamps are important
@@ -234,7 +155,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     assert_eq!(
         metrics,
         &[
-            MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
                 before_restart,
                 now,
                 0
            ),
@@ -252,8 +173,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     assert_eq!(
         metrics,
         &[
-            MetricsKey::written_size_delta(tenant_id, timeline_id)
-                .from_previous_up_to(now, later, 0),
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
             MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
         ]
     );
@@ -359,3 +279,19 @@ fn time_backwards() -> [std::time::SystemTime; N] {
     times
 }
+
+pub(crate) const fn metric_examples(
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    now: DateTime,
+    before: DateTime,
+) -> [RawMetric; 6] {
+    [
+        MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
+        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
+        MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
+        MetricsKey::remote_storage_size(tenant_id).at(now, 0),
+        MetricsKey::resident_size(tenant_id).at(now, 0),
+        MetricsKey::synthetic_size(tenant_id).at(now, 1),
+    ]
+}
diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs
index 6f6b263884..d69d43a2a8 100644
--- a/pageserver/src/consumption_metrics/upload.rs
+++ b/pageserver/src/consumption_metrics/upload.rs
@@ -1,8 +1,21 @@
-use consumption_metrics::{idempotency_key, Event, EventChunk, CHUNK_SIZE};
+use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
+use serde_with::serde_as;
 use tokio_util::sync::CancellationToken;
-use tracing::*;
+use tracing::Instrument;
 
-use super::{Cache, Ids, RawMetric};
+use super::{metrics::Name, Cache, MetricsKey, RawMetric};
+use utils::id::{TenantId, TimelineId};
+
+/// How the metrics from pageserver are identified.
+#[serde_with::serde_as]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
+struct Ids {
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    pub(super) tenant_id: TenantId,
+    #[serde_as(as = "Option")]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(super) timeline_id: Option,
+}
 
 #[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
 pub(super) async fn upload_metrics(
@@ -13,44 +26,21 @@ pub(super) async fn upload_metrics(
     metrics: &[RawMetric],
     cached_metrics: &mut Cache,
 ) -> anyhow::Result<()> {
-    use bytes::BufMut;
-
     let mut uploaded = 0;
     let mut failed = 0;
 
     let started_at = std::time::Instant::now();
 
-    // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
-    let mut buffer = bytes::BytesMut::new();
-    let mut chunk_to_send = Vec::new();
+    let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
 
-    for chunk in metrics.chunks(CHUNK_SIZE) {
-        chunk_to_send.clear();
+    while let Some(res) = iter.next() {
+        let (chunk, body) = res?;
 
-        // FIXME: this should always overwrite and truncate to chunk.len()
-        chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
-            kind: *when,
-            metric: curr_key.metric,
-            // FIXME: finally write! this to the prev allocation
-            idempotency_key: idempotency_key(node_id),
-            value: *curr_val,
-            extra: Ids {
-                tenant_id: curr_key.tenant_id,
-                timeline_id: curr_key.timeline_id,
-            },
-        }));
-
-        serde_json::to_writer(
-            (&mut buffer).writer(),
-            &EventChunk {
-                events: (&chunk_to_send).into(),
-            },
-        )?;
-
-        let body = buffer.split().freeze();
         let event_bytes = body.len();
 
-        let res = upload(client, metric_collection_endpoint, body, cancel)
+        let is_last = iter.len() == 0;
+
+        let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
             .instrument(tracing::info_span!(
                 "upload",
                 %event_bytes,
@@ -88,6 +78,150 @@ pub(super) async fn upload_metrics(
     Ok(())
 }
 
+// The return type is quite ugly, but we gain testability in isolation
+fn serialize_in_chunks<'a, F>(
+    chunk_size: usize,
+    input: &'a [RawMetric],
+    factory: F,
+) -> impl ExactSizeIterator> + 'a
+where
+    F: KeyGen<'a> + 'a,
+{
+    use bytes::BufMut;
+
+    struct Iter<'a, F> {
+        inner: std::slice::Chunks<'a, RawMetric>,
+        chunk_size: usize,
+
+        // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
+        buffer: bytes::BytesMut,
+        // a chunk's worth of events is reused to produce the serialized document
+        scratch: Vec>,
+        factory: F,
+    }
+
+    impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
+        type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
+
+        fn next(&mut self) -> Option {
+            let chunk = self.inner.next()?;
+
+            if self.scratch.is_empty() {
+                // first round: create events with N strings
+                self.scratch.extend(
+                    chunk
+                        .iter()
+                        .map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
+                );
+            } else {
+                // next rounds: update_in_place to reuse allocations
+                assert_eq!(self.scratch.len(), self.chunk_size);
+                self.scratch
+                    .iter_mut()
+                    .zip(chunk.iter())
+                    .for_each(|(slot, raw_metric)| {
+                        raw_metric.update_in_place(slot, &self.factory.generate())
+                    });
+            }
+
+            let res = serde_json::to_writer(
+                (&mut self.buffer).writer(),
+                &EventChunk {
+                    events: (&self.scratch[..chunk.len()]).into(),
+                },
+            );
+
+            match res {
+                Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
+                Err(e) => Some(Err(e)),
+            }
+        }
+
+        fn size_hint(&self) -> (usize, Option) {
+            self.inner.size_hint()
+        }
+    }
+
+    impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
+
+    let buffer = bytes::BytesMut::new();
+    let inner = input.chunks(chunk_size);
+    let scratch = Vec::new();
+
+    Iter {
+        inner,
+        chunk_size,
+        buffer,
+        scratch,
+        factory,
+    }
+}
+
+trait RawMetricExt {
+    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event;
+    fn update_in_place(&self, event: &mut Event, key: &IdempotencyKey<'_>);
+}
+
+impl RawMetricExt for RawMetric {
+    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event {
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.0;
+
+        let (kind, value) = self.1;
+
+        Event {
+            kind,
+            metric,
+            idempotency_key: key.to_string(),
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        }
+    }
+
+    fn update_in_place(&self, event: &mut Event, key: &IdempotencyKey<'_>) {
+        use std::fmt::Write;
+
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.0;
+
+        let (kind, value) = self.1;
+
+        *event = Event {
+            kind,
+            metric,
+            idempotency_key: {
+                event.idempotency_key.clear();
+                write!(event.idempotency_key, "{key}").unwrap();
+                std::mem::take(&mut event.idempotency_key)
+            },
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        };
+    }
+}
+
+trait KeyGen<'a>: Copy {
+    fn generate(&self) -> IdempotencyKey<'a>;
+}
+
+impl<'a> KeyGen<'a> for &'a str {
+    fn generate(&self) -> IdempotencyKey<'a> {
+        IdempotencyKey::generate(self)
+    }
+}
+
 enum UploadError {
     Rejected(reqwest::StatusCode),
     Reqwest(reqwest::Error),
@@ -119,11 +253,16 @@ impl UploadError {
     }
 }
 
+// this is consumed by the test verifiers
+static LAST_IN_BATCH: reqwest::header::HeaderName =
+    reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
+
 async fn upload(
     client: &reqwest::Client,
     metric_collection_endpoint: &reqwest::Url,
     body: bytes::Bytes,
     cancel: &CancellationToken,
+    is_last: bool,
 ) -> Result<(), UploadError> {
     let warn_after = 3;
     let max_attempts = 10;
@@ -134,17 +273,24 @@ async fn upload(
         let res = client
             .post(metric_collection_endpoint.clone())
             .header(reqwest::header::CONTENT_TYPE, "application/json")
+            .header(
+                LAST_IN_BATCH.clone(),
+                if is_last { "true" } else { "false" },
+            )
             .body(body)
             .send()
            .await;
 
         let res = res.and_then(|res| res.error_for_status());
 
+        // 10 redirects are normally allowed, so we don't need to worry about 3xx
         match res {
             Ok(_response) => Ok(()),
             Err(e) => {
                 let status = e.status().filter(|s| s.is_client_error());
                 if let Some(status) = status {
+                    // rejection used to be a thing when the server could reject a
+                    // whole batch of metrics if one metric was bad.
                     Err(UploadError::Rejected(status))
                 } else {
                     Err(UploadError::Reqwest(e))
@@ -175,3 +321,123 @@ async fn upload(
     res
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::{DateTime, Utc};
+    use once_cell::sync::Lazy;
+
+    #[test]
+    fn chunked_serialization() {
+        let examples = metric_samples();
+        assert!(examples.len() > 1);
+
+        let factory = FixedGen::new(Utc::now(), "1", 42);
+
+        // need to use Event here because serde_json::Value uses default hashmap, not linked
+        // hashmap
+        #[derive(serde::Deserialize)]
+        struct EventChunk {
+            events: Vec>,
+        }
+
+        let correct = serialize_in_chunks(examples.len(), &examples, factory)
+            .map(|res| res.unwrap().1)
+            .flat_map(|body| serde_json::from_slice::(&body).unwrap().events)
+            .collect::>();
+
+        for chunk_size in 1..examples.len() {
+            let actual = serialize_in_chunks(chunk_size, &examples, factory)
+                .map(|res| res.unwrap().1)
+                .flat_map(|body| serde_json::from_slice::(&body).unwrap().events)
+                .collect::>();
+
+            // if these are equal, it means that multi-chunking version works as well
+            assert_eq!(correct, actual);
+        }
+    }
+
+    #[derive(Clone, Copy)]
+    struct FixedGen<'a>(chrono::DateTime, &'a str, u16);
+
+    impl<'a> FixedGen<'a> {
+        fn new(now: chrono::DateTime, node_id: &'a str, nonce: u16) -> Self {
+            FixedGen(now, node_id, nonce)
+        }
+    }
+
+    impl<'a> KeyGen<'a> for FixedGen<'a> {
+        fn generate(&self) -> IdempotencyKey<'a> {
+            IdempotencyKey::for_tests(self.0, self.1, self.2)
+        }
+    }
+
+    static SAMPLES_NOW: Lazy> = Lazy::new(|| {
+        DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
+            .unwrap()
+            .into()
+    });
+
+    #[test]
+    fn metric_image_stability() {
+        // it is important that these strings stay as they are
+
+        let examples = [
+            (
+                line!(),
+                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
+            ),
+            (
+                line!(),
+                r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
+            ),
+            (
+                line!(),
+                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
+            ),
+            (
+                line!(),
+                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
+            ),
+            (
+                line!(),
+                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
+            ),
+            (
+                line!(),
+                r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
+            ),
+        ];
+
+        let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
+        let examples = examples.into_iter().zip(metric_samples());
+
+        for ((line, expected), (key, (kind, value))) in examples {
+            let e = consumption_metrics::Event {
+                kind,
+                metric: key.metric,
+                idempotency_key: idempotency_key.to_string(),
+                value,
+                extra: Ids {
+                    tenant_id: key.tenant_id,
+                    timeline_id: key.timeline_id,
+                },
+            };
+            let actual = serde_json::to_string(&e).unwrap();
+            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
+        }
+    }
+
+    fn metric_samples() -> [RawMetric; 6] {
+        let tenant_id = TenantId::from_array([0; 16]);
+        let timeline_id = TimelineId::from_array([0xff; 16]);
+
+        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
+            .unwrap()
+            .into();
+        let [now, before] = [*SAMPLES_NOW, before];
+
+        super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
+    }
+}
diff --git a/test_runner/regress/test_pageserver_metric_collection.py b/test_runner/regress/test_pageserver_metric_collection.py
index eb9b295479..dae39d2752 100644
--- a/test_runner/regress/test_pageserver_metric_collection.py
+++ b/test_runner/regress/test_pageserver_metric_collection.py
@@ -1,5 +1,7 @@
 import json
 import time
+from dataclasses import dataclass
+from pathlib import Path
 from queue import SimpleQueue
 from typing import Any, Dict, Set
@@ -28,6 +30,7 @@ def test_metric_collection(
     (host, port) = httpserver_listen_address
     metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
 
+    # this should be Union[str, Tuple[List[Any], bool]], but that would make unpacking much more verbose
     uploads: SimpleQueue[Any] = SimpleQueue()
 
     def metrics_handler(request: Request) -> Response:
@@ -35,7 +38,9 @@ def test_metric_collection(
             return Response(status=400)
 
         events = request.json["events"]
-        uploads.put(events)
+        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
+        assert is_last in ["true", "false"]
+        uploads.put((events, is_last == "true"))
         return Response(status=200)
 
     # Require collecting metrics frequently, since we change
@@ -43,15 +48,12 @@ def test_metric_collection(
     # the timeline and want something to be logged about it.
     #
     # Disable time-based pitr, we will use the manual GC calls
     # to trigger remote storage operations in a controlled way
-    neon_env_builder.pageserver_config_override = (
-        f"""
+    neon_env_builder.pageserver_config_override = f"""
         metric_collection_interval="1s"
         metric_collection_endpoint="{metric_collection_endpoint}"
         cached_metric_collection_interval="0s"
         synthetic_size_calculation_interval="3s"
-    """
-        + "tenant_config={pitr_interval = '0 sec'}"
-    )
+    """
 
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -63,7 +65,7 @@ def test_metric_collection(
     )
 
     # spin up neon, after http server is ready
-    env = neon_env_builder.init_start()
+    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
 
     # httpserver is shut down before pageserver during passing run
     env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
     tenant_id = env.initial_tenant
@@ -124,19 +126,20 @@ def test_metric_collection(
         events = uploads.get(timeout=timeout)
 
         if events == "ready":
-            events = uploads.get(timeout=timeout)
-            v.ingest(events)
+            (events, is_last) = uploads.get(timeout=timeout)
+            v.ingest(events, is_last)
             break
         else:
-            v.ingest(events)
+            (events, is_last) = events
+            v.ingest(events, is_last)
 
     if "synthetic_storage_size" not in v.accepted_event_names():
         log.info("waiting for synthetic storage size to be calculated and uploaded...")
 
         rounds = 0
         while "synthetic_storage_size" not in v.accepted_event_names():
-            events = uploads.get(timeout=timeout)
-            v.ingest(events)
+            (events, is_last) = uploads.get(timeout=timeout)
+            v.ingest(events, is_last)
             rounds += 1
             assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
     # once we have it in verifiers, it will assert that future batches will contain it
@@ -150,17 +153,161 @@ def test_metric_collection(
         events = uploads.get(timeout=timeout)
 
         if events == "ready":
-            events = uploads.get(timeout=timeout * 3)
-            v.ingest(events)
-            events = uploads.get(timeout=timeout)
-            v.ingest(events)
+            (events, is_last) = uploads.get(timeout=timeout * 3)
+            v.ingest(events, is_last)
+            (events, is_last) = uploads.get(timeout=timeout)
+            v.ingest(events, is_last)
             break
         else:
-            v.ingest(events)
+            (events, is_last) = events
+            v.ingest(events, is_last)
 
     httpserver.check()
 
 
+def test_metric_collection_cleans_up_tempfile(
+    httpserver: HTTPServer,
+    neon_env_builder: NeonEnvBuilder,
+    httpserver_listen_address,
+):
+    (host, port) = httpserver_listen_address
+    metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
+
+    # this should be Union[str, Tuple[List[Any], bool]], but that would make unpacking much more verbose
+    uploads: SimpleQueue[Any] = SimpleQueue()
+
+    def metrics_handler(request: Request) -> Response:
+        if request.json is None:
+            return Response(status=400)
+
+        events = request.json["events"]
+        is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
+        assert is_last in ["true", "false"]
+        uploads.put((events, is_last == "true"))
+        return Response(status=200)
+
+    # Require collecting metrics frequently, since we change
+    # the timeline and want something to be logged about it.
+    #
+    # Disable time-based pitr, we will use the manual GC calls
+    # to trigger remote storage operations in a controlled way
+    neon_env_builder.pageserver_config_override = f"""
+        metric_collection_interval="1s"
+        metric_collection_endpoint="{metric_collection_endpoint}"
+        cached_metric_collection_interval="0s"
+        synthetic_size_calculation_interval="3s"
+    """
+
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+
+    # mock http server that returns OK for the metrics
+    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
+        metrics_handler
+    )
+
+    # spin up neon, after http server is ready
+    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
+    pageserver_http = env.pageserver.http_client()
+
+    # httpserver is shut down before pageserver during passing run
+    env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
+
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    cur.execute("CREATE TABLE foo (id int, counter int, t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+        SELECT g, 0, 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+        """
+    )
+
+    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+    pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+    # we expect uploads at 1Hz; on busy runners this could be too optimistic,
+    # so give 5s: we only want to get the upload that follows the "ready" value.
+    timeout = 5
+
+    # these strings in the upload queue allow synchronizing with the uploads
+    # and the main test execution
+    uploads.put("ready")
+
+    while True:
+        events = uploads.get(timeout=timeout)
+
+        if events == "ready":
+            (events, _) = uploads.get(timeout=timeout)
+            break
+
+    # should really configure an env?
+    pageserver_http.configure_failpoints(("before-persist-last-metrics-collected", "exit"))
+
+    time.sleep(3)
+
+    env.pageserver.stop()
+
+    initially = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
+
+    assert (
+        len(initially.matching) == 2
+    ), f"expecting actual file and tempfile, but not found: {initially.matching}"
+
+    uploads.put("ready")
+    env.pageserver.start()
+
+    while True:
+        events = uploads.get(timeout=timeout * 3)
+
+        if events == "ready":
+            (events, _) = uploads.get(timeout=timeout)
+            break
+
+    env.pageserver.stop()
+
+    later = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
+
+    # it is possible we shut down the pageserver right at the correct time, so the old tempfile
+    # is gone, but we also have a new one.
+    only = set(["last_consumption_metrics.json"])
+    assert (
+        initially.matching.intersection(later.matching) == only
+    ), "only the initial tempfile should have been removed"
+    assert initially.other.issuperset(later.other), "no other files should have been removed"
+
+
+@dataclass
+class PrefixPartitionedFiles:
+    matching: Set[str]
+    other: Set[str]
+
+
+def iterate_pageserver_workdir(path: Path, prefix: str) -> PrefixPartitionedFiles:
+    """
+    Iterates the files in the workdir, returns two sets:
+    - files with the prefix
+    - files without the prefix
+    """
+
+    matching = set()
+    other = set()
+    for entry in path.iterdir():
+        if not entry.is_file():
+            continue
+
+        if not entry.name.startswith(prefix):
+            other.add(entry.name)
+        else:
+            matching.add(entry.name)
+
+    return PrefixPartitionedFiles(matching, other)
+
+
 class MetricsVerifier:
     """
     A graph of per tenant per timeline verifiers, allowing one for each
@@ -171,7 +318,7 @@ class MetricsVerifier:
         self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
         pass
 
-    def ingest(self, events):
+    def ingest(self, events, is_last):
         stringified = json.dumps(events, indent=2)
         log.info(f"ingesting: {stringified}")
         for event in events:
@@ -181,8 +328,9 @@ class MetricsVerifier:
 
             self.tenants[id].ingest(event)
 
-        for t in self.tenants.values():
-            t.post_batch()
+        if is_last:
+            for t in self.tenants.values():
+                t.post_batch()
 
     def accepted_event_names(self) -> Set[str]:
         names: Set[str] = set()