diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index 0c7630edca..7e8c00c293 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -14,6 +14,7 @@ use itertools::Itertools as _;
 use pageserver_api::models::TenantState;
 use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
 use reqwest::Url;
+use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -35,12 +36,62 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 /// upload attempts.
 type RawMetric = (MetricsKey, (EventType, u64));
 
+/// The new serializable metrics format
+#[derive(Serialize, Deserialize)]
+struct NewMetricsRoot {
+    version: usize,
+    metrics: Vec<NewRawMetric>,
+}
+
+impl NewMetricsRoot {
+    pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
+        if let Some(ver) = json_value.get("version") {
+            if let Some(2) = ver.as_u64() {
+                return true;
+            }
+        }
+        false
+    }
+}
+
+/// The new serializable metrics format
+#[derive(Serialize)]
+struct NewMetricsRefRoot<'a> {
+    version: usize,
+    metrics: &'a [NewRawMetric],
+}
+
+impl<'a> NewMetricsRefRoot<'a> {
+    fn new(metrics: &'a [NewRawMetric]) -> Self {
+        Self {
+            version: 2,
+            metrics,
+        }
+    }
+}
+
+/// The new serializable metrics format
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+struct NewRawMetric {
+    key: MetricsKey,
+    kind: EventType,
+    value: u64,
+    // TODO: add generation field and check against generations
+}
+
+impl NewRawMetric {
+    #[cfg(test)]
+    fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
+        (self.key, self.clone())
+    }
+}
+
 /// Caches the [`RawMetric`]s
 ///
 /// In practice, during startup, last sent values are stored here to be used in calculating new
 /// ones. After successful uploading, the cached values are updated to cache. This used to be used
 /// for deduplication, but that is no longer needed.
-type Cache = HashMap<MetricsKey, (EventType, u64)>;
+type Cache = HashMap<MetricsKey, NewRawMetric>;
 
 pub async fn run(
     conf: &'static PageServerConf,
@@ -231,11 +282,14 @@ async fn restore_and_reschedule(
             // collect_all_metrics
            let earlier_metric_at = found_some
                .iter()
-                .map(|(_, (et, _))| et.recorded_at())
+                .map(|item| item.kind.recorded_at())
                .copied()
                .next();
 
-            let cached = found_some.into_iter().collect::<Cache>();
+            let cached = found_some
+                .into_iter()
+                .map(|item| (item.key, item))
+                .collect::<Cache>();
 
            (cached, earlier_metric_at)
        }
diff --git a/pageserver/src/consumption_metrics/disk_cache.rs b/pageserver/src/consumption_metrics/disk_cache.rs
index 387bf7a0f9..54a505a134 100644
--- a/pageserver/src/consumption_metrics/disk_cache.rs
+++ b/pageserver/src/consumption_metrics/disk_cache.rs
@@ -2,11 +2,33 @@ use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
 use std::sync::Arc;
 
-use super::RawMetric;
+use crate::consumption_metrics::NewMetricsRefRoot;
+
+use super::{NewMetricsRoot, NewRawMetric, RawMetric};
+
+pub(super) fn read_metrics_from_serde_value(
+    json_value: serde_json::Value,
+) -> anyhow::Result<Vec<NewRawMetric>> {
+    if NewMetricsRoot::is_v2_metrics(&json_value) {
+        let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
+        Ok(root.metrics)
+    } else {
+        let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
+        let all_metrics = all_metrics
+            .into_iter()
+            .map(|(key, (event_type, value))| NewRawMetric {
+                key,
+                kind: event_type,
+                value,
+            })
+            .collect();
+        Ok(all_metrics)
+    }
+}
 
 pub(super) async fn read_metrics_from_disk(
     path: Arc<Utf8PathBuf>,
-) -> anyhow::Result<Vec<RawMetric>> {
+) -> anyhow::Result<Vec<NewRawMetric>> {
     // do not add context to each error, callsite will log with full path
     let span = tracing::Span::current();
     tokio::task::spawn_blocking(move || {
@@ -20,7 +42,8 @@ pub(super) async fn read_metrics_from_disk(
         let mut file = std::fs::File::open(&*path)?;
         let reader = std::io::BufReader::new(&mut file);
 
-        anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
+        let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
+        read_metrics_from_serde_value(json_value)
     })
     .await
     .context("read metrics join error")
@@ -63,7 +86,7 @@ fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
 }
 
 pub(super) async fn flush_metrics_to_disk(
-    current_metrics: &Arc<Vec<RawMetric>>,
+    current_metrics: &Arc<Vec<NewRawMetric>>,
     path: &Arc<Utf8PathBuf>,
 ) -> anyhow::Result<()> {
     use std::io::Write;
@@ -93,8 +116,11 @@ pub(super) async fn flush_metrics_to_disk(
     // write out all of the raw metrics, to be read out later on restart as cached values
     {
         let mut writer = std::io::BufWriter::new(&mut tempfile);
-        serde_json::to_writer(&mut writer, &*current_metrics)
-            .context("serialize metrics")?;
+        serde_json::to_writer(
+            &mut writer,
+            &NewMetricsRefRoot::new(current_metrics.as_ref()),
+        )
+        .context("serialize metrics")?;
         writer
             .into_inner()
             .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs
index 7ba2d04c4f..07fac09f6f 100644
--- a/pageserver/src/consumption_metrics/metrics.rs
+++ b/pageserver/src/consumption_metrics/metrics.rs
@@ -9,7 +9,7 @@ use utils::{
     lsn::Lsn,
 };
 
-use super::{Cache, RawMetric};
+use super::{Cache, NewRawMetric};
 
 /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
 /// instead of static str.
@@ -64,11 +64,21 @@ impl MetricsKey {
 struct AbsoluteValueFactory(MetricsKey);
 
 impl AbsoluteValueFactory {
-    const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
+    #[cfg(test)]
+    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
         let key = self.0;
         (key, (EventType::Absolute { time }, val))
     }
 
+    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
+        let key = self.0;
+        NewRawMetric {
+            key,
+            kind: EventType::Absolute { time },
+            value: val,
+        }
+    }
+
     fn key(&self) -> &MetricsKey {
         &self.0
     }
@@ -84,7 +94,28 @@ impl IncrementalValueFactory {
         prev_end: DateTime<Utc>,
         up_to: DateTime<Utc>,
         val: u64,
-    ) -> RawMetric {
+    ) -> NewRawMetric {
+        let key = self.0;
+        // cannot assert prev_end < up_to because these are realtime clock based
+        let when = EventType::Incremental {
+            start_time: prev_end,
+            stop_time: up_to,
+        };
+        NewRawMetric {
+            key,
+            kind: when,
+            value: val,
+        }
+    }
+
+    #[allow(clippy::wrong_self_convention)]
+    #[cfg(test)]
+    const fn from_until_old_format(
+        self,
+        prev_end: DateTime<Utc>,
+        up_to: DateTime<Utc>,
+        val: u64,
+    ) -> super::RawMetric {
         let key = self.0;
         // cannot assert prev_end < up_to because these are realtime clock based
         let when = EventType::Incremental {
             start_time: prev_end,
             stop_time: up_to,
         };
@@ -185,7 +216,7 @@ pub(super) async fn collect_all_metrics(
     tenant_manager: &Arc<TenantManager>,
     cached_metrics: &Cache,
     ctx: &RequestContext,
-) -> Vec<RawMetric> {
+) -> Vec<NewRawMetric> {
     use pageserver_api::models::TenantState;
 
     let started_at = std::time::Instant::now();
@@ -220,11 +251,11 @@ pub(super) async fn collect_all_metrics(
     res
 }
 
-async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
+async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
 where
     S: futures::stream::Stream<Item = (TenantId, Arc<Tenant>)>,
 {
-    let mut current_metrics: Vec<RawMetric> = Vec::new();
+    let mut current_metrics: Vec<NewRawMetric> = Vec::new();
 
     let mut tenants = std::pin::pin!(tenants);
@@ -291,7 +322,7 @@ impl TenantSnapshot {
         tenant_id: TenantId,
         now: DateTime<Utc>,
         cached: &Cache,
-        metrics: &mut Vec<RawMetric>,
+        metrics: &mut Vec<NewRawMetric>,
     ) {
         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
@@ -302,9 +333,9 @@ impl TenantSnapshot {
         let mut synthetic_size = self.synthetic_size;
 
         if synthetic_size == 0 {
-            if let Some((_, value)) = cached.get(factory.key()) {
-                // use the latest value from previous session
-                synthetic_size = *value;
+            if let Some(item) = cached.get(factory.key()) {
+                // use the latest value from previous session, TODO: check generation number
+                synthetic_size = item.value;
             }
         }
@@ -381,37 +412,36 @@ impl TimelineSnapshot {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         now: DateTime<Utc>,
-        metrics: &mut Vec<RawMetric>,
+        metrics: &mut Vec<NewRawMetric>,
         cache: &Cache,
     ) {
         let timeline_written_size = u64::from(self.last_record_lsn);
 
         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
 
-        let last_stop_time = cache
-            .get(written_size_delta_key.key())
-            .map(|(until, _val)| {
-                until
-                    .incremental_timerange()
-                    .expect("never create EventType::Absolute for written_size_delta")
-                    .end
-            });
+        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
+            item.kind
+                .incremental_timerange()
+                .expect("never create EventType::Absolute for written_size_delta")
+                .end
+        });
 
-        let (key, written_size_now) =
+        let written_size_now =
             MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
 
         // by default, use the last sent written_size as the basis for
         // calculating the delta. if we don't yet have one, use the load time value.
-        let prev = cache
-            .get(&key)
-            .map(|(prev_at, prev)| {
+        let prev: (DateTime<Utc>, u64) = cache
+            .get(&written_size_now.key)
+            .map(|item| {
                 // use the prev time from our last incremental update, or default to latest
                 // absolute update on the first round.
-                let prev_at = prev_at
+                let prev_at = item
+                    .kind
                     .absolute_time()
                     .expect("never create EventType::Incremental for written_size");
                 let prev_at = last_stop_time.unwrap_or(prev_at);
-                (*prev_at, *prev)
+                (*prev_at, item.value)
             })
             .unwrap_or_else(|| {
                 // if we don't have a previous point of comparison, compare to the load time
@@ -422,24 +452,28 @@ impl TimelineSnapshot {
 
         let up_to = now;
 
-        if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
+        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
             // written_size_delta
             metrics.push(key_value);
             // written_size
-            metrics.push((key, written_size_now));
+            metrics.push(written_size_now);
         } else {
             // the cached value was ahead of us, report zero until we've caught up
             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
             // the cached value was ahead of us, report the same until we've caught up
-            metrics.push((key, (written_size_now.0, prev.1)));
+            metrics.push(NewRawMetric {
+                key: written_size_now.key,
+                kind: written_size_now.kind,
+                value: prev.1,
+            });
         }
 
         {
             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
             let current_or_previous = self
                 .current_exact_logical_size
-                .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
+                .or_else(|| cache.get(factory.key()).map(|item| item.value));
 
             if let Some(size) = current_or_previous {
                 metrics.push(factory.at(now, size));
@@ -452,4 +486,4 @@
 mod tests;
 
 #[cfg(test)]
-pub(crate) use tests::metric_examples;
+pub(crate) use tests::{metric_examples, metric_examples_old};
diff --git a/pageserver/src/consumption_metrics/metrics/tests.rs b/pageserver/src/consumption_metrics/metrics/tests.rs
index f9cbcea565..3ed7b44123 100644
--- a/pageserver/src/consumption_metrics/metrics/tests.rs
+++ b/pageserver/src/consumption_metrics/metrics/tests.rs
@@ -1,3 +1,5 @@
+use crate::consumption_metrics::RawMetric;
+
 use super::*;
 use std::collections::HashMap;
 
@@ -50,9 +52,9 @@ fn startup_collected_timeline_metrics_second_round() {
     let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
 
     let mut metrics = Vec::new();
-    let cache = HashMap::from([
-        MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
-    ]);
+    let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
+        .at(before, disk_consistent_lsn.0)
+        .to_kv_pair()]);
 
     let snap = TimelineSnapshot {
         loaded_at: (disk_consistent_lsn, init),
@@ -89,9 +91,13 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
     let mut metrics = Vec::new();
     let cache = HashMap::from([
         // at t=before was the last time the last_record_lsn changed
-        MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
+        MetricsKey::written_size(tenant_id, timeline_id)
+            .at(before, disk_consistent_lsn.0)
+            .to_kv_pair(),
         // end time of this event is used for the next ones
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until(before, just_before, 0)
+            .to_kv_pair(),
     ]);
 
     let snap = TimelineSnapshot {
@@ -138,13 +144,17 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     };
 
     let mut cache = HashMap::from([
-        MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
-            way_before,
-            before_restart,
-            // not taken into account, but the timestamps are important
-            999_999_999,
-        ),
+        MetricsKey::written_size(tenant_id, timeline_id)
+            .at(before_restart, 100)
+            .to_kv_pair(),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until(
+                way_before,
+                before_restart,
+                // not taken into account, but the timestamps are important
+                999_999_999,
+            )
+            .to_kv_pair(),
     ]);
 
     let mut metrics = Vec::new();
@@ -163,7 +173,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     );
 
     // now if we cache these metrics, and re-run while "still in recovery"
-    cache.extend(metrics.drain(..));
+    cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
 
     // "still in recovery", because our snapshot did not change
     snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
@@ -194,14 +204,14 @@ fn post_restart_current_exact_logical_size_uses_cached() {
         current_exact_logical_size: None,
     };
 
-    let cache = HashMap::from([
-        MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
-    ]);
+    let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
+        .at(before_restart, 100)
+        .to_kv_pair()]);
 
     let mut metrics = Vec::new();
     snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
 
-    metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
+    metrics.retain(|item| item.key.metric == Name::LogicalSize);
 
     assert_eq!(
         metrics,
@@ -224,7 +234,9 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
     let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
     let now = DateTime::<Utc>::from(now);
 
-    let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
+    let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id)
+        .at(before_restart, 1000)
+        .to_kv_pair()]);
 
     let mut metrics = Vec::new();
     ts.to_metrics(tenant_id, now, &cached, &mut metrics);
@@ -278,12 +290,29 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
     times
 }
 
-pub(crate) const fn metric_examples(
+pub(crate) const fn metric_examples_old(
     tenant_id: TenantId,
     timeline_id: TimelineId,
     now: DateTime<Utc>,
     before: DateTime<Utc>,
 ) -> [RawMetric; 6] {
+    [
+        MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until_old_format(before, now, 0),
+        MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
+        MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
+        MetricsKey::resident_size(tenant_id).at_old_format(now, 0),
+        MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
+    ]
+}
+
+pub(crate) const fn metric_examples(
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    now: DateTime<Utc>,
+    before: DateTime<Utc>,
+) -> [NewRawMetric; 6] {
     [
         MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
         MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs
index 1eb25d337b..1cb4e917c0 100644
--- a/pageserver/src/consumption_metrics/upload.rs
+++ b/pageserver/src/consumption_metrics/upload.rs
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
 use tokio_util::sync::CancellationToken;
 use tracing::Instrument;
 
-use super::{metrics::Name, Cache, MetricsKey, RawMetric};
+use super::{metrics::Name, Cache, MetricsKey, NewRawMetric, RawMetric};
 use utils::id::{TenantId, TimelineId};
 
 /// How the metrics from pageserver are identified.
@@ -24,7 +24,7 @@ pub(super) async fn upload_metrics_http(
     client: &reqwest::Client,
     metric_collection_endpoint: &reqwest::Url,
     cancel: &CancellationToken,
-    metrics: &[RawMetric],
+    metrics: &[NewRawMetric],
     cached_metrics: &mut Cache,
     idempotency_keys: &[IdempotencyKey<'_>],
 ) -> anyhow::Result<()> {
@@ -53,8 +53,8 @@ pub(super) async fn upload_metrics_http(
 
         match res {
             Ok(()) => {
-                for (curr_key, curr_val) in chunk {
-                    cached_metrics.insert(*curr_key, *curr_val);
+                for item in chunk {
+                    cached_metrics.insert(item.key, item.clone());
                 }
                 uploaded += chunk.len();
             }
@@ -86,7 +86,7 @@ pub(super) async fn upload_metrics_bucket(
     client: &GenericRemoteStorage,
     cancel: &CancellationToken,
     node_id: &str,
-    metrics: &[RawMetric],
+    metrics: &[NewRawMetric],
     idempotency_keys: &[IdempotencyKey<'_>],
 ) -> anyhow::Result<()> {
     if metrics.is_empty() {
@@ -140,16 +140,16 @@ pub(super) async fn upload_metrics_bucket(
 /// across different metrics sinks), and must have the same length as input.
 fn serialize_in_chunks<'a>(
     chunk_size: usize,
-    input: &'a [RawMetric],
+    input: &'a [NewRawMetric],
     idempotency_keys: &'a [IdempotencyKey<'a>],
-) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
+) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
 {
     use bytes::BufMut;
 
     assert_eq!(input.len(), idempotency_keys.len());
 
     struct Iter<'a> {
-        inner: std::slice::Chunks<'a, RawMetric>,
+        inner: std::slice::Chunks<'a, NewRawMetric>,
         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
         chunk_size: usize,
 
@@ -160,7 +160,7 @@ fn serialize_in_chunks<'a>(
     }
 
     impl<'a> Iterator for Iter<'a> {
-        type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
+        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
 
         fn next(&mut self) -> Option<Self::Item> {
             let chunk = self.inner.next()?;
@@ -269,6 +269,58 @@ impl RawMetricExt for RawMetric {
     }
 }
 
+impl RawMetricExt for NewRawMetric {
+    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.key;
+
+        let kind = self.kind;
+        let value = self.value;
+
+        Event {
+            kind,
+            metric,
+            idempotency_key: key.to_string(),
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        }
+    }
+
+    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
+        use std::fmt::Write;
+
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.key;
+
+        let kind = self.kind;
+        let value = self.value;
+
+        *event = Event {
+            kind,
+            metric,
+            idempotency_key: {
+                event.idempotency_key.clear();
+                write!(event.idempotency_key, "{key}").unwrap();
+                std::mem::take(&mut event.idempotency_key)
+            },
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        };
+    }
+}
+
 pub(crate) trait KeyGen<'a> {
     fn generate(&self) -> IdempotencyKey<'a>;
 }
@@ -381,6 +433,10 @@ async fn upload(
 
 #[cfg(test)]
 mod tests {
+    use crate::consumption_metrics::{
+        disk_cache::read_metrics_from_serde_value, NewMetricsRefRoot,
+    };
+
     use super::*;
     use chrono::{DateTime, Utc};
     use once_cell::sync::Lazy;
@@ -473,23 +529,49 @@ mod tests {
         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
         let examples = examples.into_iter().zip(metric_samples());
 
-        for ((line, expected), (key, (kind, value))) in examples {
+        for ((line, expected), item) in examples {
             let e = consumption_metrics::Event {
-                kind,
-                metric: key.metric,
+                kind: item.kind,
+                metric: item.key.metric,
                 idempotency_key: idempotency_key.to_string(),
-                value,
+                value: item.value,
                 extra: Ids {
-                    tenant_id: key.tenant_id,
-                    timeline_id: key.timeline_id,
+                    tenant_id: item.key.tenant_id,
+                    timeline_id: item.key.timeline_id,
                 },
             };
 
             let actual = serde_json::to_string(&e).unwrap();
-            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
+            assert_eq!(
+                expected, actual,
+                "example for {:?} from line {line}",
+                item.kind
+            );
         }
     }
 
-    fn metric_samples() -> [RawMetric; 6] {
+    #[test]
+    fn disk_format_upgrade() {
+        let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
+        let new_samples =
+            serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
+        let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
+        let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
+        assert_eq!(upgraded_samples, new_samples);
+    }
+
+    fn metric_samples_old() -> [RawMetric; 6] {
+        let tenant_id = TenantId::from_array([0; 16]);
+        let timeline_id = TimelineId::from_array([0xff; 16]);
+
+        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
+            .unwrap()
+            .into();
+        let [now, before] = [*SAMPLES_NOW, before];
+
+        super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
+    }
+
+    fn metric_samples() -> [NewRawMetric; 6] {
         let tenant_id = TenantId::from_array([0; 16]);
         let timeline_id = TimelineId::from_array([0xff; 16]);
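
For reference, here is a small standalone Rust sketch (not part of the patch) of the version-detection idea the new disk format relies on: v2 files wrap the metrics in a version-tagged envelope, while the old format is a bare JSON array of tuples with no version field. DemoMetric and DemoRoot are hypothetical, simplified stand-ins for NewRawMetric and NewMetricsRoot, not types from the patch.

// Standalone sketch, assuming serde (with derive) and serde_json as dependencies.
use serde::{Deserialize, Serialize};

// Simplified stand-in for NewRawMetric: string fields instead of MetricsKey/EventType.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct DemoMetric {
    key: String,
    kind: String,
    value: u64,
}

// Simplified stand-in for the version-tagged envelope (NewMetricsRoot).
#[derive(Serialize, Deserialize)]
struct DemoRoot {
    version: usize,
    metrics: Vec<DemoMetric>,
}

fn main() {
    // The v2 format serializes as {"version": 2, "metrics": [...]}.
    let root = DemoRoot {
        version: 2,
        metrics: vec![DemoMetric {
            key: "written_size".into(),
            kind: "absolute".into(),
            value: 42,
        }],
    };
    let v2_json = serde_json::to_value(&root).unwrap();

    // The v1 format was a bare array of (key, (kind, value)) tuples, so it has no
    // "version" field; peeking at that field is enough to pick the deserializer,
    // which is what read_metrics_from_serde_value does in the patch.
    let v1_json = serde_json::json!([["written_size", ["absolute", 42]]]);

    assert_eq!(v2_json.get("version").and_then(|v| v.as_u64()), Some(2));
    assert!(v1_json.get("version").is_none());
}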