diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs
index 418885a1b0..9dd9b0a0ef 100644
--- a/libs/consumption_metrics/src/lib.rs
+++ b/libs/consumption_metrics/src/lib.rs
@@ -27,7 +27,8 @@ impl EventType {
     }
 
     pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
-        // these can most likely be thought of as Range or RangeFull
+        // these can most likely be thought of as Range or RangeFull; at least the pageserver
+        // creates incremental ranges where the stop time and the next start time are equal.
        use EventType::*;
        match self {
            Incremental {
@@ -41,15 +42,25 @@ impl EventType {
     pub fn is_incremental(&self) -> bool {
         matches!(self, EventType::Incremental { .. })
     }
+
+    /// Returns the absolute time, or for incremental ranges, the stop time.
+    pub fn recorded_at(&self) -> &DateTime<Utc> {
+        use EventType::*;
+
+        match self {
+            Absolute { time } => time,
+            Incremental { stop_time, .. } => stop_time,
+        }
+    }
 }
 
 #[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
-pub struct Event<Extra> {
+pub struct Event<Extra, Metric> {
     #[serde(flatten)]
     #[serde(rename = "type")]
     pub kind: EventType,
 
-    pub metric: &'static str,
+    pub metric: Metric,
 
     pub idempotency_key: String,
 
     pub value: u64,
@@ -58,12 +69,38 @@ pub struct Event<Extra> {
 }
 
 pub fn idempotency_key(node_id: &str) -> String {
-    format!(
-        "{}-{}-{:04}",
-        Utc::now(),
-        node_id,
-        rand::thread_rng().gen_range(0..=9999)
-    )
+    IdempotencyKey::generate(node_id).to_string()
+}
+
+/// Downstream users use the idempotency key to detect upload retries.
+pub struct IdempotencyKey<'a> {
+    now: chrono::DateTime<Utc>,
+    node_id: &'a str,
+    nonce: u16,
+}
+
+impl std::fmt::Display for IdempotencyKey<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
+    }
+}
+
+impl<'a> IdempotencyKey<'a> {
+    pub fn generate(node_id: &'a str) -> Self {
+        IdempotencyKey {
+            now: Utc::now(),
+            node_id,
+            nonce: rand::thread_rng().gen_range(0..=9999),
+        }
+    }
+
+    pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
+        IdempotencyKey {
+            now,
+            node_id,
+            nonce,
+        }
+    }
+}
 
 pub const CHUNK_SIZE: usize = 1000;
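
Review note: a minimal sketch (not part of the diff) of how the now-generic `Event<Extra, Metric>` composes with the extracted `IdempotencyKey`. The `NoExtra` payload type here is hypothetical; real callers supply e.g. the pageserver's `Ids` as `Extra`, and either the new `Name` enum or a `&'static str` as `Metric`:

```rust
use chrono::{TimeZone, Utc};
use consumption_metrics::{Event, EventType, IdempotencyKey};

// Hypothetical empty extra payload; the pageserver uses `Ids` here.
#[derive(serde::Serialize)]
struct NoExtra {}

fn stable_event_json() -> String {
    let now = Utc.with_ymd_and_hms(2023, 9, 15, 0, 0, 0).unwrap();
    // `for_tests` pins time, node id and nonce, so the key is deterministic.
    let key = IdempotencyKey::for_tests(now, "1", 0);
    let event: Event<NoExtra, &'static str> = Event {
        kind: EventType::Absolute { time: now },
        metric: "written_size",
        idempotency_key: key.to_string(),
        value: 0,
        extra: NoExtra {},
    };
    serde_json::to_string(&event).unwrap()
}
```

Because `for_tests` makes the key deterministic, the serialized form can be asserted byte-for-byte, which is exactly what the new `metric_image_stability` test below does.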
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index e4284b9e9c..b24038de4a 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -9,6 +9,7 @@ use crate::tenant::{mgr, LogicalSizeCalculationCause};
 use anyhow;
 use chrono::{DateTime, Utc};
 use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
+use futures::stream::StreamExt;
 use pageserver_api::models::TenantState;
 use reqwest::Url;
 use serde::Serialize;
@@ -32,7 +33,37 @@ struct Ids {
     timeline_id: Option<TimelineId>,
 }
 
-/// Key that uniquely identifies the object, this metric describes.
+/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
+/// instead of a static str.
+// Do not rename any of these without first consulting with the data team and partner
+// management.
+// FIXME: write those tests before refactoring to this!
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
+enum Name {
+    /// Timeline last_record_lsn, absolute
+    #[serde(rename = "written_size")]
+    WrittenSize,
+    /// Timeline last_record_lsn, incremental
+    #[serde(rename = "written_data_bytes_delta")]
+    WrittenSizeDelta,
+    /// Timeline logical size
+    #[serde(rename = "timeline_logical_size")]
+    LogicalSize,
+    /// Tenant remote size
+    #[serde(rename = "remote_storage_size")]
+    RemoteSize,
+    /// Tenant resident size
+    #[serde(rename = "resident_size")]
+    ResidentSize,
+    /// Tenant synthetic size
+    #[serde(rename = "synthetic_storage_size")]
+    SyntheticSize,
+}
+
+/// Key that uniquely identifies the object this metric describes.
+///
+/// This is a denormalization done in the `MetricsKey` const methods; these should not be
+/// constructed elsewhere.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 struct MetricsKey {
     tenant_id: TenantId,
@@ -53,7 +84,7 @@ impl MetricsKey {
 struct AbsoluteValueFactory(MetricsKey);
 
 impl AbsoluteValueFactory {
-    fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
+    fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
         let key = self.0;
         (key, (EventType::Absolute { time }, val))
     }
@@ -69,7 +100,7 @@ impl IncrementalValueFactory {
         prev_end: DateTime<Utc>,
         up_to: DateTime<Utc>,
         val: u64,
-    ) -> (MetricsKey, (EventType, u64)) {
+    ) -> RawMetric {
         let key = self.0;
         // cannot assert prev_end < up_to because these are realtime clock based
         (
@@ -172,6 +203,20 @@ impl MetricsKey {
     }
 }
 
+/// Basically a key-value pair, but usually carried around in a `Vec`, except for the [`Cache`].
+///
+/// This is as opposed to `consumption_metrics::Event`, which is the externally communicated form.
+/// The difference is basically the missing idempotency key, which lives only for the duration of
+/// the upload attempts.
+type RawMetric = (MetricsKey, (EventType, u64));
+
+/// Caches the [`RawMetric`]s.
+///
+/// In practice, the last sent values are loaded here during startup to be used in calculating the
+/// new ones, and the cache is updated after each successful upload. This used to be used for
+/// deduplication, but that is no longer needed.
+type Cache = HashMap<MetricsKey, (EventType, u64)>;
+
 /// Main thread that serves metrics collection
 pub async fn collect_metrics(
     metric_collection_endpoint: &Url,
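
Since the serde renames on `Name` are the wire contract, a round-trip check makes the "do not rename" warning mechanically enforceable. A hypothetical test (not in this diff; assumes `serde_json` as a dev-dependency) could look like:

```rust
// Hypothetical guard test: each variant must serialize to exactly the
// string the downstream data pipeline expects, and parse back from it.
#[test]
fn name_round_trips_to_wire_strings() {
    let cases = [
        (Name::WrittenSize, r#""written_size""#),
        (Name::WrittenSizeDelta, r#""written_data_bytes_delta""#),
        (Name::LogicalSize, r#""timeline_logical_size""#),
        (Name::RemoteSize, r#""remote_storage_size""#),
        (Name::ResidentSize, r#""resident_size""#),
        (Name::SyntheticSize, r#""synthetic_storage_size""#),
    ];
    for (name, expected) in cases {
        assert_eq!(serde_json::to_string(&name).unwrap(), expected);
        assert_eq!(serde_json::from_str::<Name>(expected).unwrap(), name);
    }
}
```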
@@ -249,13 +294,12 @@ pub async fn collect_metrics(
 /// - refactor this function (chunking+sending part) to reuse it in proxy module;
 async fn collect_metrics_iteration(
     client: &reqwest::Client,
-    cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
+    cached_metrics: &mut Cache,
     metric_collection_endpoint: &reqwest::Url,
     node_id: NodeId,
     ctx: &RequestContext,
     send_cached: bool,
 ) {
-    let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
     trace!(
         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
         metric_collection_endpoint
@@ -270,69 +314,18 @@ async fn collect_metrics_iteration(
         }
     };
 
-    // iterate through list of Active tenants and collect metrics
-    for (tenant_id, tenant_state) in tenants {
-        if tenant_state != TenantState::Active {
-            continue;
+    let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
+        if state != TenantState::Active {
+            None
+        } else {
+            mgr::get_tenant(id, true)
+                .await
+                .ok()
+                .map(|tenant| (id, tenant))
         }
+    });
 
-        let tenant = match mgr::get_tenant(tenant_id, true).await {
-            Ok(tenant) => tenant,
-            Err(err) => {
-                // It is possible that tenant was deleted between
-                // `list_tenants` and `get_tenant`, so just warn about it.
-                warn!("failed to get tenant {tenant_id:?}: {err:?}");
-                continue;
-            }
-        };
-
-        let mut tenant_resident_size = 0;
-
-        // iterate through list of timelines in tenant
-        for timeline in tenant.list_timelines() {
-            // collect per-timeline metrics only for active timelines
-
-            let timeline_id = timeline.timeline_id;
-
-            match TimelineSnapshot::collect(&timeline, ctx) {
-                Ok(Some(snap)) => {
-                    snap.to_metrics(
-                        tenant_id,
-                        timeline_id,
-                        Utc::now(),
-                        &mut current_metrics,
-                        cached_metrics,
-                    );
-                }
-                Ok(None) => {}
-                Err(e) => {
-                    error!(
-                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
-                        timeline.timeline_id
-                    );
-                    continue;
-                }
-            }
-
-            tenant_resident_size += timeline.resident_physical_size();
-        }
-
-        current_metrics
-            .push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
-
-        current_metrics
-            .push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
-
-        // Note that this metric is calculated in a separate bgworker
-        // Here we only use cached value, which may lag behind the real latest one
-        let synthetic_size = tenant.cached_synthetic_size();
-
-        if synthetic_size != 0 {
-            // only send non-zeroes because otherwise these show up as errors in logs
-            current_metrics
-                .push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
-        }
-    }
+    let mut current_metrics = collect(tenants, cached_metrics, ctx).await;
 
     // Filter metrics, unless we want to send all metrics, including cached ones.
     // See: https://github.com/neondatabase/neon/issues/3485
@@ -360,7 +353,7 @@ async fn collect_metrics_iteration(
     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     let chunks = current_metrics.chunks(CHUNK_SIZE);
 
-    let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
+    let mut chunk_to_send: Vec<Event<Ids, Name>> = Vec::with_capacity(CHUNK_SIZE);
 
     let node_id = node_id.to_string();
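
Turning the tenant listing into a `Stream` is what lets the new `collect` function below be fed synthetic input in tests. A standalone sketch of the same `filter_map` pattern with plain values (the real stream yields `(TenantId, Arc<Tenant>)` pairs, and the real closure awaits `mgr::get_tenant`):

```rust
use futures::stream::StreamExt;

#[tokio::main]
async fn main() {
    // Stand-ins for (TenantId, TenantState) pairs; only "active" ones survive.
    let tenants = vec![(1u32, true), (2, false), (3, true)];

    let stream = futures::stream::iter(tenants)
        // The async block mirrors the diff, where the Active check is
        // followed by an awaited tenant lookup.
        .filter_map(|(id, active)| async move { active.then_some(id) });

    // The diff's `collect` instead does `std::pin::pin!` plus
    // `while let Some(..) = tenants.next().await`.
    let kept: Vec<u32> = stream.collect().await;
    assert_eq!(kept, [1, 3]);
}
```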
@@ -422,6 +415,102 @@ async fn collect_metrics_iteration(
     }
 }
 
+async fn collect<S>(
+    tenants: S,
+    cache: &HashMap<MetricsKey, (EventType, u64)>,
+    ctx: &RequestContext,
+) -> Vec<(MetricsKey, (EventType, u64))>
+where
+    S: futures::stream::Stream<Item = (TenantId, Arc<Tenant>)>,
+{
+    let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
+
+    let mut tenants = std::pin::pin!(tenants);
+
+    while let Some((tenant_id, tenant)) = tenants.next().await {
+        let mut tenant_resident_size = 0;
+
+        // iterate through list of timelines in tenant
+        for timeline in tenant.list_timelines() {
+            // collect per-timeline metrics only for active timelines
+
+            let timeline_id = timeline.timeline_id;
+
+            match TimelineSnapshot::collect(&timeline, ctx) {
+                Ok(Some(snap)) => {
+                    snap.to_metrics(
+                        tenant_id,
+                        timeline_id,
+                        Utc::now(),
+                        &mut current_metrics,
+                        cache,
+                    );
+                }
+                Ok(None) => {}
+                Err(e) => {
+                    error!(
+                        "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
+                        timeline.timeline_id
+                    );
+                    continue;
+                }
+            }
+
+            tenant_resident_size += timeline.resident_physical_size();
+        }
+
+        TenantSnapshot::collect(&tenant, tenant_resident_size).to_metrics(
+            tenant_id,
+            Utc::now(),
+            &mut current_metrics,
+        );
+    }
+
+    current_metrics
+}
+
+/// In-between abstraction that allows testing tenant metric production without actual `Tenant`s.
+struct TenantSnapshot {
+    resident_size: u64,
+    remote_size: u64,
+    synthetic_size: u64,
+}
+
+impl TenantSnapshot {
+    /// Collect tenant status to have metrics created out of it.
+    ///
+    /// `resident_size` is calculated from the timelines we had access to for the other metrics,
+    /// so we cannot just list the timelines here.
+    fn collect(t: &Arc<Tenant>, resident_size: u64) -> Self {
+        TenantSnapshot {
+            resident_size,
+            remote_size: t.remote_size(),
+            // Note that this metric is calculated in a separate bgworker.
+            // Here we only use the cached value, which may lag behind the real latest one.
+            synthetic_size: t.cached_synthetic_size(),
+        }
+    }
+
+    fn to_metrics(&self, tenant_id: TenantId, now: DateTime<Utc>, metrics: &mut Vec<RawMetric>) {
+        let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
+
+        let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
+
+        let synthetic_size = if self.synthetic_size != 0 {
+            // only send non-zeroes because otherwise these show up as errors in logs
+            Some(MetricsKey::synthetic_size(tenant_id).at(now, self.synthetic_size))
+        } else {
+            None
+        };
+
+        metrics.extend(
+            [Some(remote_size), Some(resident_size), synthetic_size]
+                .into_iter()
+                .flatten(),
+        );
+    }
+}
+
 /// Internal type to make timeline metric production testable.
 ///
 /// As this value type contains all of the information needed from a timeline to produce the
@@ -478,22 +567,13 @@ impl TimelineSnapshot {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         now: DateTime<Utc>,
-        metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
-        cache: &HashMap<MetricsKey, (EventType, u64)>,
+        metrics: &mut Vec<RawMetric>,
+        cache: &Cache,
     ) {
         let timeline_written_size = u64::from(self.last_record_lsn);
 
-        let (key, written_size_now) =
-            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
-
-        // last_record_lsn can only go up, right now at least, TODO: #2592 or related
-        // features might change this.
         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
-        // use this when available, because in a stream of incremental values, it will be
-        // accurate where as when last_record_lsn stops moving, we will only cache the last
-        // one of those.
+
         let last_stop_time = cache
             .get(written_size_delta_key.key())
             .map(|(until, _val)| {
@@ -503,6 +583,9 @@ impl TimelineSnapshot {
                     .end
             });
 
+        let (key, written_size_now) =
+            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
+
         // by default, use the last sent written_size as the basis for
         // calculating the delta. if we don't yet have one, use the load time value.
         let prev = cache
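
The reordering above keeps the `written_size_delta` bookkeeping intact: the cached event's stop time is reused as the next event's start time, so consecutive incremental events tile the timeline without gaps. A simplified, self-contained sketch of that invariant (the types here are stand-ins, not the diff's):

```rust
use chrono::{DateTime, Duration, Utc};

// Simplified stand-in for the cached (EventType::Incremental, u64) entry.
struct CachedDelta {
    stop_time: DateTime<Utc>,
}

fn next_range(
    cached: Option<&CachedDelta>,
    now: DateTime<Utc>,
    loaded_at: DateTime<Utc>,
) -> (DateTime<Utc>, DateTime<Utc>) {
    // Prefer the cached stop time; fall back to the load time, as the diff
    // does when no previous delta has been sent yet.
    let start = cached.map(|c| c.stop_time).unwrap_or(loaded_at);
    (start, now)
}

fn main() {
    let loaded_at = Utc::now() - Duration::minutes(10);

    // First event after startup: spans from load time to "now".
    let first = next_range(None, loaded_at + Duration::minutes(1), loaded_at);

    // Second event: starts exactly where the cached first one stopped.
    let cached = CachedDelta { stop_time: first.1 };
    let second = next_range(Some(&cached), loaded_at + Duration::minutes(2), loaded_at);

    assert_eq!(first.1, second.0); // ranges tile: stop == next start
}
```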
@@ -547,53 +630,47 @@
 }
 
 /// Calculate synthetic size for each active tenant
-pub async fn calculate_synthetic_size_worker(
+async fn calculate_synthetic_size_worker(
     synthetic_size_calculation_interval: Duration,
     ctx: &RequestContext,
 ) -> anyhow::Result<()> {
     info!("starting calculate_synthetic_size_worker");
+    // reminder: this ticker is ready right away
     let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
+    let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
 
     loop {
-        tokio::select! {
-            _ = task_mgr::shutdown_watcher() => {
-                return Ok(());
-            },
-            tick_at = ticker.tick() => {
+        let tick_at = tokio::select! {
+            _ = task_mgr::shutdown_watcher() => return Ok(()),
+            tick_at = ticker.tick() => tick_at,
+        };
 
-                let tenants = match mgr::list_tenants().await {
-                    Ok(tenants) => tenants,
-                    Err(e) => {
-                        warn!("cannot get tenant list: {e:#}");
-                        continue;
-                    }
-                };
-                // iterate through list of Active tenants and collect metrics
-                for (tenant_id, tenant_state) in tenants {
+        let tenants = match mgr::list_tenants().await {
+            Ok(tenants) => tenants,
+            Err(e) => {
+                warn!("cannot get tenant list: {e:#}");
+                continue;
+            }
+        };
 
-                    if tenant_state != TenantState::Active {
-                        continue;
-                    }
-
-                    if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
-                    {
-                        if let Err(e) = tenant.calculate_synthetic_size(
-                            LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
-                            ctx).await {
-                            error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
-                        }
-                    }
+        for (tenant_id, tenant_state) in tenants {
+            if tenant_state != TenantState::Active {
+                continue;
+            }
 
+            if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
+                if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
+                    error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
                 }
-
-                crate::tenant::tasks::warn_when_period_overrun(
-                    tick_at.elapsed(),
-                    synthetic_size_calculation_interval,
-                    "consumption_metrics_synthetic_size_worker",
-                );
             }
         }
+
+        crate::tenant::tasks::warn_when_period_overrun(
+            tick_at.elapsed(),
+            synthetic_size_calculation_interval,
+            "consumption_metrics_synthetic_size_worker",
+        );
     }
 }
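
The flattened loop leans on two tokio behaviors worth calling out in review: `tokio::time::interval`'s first tick resolves immediately (hence the added reminder comment), and `tokio::select!` is an expression, so the tick value can be hoisted out and the body de-indented. A standalone sketch, with a `watch` channel standing in for `task_mgr::shutdown_watcher()`:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // The first tick resolves immediately, so the loop body runs once at
    // startup and then once per period.
    let mut ticker = tokio::time::interval(Duration::from_millis(50));
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(());

    let mut iterations = 0;
    loop {
        // `select!` evaluates to the taken arm's value, which lets the loop
        // body live at one indentation level instead of inside the macro.
        let _tick_at = tokio::select! {
            _ = shutdown_rx.changed() => break,
            tick_at = ticker.tick() => tick_at,
        };

        iterations += 1;
        if iterations == 3 {
            shutdown_tx.send(()).unwrap();
        }
    }
    assert_eq!(iterations, 3);
}
```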
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#, + ), + ( + line!(), + MetricsKey::synthetic_size(tenant_id).at(now, 1), + r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#, + ), + ]; + + let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0); + + for (line, (key, (kind, value)), expected) in examples { + let e = consumption_metrics::Event { + kind, + metric: key.metric, + idempotency_key: idempotency_key.to_string(), + value, + extra: super::Ids { + tenant_id: key.tenant_id, + timeline_id: key.timeline_id, + }, + }; + let actual = serde_json::to_string(&e).unwrap(); + assert_eq!(expected, actual, "example from line {line}"); + } + } + fn time_backwards() -> [std::time::SystemTime; N] { let mut times = [std::time::SystemTime::UNIX_EPOCH; N]; times[0] = std::time::SystemTime::now(); diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index d63f902ac1..9279002eb3 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -121,7 +121,7 @@ async fn collect_metrics_iteration( let current_metrics = gather_proxy_io_bytes_per_client(); - let metrics_to_send: Vec> = current_metrics + let metrics_to_send: Vec> = current_metrics .iter() .filter_map(|(curr_key, (curr_val, curr_time))| { let mut start_time = *curr_time;