From f902777202d77396f09fdbff57ec8fd56e62f588 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Sat, 16 Sep 2023 11:24:42 +0300 Subject: [PATCH] fix: consumption metrics on restart (#5323) Write collected metrics to disk to recover previously sent metrics on restart. Recover the previously collected metrics during startup, send them over at right time - send cached synthetic size before actual is calculated - when `last_record_lsn` rolls back on startup - stay at last sent `written_size` metric - send `written_size_delta_bytes` metric as 0 Add test support: stateful verification of events in python tests. Fixes: #5206 Cc: #5175 (loggings, will be enhanced in follow-up) --- libs/consumption_metrics/src/lib.rs | 2 +- pageserver/Cargo.toml | 2 +- pageserver/src/bin/pageserver.rs | 4 + pageserver/src/consumption_metrics.rs | 681 ++++++++++++++---- test_runner/regress/test_metric_collection.py | 257 +++++-- 5 files changed, 757 insertions(+), 189 deletions(-) diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs index 9dd9b0a0ef..040b047d6f 100644 --- a/libs/consumption_metrics/src/lib.rs +++ b/libs/consumption_metrics/src/lib.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use rand::Rng; use serde::Serialize; -#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] #[serde(tag = "type")] pub enum EventType { #[serde(rename = "absolute")] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index bbdd8b1e99..9cb71dea09 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -80,11 +80,11 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +tempfile.workspace = true [dev-dependencies] criterion.workspace = true hex-literal.workspace = true -tempfile.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] } [[bench]] diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 6103f4c885..b6a2117f9c 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -518,6 +518,9 @@ fn start_pageserver( // creates a child context with the right DownloadBehavior. 
DownloadBehavior::Error, ); + + let local_disk_storage = conf.workdir.join("last_consumption_metrics.json"); + task_mgr::spawn( crate::BACKGROUND_RUNTIME.handle(), TaskKind::MetricsCollection, @@ -544,6 +547,7 @@ fn start_pageserver( conf.cached_metric_collection_interval, conf.synthetic_size_calculation_interval, conf.id, + local_disk_storage, metrics_ctx, ) .instrument(info_span!("metrics_collection")) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index a4a68581fa..065f74045a 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -14,6 +14,7 @@ use reqwest::Url; use serde::Serialize; use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio_util::sync::CancellationToken; @@ -21,10 +22,12 @@ use tracing::*; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; +use anyhow::Context; + const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60); #[serde_as] -#[derive(Serialize, Debug, Clone, Copy)] +#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)] struct Ids { #[serde_as(as = "DisplayFromStr")] tenant_id: TenantId, @@ -64,11 +67,17 @@ enum Name { /// /// This is a denormalization done at the MetricsKey const methods; these should not be constructed /// elsewhere. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[serde_with::serde_as] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] struct MetricsKey { + #[serde_as(as = "serde_with::DisplayFromStr")] tenant_id: TenantId, + + #[serde_as(as = "Option")] + #[serde(skip_serializing_if = "Option::is_none")] timeline_id: Option, - metric: &'static str, + + metric: Name, } impl MetricsKey { @@ -88,6 +97,10 @@ impl AbsoluteValueFactory { let key = self.0; (key, (EventType::Absolute { time }, val)) } + + fn key(&self) -> &MetricsKey { + &self.0 + } } /// Helper type which each individual metric kind can return to produce only incremental values. 
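// Illustrative sketch (assumed usage, not taken from the diff): how the absolute and
// incremental factories above are meant to be combined. `RawMetric` is assumed to be the
// `(MetricsKey, (EventType, u64))` pair that the collector caches, uploads and persists.
fn example_raw_metrics(tenant_id: TenantId, timeline_id: TimelineId) -> Vec<RawMetric> {
    let prev = chrono::Utc::now() - chrono::Duration::minutes(10);
    let now = chrono::Utc::now();
    vec![
        // absolute value: a point-in-time reading, e.g. bytes written so far
        MetricsKey::written_size(tenant_id, timeline_id).at(now, 42_000),
        // incremental value: a delta over a closed time range; the range starts at the
        // timestamp of the previously sent value so consecutive deltas stay contiguous
        MetricsKey::written_size_delta(tenant_id, timeline_id)
            .from_previous_up_to(prev, now, 1_024),
    ]
}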
@@ -129,7 +142,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: Some(timeline_id), - metric: "written_size", + metric: Name::WrittenSize, } .absolute_values() } @@ -144,9 +157,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: Some(timeline_id), - // the name here is correctly about data not size, because that is what is wanted by - // downstream pipeline - metric: "written_data_bytes_delta", + metric: Name::WrittenSizeDelta, } .incremental_values() } @@ -161,7 +172,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: Some(timeline_id), - metric: "timeline_logical_size", + metric: Name::LogicalSize, } .absolute_values() } @@ -173,7 +184,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: None, - metric: "remote_storage_size", + metric: Name::RemoteSize, } .absolute_values() } @@ -185,7 +196,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: None, - metric: "resident_size", + metric: Name::ResidentSize, } .absolute_values() } @@ -197,7 +208,7 @@ impl MetricsKey { MetricsKey { tenant_id, timeline_id: None, - metric: "synthetic_storage_size", + metric: Name::SyntheticSize, } .absolute_values() } @@ -224,6 +235,7 @@ pub async fn collect_metrics( _cached_metric_collection_interval: Duration, synthetic_size_calculation_interval: Duration, node_id: NodeId, + local_disk_storage: PathBuf, ctx: RequestContext, ) -> anyhow::Result<()> { if _cached_metric_collection_interval != Duration::ZERO { @@ -231,8 +243,6 @@ pub async fn collect_metrics( "cached_metric_collection_interval is no longer used, please set it to zero." ) } - let mut ticker = tokio::time::interval(metric_collection_interval); - info!("starting collect_metrics"); // spin up background worker that caclulates tenant sizes let worker_ctx = @@ -252,61 +262,183 @@ pub async fn collect_metrics( }, ); + let final_path: Arc = Arc::new(local_disk_storage); + // define client here to reuse it for all requests let client = reqwest::ClientBuilder::new() .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT) .build() .expect("Failed to create http client with timeout"); - let mut cached_metrics = HashMap::new(); - loop { - tokio::select! 
{ - _ = task_mgr::shutdown_watcher() => { - info!("collect_metrics received cancellation request"); - return Ok(()); - }, - tick_at = ticker.tick() => { + let node_id = node_id.to_string(); + let cancel = task_mgr::shutdown_token(); - collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await; + let (mut cached_metrics, oldest_metric_captured_at) = + match read_metrics_from_disk(final_path.clone()).await { + Ok(found_some) => { + // there is no min needed because we write these sequentially in + // collect_all_metrics + let oldest_metric_captured_at = found_some + .iter() + .map(|(_, (et, _))| et.recorded_at()) + .copied() + .next(); - crate::tenant::tasks::warn_when_period_overrun( - tick_at.elapsed(), - metric_collection_interval, - "consumption_metrics_collect_metrics", + let cached = found_some + .into_iter() + .collect::>(); + + (cached, oldest_metric_captured_at) + } + Err(e) => { + let root = e.root_cause(); + + let maybe_ioerr = root.downcast_ref::(); + let is_not_found = + maybe_ioerr.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound); + + if !is_not_found { + tracing::info!( + "failed to read any previous metrics from {final_path:?}: {e:#}" + ); + } + + (HashMap::new(), None) + } + }; + + if let Some(oldest_metric_captured_at) = oldest_metric_captured_at { + // FIXME: chrono methods panic + let oldest_metric_captured_at: SystemTime = oldest_metric_captured_at.into(); + let now = SystemTime::now(); + let error = match now.duration_since(oldest_metric_captured_at) { + Ok(from_last_send) if from_last_send < metric_collection_interval => { + let sleep_for = metric_collection_interval - from_last_send; + + let deadline = std::time::Instant::now() + sleep_for; + + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = tokio::time::sleep_until(deadline.into()) => {}, + } + + let now = std::time::Instant::now(); + + // executor threads might be busy, add extra measurements + Some(if now < deadline { + deadline - now + } else { + now - deadline + }) + } + Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)), + Err(_) => { + tracing::warn!( + ?now, + ?oldest_metric_captured_at, + "oldest recorded metric is in future; first values will come out with inconsistent timestamps" ); + oldest_metric_captured_at.duration_since(now).ok() + } + }; + + if let Some(error) = error { + if error.as_secs() >= 60 { + tracing::info!( + error_ms = error.as_millis(), + "startup scheduling error due to restart" + ) } } } + + // reminder: ticker is ready immediatedly + let mut ticker = tokio::time::interval(metric_collection_interval); + + loop { + let tick_at = tokio::select! { + _ = cancel.cancelled() => return Ok(()), + tick_at = ticker.tick() => tick_at, + }; + + iteration( + &client, + metric_collection_endpoint, + &cancel, + &mut cached_metrics, + &node_id, + &final_path, + &ctx, + ) + .await; + + crate::tenant::tasks::warn_when_period_overrun( + tick_at.elapsed(), + metric_collection_interval, + "consumption_metrics_collect_metrics", + ); + } } -/// One iteration of metrics collection -/// -/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`. -/// Cache metrics to avoid sending the same metrics multiple times. -/// -/// This function handles all errors internally -/// and doesn't break iteration if just one tenant fails. 
-/// -/// TODO -/// - refactor this function (chunking+sending part) to reuse it in proxy module; -async fn collect_metrics_iteration( +async fn iteration( client: &reqwest::Client, - cached_metrics: &mut Cache, metric_collection_endpoint: &reqwest::Url, - node_id: NodeId, + cancel: &CancellationToken, + cached_metrics: &mut Cache, + node_id: &str, + final_path: &Arc, ctx: &RequestContext, ) { - trace!( - "starting collect_metrics_iteration. metric_collection_endpoint: {}", - metric_collection_endpoint - ); + // these are point in time, with variable "now" + let metrics = collect_all_metrics(cached_metrics, ctx).await; + + if metrics.is_empty() { + return; + } + + let metrics = Arc::new(metrics); + + let flush = async { + match flush_metrics_to_disk(&metrics, final_path).await { + Ok(()) => { + tracing::debug!("flushed metrics to disk"); + } + Err(e) => { + // idea here is that if someone creates a directory as our final_path, then they + // might notice it from the logs before shutdown and remove it + tracing::error!("failed to persist metrics to {final_path:?}: {e:#}"); + } + } + }; + + let upload = async { + let res = upload_metrics( + client, + metric_collection_endpoint, + cancel, + node_id, + &metrics, + cached_metrics, + ) + .await; + if let Err(e) = res { + // serialization error which should never happen + tracing::error!("failed to upload due to {e:#}"); + } + }; + + // let these run concurrently + let (_, _) = tokio::join!(flush, upload); +} + +async fn collect_all_metrics(cached_metrics: &Cache, ctx: &RequestContext) -> Vec { + let started_at = std::time::Instant::now(); - // get list of tenants let tenants = match mgr::list_tenants().await { Ok(tenants) => tenants, Err(err) => { error!("failed to list tenants: {:?}", err); - return; + return vec![]; } }; @@ -321,85 +453,29 @@ async fn collect_metrics_iteration( } }); - let current_metrics = collect(tenants, cached_metrics, ctx).await; + let res = collect(tenants, cached_metrics, ctx).await; - if current_metrics.is_empty() { - trace!("no new metrics to send"); - return; - } + tracing::info!( + elapsed_ms = started_at.elapsed().as_millis(), + total = res.len(), + "collected metrics" + ); - let cancel = task_mgr::shutdown_token(); - - // Send metrics. - // Split into chunks of 1000 metrics to avoid exceeding the max request size - let chunks = current_metrics.chunks(CHUNK_SIZE); - - let mut chunk_to_send: Vec> = Vec::with_capacity(CHUNK_SIZE); - - let node_id = node_id.to_string(); - - let mut buffer = bytes::BytesMut::new(); - - for chunk in chunks { - chunk_to_send.clear(); - - // enrich metrics with type,timestamp and idempotency key before sending - chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event { - kind: *when, - metric: curr_key.metric, - idempotency_key: idempotency_key(&node_id), - value: *curr_val, - extra: Ids { - tenant_id: curr_key.tenant_id, - timeline_id: curr_key.timeline_id, - }, - })); - - use bytes::BufMut; - - // FIXME: this is a new panic we did not have before. not really essential, but panics from - // this task currently restart pageserver. 
- serde_json::to_writer( - (&mut buffer).writer(), - &EventChunk { - events: (&chunk_to_send).into(), - }, - ) - .expect("serialization must not fail and bytesmut grows"); - - let body = buffer.split().freeze(); - - let res = upload(client, metric_collection_endpoint, body, &cancel).await; - - if res.is_ok() { - for (curr_key, curr_val) in chunk.iter() { - cached_metrics.insert(curr_key.clone(), *curr_val); - } - } else { - // no need to log, backoff::retry and upload have done it, just give up uploading. - } - } + res } -async fn collect( - tenants: S, - cache: &HashMap, - ctx: &RequestContext, -) -> Vec<(MetricsKey, (EventType, u64))> +async fn collect(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec where S: futures::stream::Stream)>, { - let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new(); + let mut current_metrics: Vec = Vec::new(); let mut tenants = std::pin::pin!(tenants); while let Some((tenant_id, tenant)) = tenants.next().await { let mut tenant_resident_size = 0; - // iterate through list of timelines in tenant for timeline in tenant.list_timelines() { - // collect per-timeline metrics only for active timelines - let timeline_id = timeline.timeline_id; match TimelineSnapshot::collect(&timeline, ctx) { @@ -425,16 +501,158 @@ where tenant_resident_size += timeline.resident_physical_size(); } - TenantSnapshot::collect(&tenant, tenant_resident_size).to_metrics( - tenant_id, - Utc::now(), - &mut current_metrics, - ); + let snap = TenantSnapshot::collect(&tenant, tenant_resident_size); + snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics); } current_metrics } +async fn flush_metrics_to_disk( + current_metrics: &Arc>, + final_path: &Arc, +) -> anyhow::Result<()> { + use std::io::Write; + + anyhow::ensure!( + final_path.parent().is_some(), + "path must have parent: {final_path:?}" + ); + + let span = tracing::Span::current(); + tokio::task::spawn_blocking({ + let current_metrics = current_metrics.clone(); + let final_path = final_path.clone(); + move || { + let _e = span.entered(); + + let mut tempfile = + tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?; + + // write out all of the raw metrics, to be read out later on restart as cached values + { + let mut writer = std::io::BufWriter::new(&mut tempfile); + serde_json::to_writer(&mut writer, &*current_metrics) + .context("serialize metrics")?; + writer + .into_inner() + .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?; + } + + tempfile.flush()?; + tempfile.as_file().sync_all()?; + + drop(tempfile.persist(&*final_path)?); + + let f = std::fs::File::open(final_path.parent().unwrap())?; + f.sync_all()?; + + anyhow::Ok(()) + } + }) + .await + .with_context(|| format!("write metrics to {final_path:?} join error")) + .and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}"))) +} + +async fn read_metrics_from_disk(path: Arc) -> anyhow::Result> { + // do not add context to each error, callsite will log with full path + let span = tracing::Span::current(); + tokio::task::spawn_blocking(move || { + let _e = span.entered(); + let mut file = std::fs::File::open(&*path)?; + let reader = std::io::BufReader::new(&mut file); + anyhow::Ok(serde_json::from_reader::<_, Vec>(reader)?) 
+ }) + .await + .context("read metrics join error") + .and_then(|x| x) +} + +#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))] +async fn upload_metrics( + client: &reqwest::Client, + metric_collection_endpoint: &reqwest::Url, + cancel: &CancellationToken, + node_id: &str, + metrics: &[RawMetric], + cached_metrics: &mut Cache, +) -> anyhow::Result<()> { + use bytes::BufMut; + + let mut uploaded = 0; + let mut failed = 0; + + let started_at = std::time::Instant::now(); + + // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries + let mut buffer = bytes::BytesMut::new(); + let mut chunk_to_send = Vec::new(); + + for chunk in metrics.chunks(CHUNK_SIZE) { + chunk_to_send.clear(); + + // FIXME: this should always overwrite and truncate to chunk.len() + chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event { + kind: *when, + metric: curr_key.metric, + // FIXME: finally write! this to the prev allocation + idempotency_key: idempotency_key(node_id), + value: *curr_val, + extra: Ids { + tenant_id: curr_key.tenant_id, + timeline_id: curr_key.timeline_id, + }, + })); + + serde_json::to_writer( + (&mut buffer).writer(), + &EventChunk { + events: (&chunk_to_send).into(), + }, + )?; + + let body = buffer.split().freeze(); + let event_bytes = body.len(); + + let res = upload(client, metric_collection_endpoint, body, cancel) + .instrument(tracing::info_span!( + "upload", + %event_bytes, + uploaded, + total = metrics.len(), + )) + .await; + + match res { + Ok(()) => { + for (curr_key, curr_val) in chunk { + cached_metrics.insert(*curr_key, *curr_val); + } + uploaded += chunk.len(); + } + Err(_) => { + // failure(s) have already been logged + // + // however this is an inconsistency: if we crash here, we will start with the + // values as uploaded. in practice, the rejections no longer happen. + failed += chunk.len(); + } + } + } + + let elapsed = started_at.elapsed(); + + tracing::info!( + uploaded, + failed, + elapsed_ms = elapsed.as_millis(), + "done sending metrics" + ); + + Ok(()) +} + enum UploadError { Rejected(reqwest::StatusCode), Reqwest(reqwest::Error), @@ -545,16 +763,34 @@ impl TenantSnapshot { } } - fn to_metrics(&self, tenant_id: TenantId, now: DateTime, metrics: &mut Vec) { + fn to_metrics( + &self, + tenant_id: TenantId, + now: DateTime, + cached: &Cache, + metrics: &mut Vec, + ) { let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size); let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size); - let synthetic_size = if self.synthetic_size != 0 { - // only send non-zeroes because otherwise these show up as errors in logs - Some(MetricsKey::synthetic_size(tenant_id).at(now, self.synthetic_size)) - } else { - None + let synthetic_size = { + let factory = MetricsKey::synthetic_size(tenant_id); + let mut synthetic_size = self.synthetic_size; + + if synthetic_size == 0 { + if let Some((_, value)) = cached.get(factory.key()) { + // use the latest value from previous session + synthetic_size = *value; + } + } + + if synthetic_size != 0 { + // only send non-zeroes because otherwise these show up as errors in logs + Some(factory.at(now, synthetic_size)) + } else { + None + } }; metrics.extend( @@ -585,8 +821,6 @@ impl TimelineSnapshot { t: &Arc, ctx: &RequestContext, ) -> anyhow::Result> { - use anyhow::Context; - if !t.is_active() { // no collection for broken or stopping needed, we will still keep the cached values // though at the caller. 
@@ -660,25 +894,30 @@ impl TimelineSnapshot { (DateTime::from(*loaded_at), disk_consistent_lsn.0) }); - // written_size_bytes_delta - metrics.extend( - if let Some(delta) = written_size_now.1.checked_sub(prev.1) { - let up_to = written_size_now - .0 - .absolute_time() - .expect("never create EventType::Incremental for written_size"); - let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta); - Some(key_value) - } else { - None - }, - ); + let up_to = now; - // written_size - metrics.push((key, written_size_now)); + if let Some(delta) = written_size_now.1.checked_sub(prev.1) { + let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta); + // written_size_delta + metrics.push(key_value); + // written_size + metrics.push((key, written_size_now)); + } else { + // the cached value was ahead of us, report zero until we've caught up + metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0)); + // the cached value was ahead of us, report the same until we've caught up + metrics.push((key, (written_size_now.0, prev.1))); + } - if let Some(size) = self.current_exact_logical_size { - metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size)); + { + let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id); + let current_or_previous = self + .current_exact_logical_size + .or_else(|| cache.get(factory.key()).map(|(_, val)| *val)); + + if let Some(size) = current_or_previous { + metrics.push(factory.at(now, size)); + } } } } @@ -738,9 +977,7 @@ mod tests { lsn::Lsn, }; - use crate::consumption_metrics::MetricsKey; - - use super::TimelineSnapshot; + use super::*; use chrono::{DateTime, Utc}; #[test] @@ -927,6 +1164,164 @@ mod tests { } } + #[test] + fn post_restart_written_sizes_with_rolled_back_last_record_lsn() { + // it can happen that we lose the inmemorylayer but have previously sent metrics and we + // should never go backwards + + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + + let [later, now, at_restart] = time_backwards(); + + // FIXME: tests would be so much easier if we did not need to juggle back and forth + // SystemTime and DateTime:: ... Could do the conversion only at upload time? 
+ let now = DateTime::::from(now); + let later = DateTime::::from(later); + let before_restart = at_restart - std::time::Duration::from_secs(5 * 60); + let way_before = before_restart - std::time::Duration::from_secs(10 * 60); + let before_restart = DateTime::::from(before_restart); + let way_before = DateTime::::from(way_before); + + let snap = TimelineSnapshot { + loaded_at: (Lsn(50), at_restart), + last_record_lsn: Lsn(50), + current_exact_logical_size: None, + }; + + let mut cache = HashMap::from([ + MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100), + MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to( + way_before, + before_restart, + // not taken into account, but the timestamps are important + 999_999_999, + ), + ]); + + let mut metrics = Vec::new(); + snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache); + + assert_eq!( + metrics, + &[ + MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to( + before_restart, + now, + 0 + ), + MetricsKey::written_size(tenant_id, timeline_id).at(now, 100), + ] + ); + + // now if we cache these metrics, and re-run while "still in recovery" + cache.extend(metrics.drain(..)); + + // "still in recovery", because our snapshot did not change + snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache); + + assert_eq!( + metrics, + &[ + MetricsKey::written_size_delta(tenant_id, timeline_id) + .from_previous_up_to(now, later, 0), + MetricsKey::written_size(tenant_id, timeline_id).at(later, 100), + ] + ); + } + + #[test] + fn post_restart_current_exact_logical_size_uses_cached() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + + let [now, at_restart] = time_backwards(); + + let now = DateTime::::from(now); + let before_restart = at_restart - std::time::Duration::from_secs(5 * 60); + let before_restart = DateTime::::from(before_restart); + + let snap = TimelineSnapshot { + loaded_at: (Lsn(50), at_restart), + last_record_lsn: Lsn(50), + current_exact_logical_size: None, + }; + + let cache = HashMap::from([ + MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100) + ]); + + let mut metrics = Vec::new(); + snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache); + + metrics.retain(|(key, _)| key.metric == Name::LogicalSize); + + assert_eq!( + metrics, + &[MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 100)] + ); + } + + #[test] + fn post_restart_synthetic_size_uses_cached_if_available() { + let tenant_id = TenantId::generate(); + + let ts = TenantSnapshot { + resident_size: 1000, + remote_size: 1000, + // not yet calculated + synthetic_size: 0, + }; + + let now = SystemTime::now(); + let before_restart = DateTime::::from(now - std::time::Duration::from_secs(5 * 60)); + let now = DateTime::::from(now); + + let cached = + HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]); + + let mut metrics = Vec::new(); + ts.to_metrics(tenant_id, now, &cached, &mut metrics); + + assert_eq!( + metrics, + &[ + MetricsKey::remote_storage_size(tenant_id).at(now, 1000), + MetricsKey::resident_size(tenant_id).at(now, 1000), + MetricsKey::synthetic_size(tenant_id).at(now, 1000), + ] + ); + } + + #[test] + fn post_restart_synthetic_size_is_not_sent_when_not_cached() { + let tenant_id = TenantId::generate(); + + let ts = TenantSnapshot { + resident_size: 1000, + remote_size: 1000, + // not yet calculated + synthetic_size: 0, + }; + + let now = SystemTime::now(); + let now = 
DateTime::::from(now); + + let cached = HashMap::new(); + + let mut metrics = Vec::new(); + ts.to_metrics(tenant_id, now, &cached, &mut metrics); + + assert_eq!( + metrics, + &[ + MetricsKey::remote_storage_size(tenant_id).at(now, 1000), + MetricsKey::resident_size(tenant_id).at(now, 1000), + // no synthetic size here + ] + ); + } + fn time_backwards() -> [std::time::SystemTime; N] { let mut times = [std::time::SystemTime::UNIX_EPOCH; N]; times[0] = std::time::SystemTime::now(); diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 821c2ae683..4c4c7b96fe 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -3,9 +3,11 @@ # Use mock HTTP server to receive metrics and verify that they look sane. # +import json +import time from pathlib import Path from queue import SimpleQueue -from typing import Any, Iterator, Set +from typing import Any, Dict, Iterator, Set import pytest from fixtures.log_helper import log @@ -18,6 +20,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import RemoteStorageKind +from fixtures.types import TenantId, TimelineId from pytest_httpserver import HTTPServer from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response @@ -35,8 +38,6 @@ def test_metric_collection( (host, port) = httpserver_listen_address metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events" - metric_kinds_checked: Set[str] = set([]) - uploads: SimpleQueue[Any] = SimpleQueue() def metrics_handler(request: Request) -> Response: @@ -57,6 +58,7 @@ def test_metric_collection( metric_collection_interval="1s" metric_collection_endpoint="{metric_collection_endpoint}" cached_metric_collection_interval="0s" + synthetic_size_calculation_interval="3s" """ + "tenant_config={pitr_interval = '0 sec'}" ) @@ -75,8 +77,8 @@ def test_metric_collection( # httpserver is shut down before pageserver during passing run env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*") tenant_id = env.initial_tenant - timeline_id = env.neon_cli.create_branch("test_metric_collection") - endpoint = env.endpoints.create_start("test_metric_collection") + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) pg_conn = endpoint.connect() cur = pg_conn.cursor() @@ -118,60 +120,227 @@ def test_metric_collection( # we expect uploads at 1Hz, on busy runners this could be too optimistic, # so give 5s we only want to get the following upload after "ready" value. - # later tests will be added to ensure that the timeseries are sane. 
timeout = 5 + + # these strings in the upload queue allow synchronizing with the uploads + # and the main test execution uploads.put("ready") + # note that this verifier graph should live across restarts as long as the + # cache file lives + v = MetricsVerifier() + while True: - # discard earlier than "ready" - log.info("waiting for upload") events = uploads.get(timeout=timeout) - import json if events == "ready": events = uploads.get(timeout=timeout) - httpserver.check() - httpserver.stop() - # if anything comes after this, we'll just ignore it - stringified = json.dumps(events, indent=2) - log.info(f"inspecting: {stringified}") + v.ingest(events) break else: - stringified = json.dumps(events, indent=2) - log.info(f"discarding: {stringified}") + v.ingest(events) - # verify that metrics look minimally sane - checks = { - "written_size": lambda value: value > 0, - "resident_size": lambda value: value >= 0, - "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0, - # logical size may lag behind the actual size, so allow 0 here - "timeline_logical_size": lambda value: value >= 0, - # this can also be zero, depending on when we get the value - "written_data_bytes_delta": lambda value: value >= 0, - } + if "synthetic_storage_size" not in v.accepted_event_names(): + log.info("waiting for synthetic storage size to be calculated and uploaded...") - metric_kinds_checked = set() - metric_kinds_seen = set() + rounds = 0 + while "synthetic_storage_size" not in v.accepted_event_names(): + events = uploads.get(timeout=timeout) + v.ingest(events) + rounds += 1 + assert rounds < 10, "did not get synthetic_storage_size in 10 uploads" + # once we have it in verifiers, it will assert that future batches will contain it - for event in events: - assert event["tenant_id"] == str(tenant_id) - metric_name = event["metric"] - metric_kinds_seen.add(metric_name) + env.pageserver.stop() + time.sleep(1) + uploads.put("ready") + env.pageserver.start() - check = checks.get(metric_name) - # calm down mypy - if check is not None: - value = event["value"] - log.info(f"checking {metric_name} value {value}") - assert check(value), f"{metric_name} isn't valid" - metric_kinds_checked.add(metric_name) + while True: + events = uploads.get(timeout=timeout) - expected_checks = set(checks.keys()) - assert ( - metric_kinds_checked == checks.keys() - ), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered" - assert metric_kinds_seen == metric_kinds_checked + if events == "ready": + events = uploads.get(timeout=timeout * 3) + v.ingest(events) + events = uploads.get(timeout=timeout) + v.ingest(events) + break + else: + v.ingest(events) + + httpserver.check() + + +class MetricsVerifier: + """ + A graph of per tenant per timeline verifiers, allowing one for each + metric + """ + + def __init__(self): + self.tenants: Dict[TenantId, TenantMetricsVerifier] = {} + pass + + def ingest(self, events): + stringified = json.dumps(events, indent=2) + log.info(f"ingesting: {stringified}") + for event in events: + id = TenantId(event["tenant_id"]) + if id not in self.tenants: + self.tenants[id] = TenantMetricsVerifier(id) + + self.tenants[id].ingest(event) + + for t in self.tenants.values(): + t.post_batch() + + def accepted_event_names(self) -> Set[str]: + names: Set[str] = set() + for t in self.tenants.values(): + names = names.union(t.accepted_event_names()) + return names + + +class TenantMetricsVerifier: + def __init__(self, id: TenantId): + self.id = id 
+ self.timelines: Dict[TimelineId, TimelineMetricsVerifier] = {} + self.state: Dict[str, Any] = {} + + def ingest(self, event): + assert TenantId(event["tenant_id"]) == self.id + + if "timeline_id" in event: + id = TimelineId(event["timeline_id"]) + if id not in self.timelines: + self.timelines[id] = TimelineMetricsVerifier(self.id, id) + + self.timelines[id].ingest(event) + else: + name = event["metric"] + if name not in self.state: + self.state[name] = PER_METRIC_VERIFIERS[name]() + self.state[name].ingest(event, self) + + def post_batch(self): + for v in self.state.values(): + v.post_batch(self) + + for tl in self.timelines.values(): + tl.post_batch(self) + + def accepted_event_names(self) -> Set[str]: + names = set(self.state.keys()) + for t in self.timelines.values(): + names = names.union(t.accepted_event_names()) + return names + + +class TimelineMetricsVerifier: + def __init__(self, tenant_id: TenantId, timeline_id: TimelineId): + self.id = timeline_id + self.state: Dict[str, Any] = {} + + def ingest(self, event): + name = event["metric"] + if name not in self.state: + self.state[name] = PER_METRIC_VERIFIERS[name]() + self.state[name].ingest(event, self) + + def post_batch(self, parent): + for v in self.state.values(): + v.post_batch(self) + + def accepted_event_names(self) -> Set[str]: + return set(self.state.keys()) + + +class CannotVerifyAnything: + """We can only assert types, but rust already has types, so no need.""" + + def __init__(self): + pass + + def ingest(self, event, parent): + pass + + def post_batch(self, parent): + pass + + +class WrittenDataVerifier: + def __init__(self): + self.values = [] + pass + + def ingest(self, event, parent): + self.values.append(event["value"]) + + def post_batch(self, parent): + pass + + +class WrittenDataDeltaVerifier: + def __init__(self): + self.value = None + self.sum = 0 + self.timerange = None + pass + + def ingest(self, event, parent): + assert event["type"] == "incremental" + self.value = event["value"] + self.sum += event["value"] + start = event["start_time"] + stop = event["stop_time"] + timerange = (start, stop) + if self.timerange is not None: + # this holds across restarts + assert self.timerange[1] == timerange[0], "time ranges should be continious" + self.timerange = timerange + + def post_batch(self, parent): + absolute = parent.state["written_size"] + if len(absolute.values) == 1: + # in tests this comes up as initdb execution, so we can have 0 or + # about 30MB on the first event. it is not consistent. 
+ assert self.value is not None + else: + assert self.value == absolute.values[-1] - absolute.values[-2] + # sounds like this should hold, but it will not for branches -- probably related to timing + # assert self.sum == absolute.latest + + +class SyntheticSizeVerifier: + def __init__(self): + self.prev = None + self.value = None + pass + + def ingest(self, event, parent): + assert isinstance(parent, TenantMetricsVerifier) + assert event["type"] == "absolute" + value = event["value"] + self.value = value + + def post_batch(self, parent): + if self.prev is not None: + # this is assuming no one goes and deletes the cache file + assert ( + self.value is not None + ), "after calculating first synthetic size, cached or more recent should be sent" + self.prev = self.value + self.value = None + + +PER_METRIC_VERIFIERS = { + "remote_storage_size": CannotVerifyAnything, + "resident_size": CannotVerifyAnything, + "written_size": WrittenDataVerifier, + "written_data_bytes_delta": WrittenDataDeltaVerifier, + "timeline_logical_size": CannotVerifyAnything, + "synthetic_storage_size": SyntheticSizeVerifier, +} def proxy_metrics_handler(request: Request) -> Response: