From 326189d95080d85bb1c635da3511501836839b8d Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 31 Jul 2023 22:10:19 +0300 Subject: [PATCH] consumption_metrics: send timeline_written_size_delta (#4822) We want to have timeline_written_size_delta which is defined as difference to the previously sent `timeline_written_size` from the current `timeline_written_size`. Solution is to send it. On the first round `disk_consistent_lsn` is used which is captured during `load` time. After that an incremental "event" is sent on every collection. Incremental "events" are not part of deduplication. I've added some infrastructure to allow somewhat typesafe `EventType::Absolute` and `EventType::Incremental` factories per metrics, now that we have our first `EventType::Incremental` usage. --- libs/consumption_metrics/src/lib.rs | 28 ++- pageserver/src/consumption_metrics.rs | 263 ++++++++++++++++++++------ pageserver/src/tenant/timeline.rs | 6 + 3 files changed, 243 insertions(+), 54 deletions(-) diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs index 3aac00662d..3a1b396d63 100644 --- a/libs/consumption_metrics/src/lib.rs +++ b/libs/consumption_metrics/src/lib.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use rand::Rng; use serde::Serialize; -#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] #[serde(tag = "type")] pub enum EventType { #[serde(rename = "absolute")] @@ -17,6 +17,32 @@ pub enum EventType { }, } +impl EventType { + pub fn absolute_time(&self) -> Option<&DateTime> { + use EventType::*; + match self { + Absolute { time } => Some(time), + _ => None, + } + } + + pub fn incremental_timerange(&self) -> Option>> { + // these can most likely be thought of as Range or RangeFull + use EventType::*; + match self { + Incremental { + start_time, + stop_time, + } => Some(start_time..stop_time), + _ => None, + } + } + + pub fn is_incremental(&self) -> bool { + matches!(self, EventType::Incremental { .. }) + } +} + #[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] pub struct Event { #[serde(flatten)] diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index df44300fce..45b4be470b 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::{mgr, LogicalSizeCalculationCause}; use anyhow; -use chrono::Utc; +use chrono::{DateTime, Utc}; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; use pageserver_api::models::TenantState; use reqwest::Url; @@ -18,12 +18,6 @@ use std::time::Duration; use tracing::*; use utils::id::{NodeId, TenantId, TimelineId}; -const WRITTEN_SIZE: &str = "written_size"; -const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size"; -const RESIDENT_SIZE: &str = "resident_size"; -const REMOTE_STORAGE_SIZE: &str = "remote_storage_size"; -const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size"; - const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60); #[serde_as] @@ -44,6 +38,121 @@ pub struct PageserverConsumptionMetricsKey { pub metric: &'static str, } +impl PageserverConsumptionMetricsKey { + const fn absolute_values(self) -> AbsoluteValueFactory { + AbsoluteValueFactory(self) + } + const fn incremental_values(self) -> IncrementalValueFactory { + IncrementalValueFactory(self) + } +} + +/// Helper type which each individual metric kind can return to produce only absolute values. +struct AbsoluteValueFactory(PageserverConsumptionMetricsKey); + +impl AbsoluteValueFactory { + fn now(self, val: u64) -> (PageserverConsumptionMetricsKey, (EventType, u64)) { + let key = self.0; + let time = Utc::now(); + (key, (EventType::Absolute { time }, val)) + } +} + +/// Helper type which each individual metric kind can return to produce only incremental values. +struct IncrementalValueFactory(PageserverConsumptionMetricsKey); + +impl IncrementalValueFactory { + #[allow(clippy::wrong_self_convention)] + fn from_previous_up_to( + self, + prev_end: DateTime, + up_to: DateTime, + val: u64, + ) -> (PageserverConsumptionMetricsKey, (EventType, u64)) { + let key = self.0; + // cannot assert prev_end < up_to because these are realtime clock based + ( + key, + ( + EventType::Incremental { + start_time: prev_end, + stop_time: up_to, + }, + val, + ), + ) + } + + fn key(&self) -> &PageserverConsumptionMetricsKey { + &self.0 + } +} + +// the static part of a PageserverConsumptionMetricsKey +impl PageserverConsumptionMetricsKey { + const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: Some(timeline_id), + metric: "written_size", + } + .absolute_values() + } + + /// Values will be the difference of the latest written_size (last_record_lsn) to what we + /// previously sent. + const fn written_size_delta( + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> IncrementalValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: Some(timeline_id), + metric: "written_size_bytes_delta", + } + .incremental_values() + } + + const fn timeline_logical_size( + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> AbsoluteValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: Some(timeline_id), + metric: "timeline_logical_size", + } + .absolute_values() + } + + const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: None, + metric: "remote_storage_size", + } + .absolute_values() + } + + const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: None, + metric: "resident_size", + } + .absolute_values() + } + + const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory { + PageserverConsumptionMetricsKey { + tenant_id, + timeline_id: None, + metric: "synthetic_storage_size", + } + .absolute_values() + } +} + /// Main thread that serves metrics collection pub async fn collect_metrics( metric_collection_endpoint: &Url, @@ -79,7 +188,7 @@ pub async fn collect_metrics( .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT) .build() .expect("Failed to create http client with timeout"); - let mut cached_metrics: HashMap = HashMap::new(); + let mut cached_metrics = HashMap::new(); let mut prev_iteration_time: std::time::Instant = std::time::Instant::now(); loop { @@ -121,13 +230,13 @@ pub async fn collect_metrics( /// - refactor this function (chunking+sending part) to reuse it in proxy module; pub async fn collect_metrics_iteration( client: &reqwest::Client, - cached_metrics: &mut HashMap, + cached_metrics: &mut HashMap, metric_collection_endpoint: &reqwest::Url, node_id: NodeId, ctx: &RequestContext, send_cached: bool, ) { - let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new(); + let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new(); trace!( "starting collect_metrics_iteration. metric_collection_endpoint: {}", metric_collection_endpoint @@ -166,27 +275,80 @@ pub async fn collect_metrics_iteration( if timeline.is_active() { let timeline_written_size = u64::from(timeline.get_last_record_lsn()); - current_metrics.push(( - PageserverConsumptionMetricsKey { - tenant_id, - timeline_id: Some(timeline.timeline_id), - metric: WRITTEN_SIZE, + let (key, written_size_now) = + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline.timeline_id) + .now(timeline_written_size); + + // last_record_lsn can only go up, right now at least, TODO: #2592 or related + // features might change this. + + let written_size_delta_key = PageserverConsumptionMetricsKey::written_size_delta( + tenant_id, + timeline.timeline_id, + ); + + // use this when available, because in a stream of incremental values, it will be + // accurate where as when last_record_lsn stops moving, we will only cache the last + // one of those. + let last_stop_time = + cached_metrics + .get(written_size_delta_key.key()) + .map(|(until, _val)| { + until + .incremental_timerange() + .expect("never create EventType::Absolute for written_size_delta") + .end + }); + + // by default, use the last sent written_size as the basis for + // calculating the delta. if we don't yet have one, use the load time value. + let prev = cached_metrics + .get(&key) + .map(|(prev_at, prev)| { + // use the prev time from our last incremental update, or default to latest + // absolute update on the first round. + let prev_at = prev_at + .absolute_time() + .expect("never create EventType::Incremental for written_size"); + let prev_at = last_stop_time.unwrap_or(prev_at); + (*prev_at, *prev) + }) + .unwrap_or_else(|| { + // if we don't have a previous point of comparison, compare to the load time + // lsn. + let (disk_consistent_lsn, loaded_at) = &timeline.loaded_at; + (DateTime::from(*loaded_at), disk_consistent_lsn.0) + }); + + // written_size_delta_bytes + current_metrics.extend( + if let Some(delta) = written_size_now.1.checked_sub(prev.1) { + let up_to = written_size_now + .0 + .absolute_time() + .expect("never create EventType::Incremental for written_size"); + let key_value = + written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta); + Some(key_value) + } else { + None }, - timeline_written_size, - )); + ); + + // written_size + current_metrics.push((key, written_size_now)); let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id); match span.in_scope(|| timeline.get_current_logical_size(ctx)) { // Only send timeline logical size when it is fully calculated. Ok((size, is_exact)) if is_exact => { - current_metrics.push(( - PageserverConsumptionMetricsKey { + current_metrics.push( + PageserverConsumptionMetricsKey::timeline_logical_size( tenant_id, - timeline_id: Some(timeline.timeline_id), - metric: TIMELINE_LOGICAL_SIZE, - }, - size, - )); + timeline.timeline_id, + ) + .now(size), + ); } Ok((_, _)) => {} Err(err) => { @@ -205,14 +367,10 @@ pub async fn collect_metrics_iteration( match tenant.get_remote_size().await { Ok(tenant_remote_size) => { - current_metrics.push(( - PageserverConsumptionMetricsKey { - tenant_id, - timeline_id: None, - metric: REMOTE_STORAGE_SIZE, - }, - tenant_remote_size, - )); + current_metrics.push( + PageserverConsumptionMetricsKey::remote_storage_size(tenant_id) + .now(tenant_remote_size), + ); } Err(err) => { error!( @@ -222,14 +380,9 @@ pub async fn collect_metrics_iteration( } } - current_metrics.push(( - PageserverConsumptionMetricsKey { - tenant_id, - timeline_id: None, - metric: RESIDENT_SIZE, - }, - tenant_resident_size, - )); + current_metrics.push( + PageserverConsumptionMetricsKey::resident_size(tenant_id).now(tenant_resident_size), + ); // Note that this metric is calculated in a separate bgworker // Here we only use cached value, which may lag behind the real latest one @@ -237,23 +390,27 @@ pub async fn collect_metrics_iteration( if tenant_synthetic_size != 0 { // only send non-zeroes because otherwise these show up as errors in logs - current_metrics.push(( - PageserverConsumptionMetricsKey { - tenant_id, - timeline_id: None, - metric: SYNTHETIC_STORAGE_SIZE, - }, - tenant_synthetic_size, - )); + current_metrics.push( + PageserverConsumptionMetricsKey::synthetic_size(tenant_id) + .now(tenant_synthetic_size), + ); } } // Filter metrics, unless we want to send all metrics, including cached ones. // See: https://github.com/neondatabase/neon/issues/3485 if !send_cached { - current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) { - Some(val) => val != curr_val, - None => true, + current_metrics.retain(|(curr_key, (kind, curr_val))| { + if kind.is_incremental() { + // incremental values (currently only written_size_delta) should not get any cache + // deduplication because they will be used by upstream for "is still alive." + true + } else { + match cached_metrics.get(curr_key) { + Some((_, val)) => val != curr_val, + None => true, + } + } }); } @@ -272,8 +429,8 @@ pub async fn collect_metrics_iteration( chunk_to_send.clear(); // enrich metrics with type,timestamp and idempotency key before sending - chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event { - kind: EventType::Absolute { time: Utc::now() }, + chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event { + kind: *when, metric: curr_key.metric, idempotency_key: idempotency_key(node_id.to_string()), value: *curr_val, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c7974e9c00..34211fb714 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -294,6 +294,10 @@ pub struct Timeline { /// Completion shared between all timelines loaded during startup; used to delay heavier /// background tasks until some logical sizes have been calculated. initial_logical_size_attempt: Mutex>, + + /// Load or creation time information about the disk_consistent_lsn and when the loading + /// happened. Used for consumption metrics. + pub(crate) loaded_at: (Lsn, SystemTime), } pub struct WalReceiverInfo { @@ -1404,6 +1408,8 @@ impl Timeline { last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0), last_freeze_ts: RwLock::new(Instant::now()), + loaded_at: (disk_consistent_lsn, SystemTime::now()), + ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn(),