From 78fa2b13e525e6fec62571f3f3cbe4fcaf749dd5 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 1 Aug 2023 15:30:36 +0300 Subject: [PATCH] test: written_size_bytes_delta (#4857) Two stabs at this: first by mocking an HTTP receiver and the globals out (now reverted), and then by separating the timeline dependency and just testing what kind of events certain timelines produce. I think this pattern could work for some of our problems. Follow-up to #4822. --- pageserver/src/consumption_metrics.rs | 399 ++++++++++++++++++++------ 1 file changed, 307 insertions(+), 92 deletions(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 45b4be470b..e462d59291 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -14,9 +14,11 @@ use reqwest::Url; use serde::Serialize; use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; -use std::time::Duration; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; use tracing::*; use utils::id::{NodeId, TenantId, TimelineId}; +use utils::lsn::Lsn; const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60); @@ -51,9 +53,12 @@ impl PageserverConsumptionMetricsKey { struct AbsoluteValueFactory(PageserverConsumptionMetricsKey); impl AbsoluteValueFactory { - fn now(self, val: u64) -> (PageserverConsumptionMetricsKey, (EventType, u64)) { + fn at( + self, + time: DateTime, + val: u64, + ) -> (PageserverConsumptionMetricsKey, (EventType, u64)) { let key = self.0; - let time = Utc::now(); (key, (EventType::Absolute { time }, val)) } } @@ -270,95 +275,29 @@ pub async fn collect_metrics_iteration( let mut tenant_resident_size = 0; // iterate through list of timelines in tenant - for timeline in tenant.list_timelines().iter() { + for timeline in tenant.list_timelines() { // collect per-timeline metrics only for active timelines - if timeline.is_active() { - let timeline_written_size = 
u64::from(timeline.get_last_record_lsn()); - let (key, written_size_now) = - PageserverConsumptionMetricsKey::written_size(tenant_id, timeline.timeline_id) - .now(timeline_written_size); + let timeline_id = timeline.timeline_id; - // last_record_lsn can only go up, right now at least, TODO: #2592 or related - // features might change this. - - let written_size_delta_key = PageserverConsumptionMetricsKey::written_size_delta( - tenant_id, - timeline.timeline_id, - ); - - // use this when available, because in a stream of incremental values, it will be - // accurate where as when last_record_lsn stops moving, we will only cache the last - // one of those. - let last_stop_time = - cached_metrics - .get(written_size_delta_key.key()) - .map(|(until, _val)| { - until - .incremental_timerange() - .expect("never create EventType::Absolute for written_size_delta") - .end - }); - - // by default, use the last sent written_size as the basis for - // calculating the delta. if we don't yet have one, use the load time value. - let prev = cached_metrics - .get(&key) - .map(|(prev_at, prev)| { - // use the prev time from our last incremental update, or default to latest - // absolute update on the first round. - let prev_at = prev_at - .absolute_time() - .expect("never create EventType::Incremental for written_size"); - let prev_at = last_stop_time.unwrap_or(prev_at); - (*prev_at, *prev) - }) - .unwrap_or_else(|| { - // if we don't have a previous point of comparison, compare to the load time - // lsn. 
- let (disk_consistent_lsn, loaded_at) = &timeline.loaded_at; - (DateTime::from(*loaded_at), disk_consistent_lsn.0) - }); - - // written_size_delta_bytes - current_metrics.extend( - if let Some(delta) = written_size_now.1.checked_sub(prev.1) { - let up_to = written_size_now - .0 - .absolute_time() - .expect("never create EventType::Incremental for written_size"); - let key_value = - written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta); - Some(key_value) - } else { - None - }, - ); - - // written_size - current_metrics.push((key, written_size_now)); - - let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id); - match span.in_scope(|| timeline.get_current_logical_size(ctx)) { - // Only send timeline logical size when it is fully calculated. - Ok((size, is_exact)) if is_exact => { - current_metrics.push( - PageserverConsumptionMetricsKey::timeline_logical_size( - tenant_id, - timeline.timeline_id, - ) - .now(size), - ); - } - Ok((_, _)) => {} - Err(err) => { - error!( - "failed to get current logical size for timeline {}: {err:?}", - timeline.timeline_id - ); - continue; - } - }; + match TimelineSnapshot::collect(&timeline, ctx) { + Ok(Some(snap)) => { + snap.to_metrics( + tenant_id, + timeline_id, + Utc::now(), + &mut current_metrics, + cached_metrics, + ); + } + Ok(None) => {} + Err(e) => { + error!( + "failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}", + timeline.timeline_id + ); + continue; + } } let timeline_resident_size = timeline.get_resident_physical_size(); @@ -369,7 +308,7 @@ pub async fn collect_metrics_iteration( Ok(tenant_remote_size) => { current_metrics.push( PageserverConsumptionMetricsKey::remote_storage_size(tenant_id) - .now(tenant_remote_size), + .at(Utc::now(), tenant_remote_size), ); } Err(err) => { @@ -381,7 +320,8 @@ pub async fn collect_metrics_iteration( } current_metrics.push( - 
PageserverConsumptionMetricsKey::resident_size(tenant_id).now(tenant_resident_size), + PageserverConsumptionMetricsKey::resident_size(tenant_id) + .at(Utc::now(), tenant_resident_size), ); // Note that this metric is calculated in a separate bgworker @@ -392,7 +332,7 @@ pub async fn collect_metrics_iteration( // only send non-zeroes because otherwise these show up as errors in logs current_metrics.push( PageserverConsumptionMetricsKey::synthetic_size(tenant_id) - .now(tenant_synthetic_size), + .at(Utc::now(), tenant_synthetic_size), ); } } @@ -486,6 +426,135 @@ pub async fn collect_metrics_iteration( } } +/// Internal type to make timeline metric production testable. +/// +/// As this value type contains all of the information needed from a timeline to produce the +/// metrics, it can easily be created with different values in test. +struct TimelineSnapshot { + loaded_at: (Lsn, SystemTime), + last_record_lsn: Lsn, + current_exact_logical_size: Option, +} + +impl TimelineSnapshot { + /// Collect the metrics from an actual timeline. + /// + /// Fails currently only when [`Timeline::get_current_logical_size`] fails. + /// + /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size + fn collect( + t: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result> { + use anyhow::Context; + + if !t.is_active() { + // no collection for broken or stopping needed, we will still keep the cached values + // though at the caller. + Ok(None) + } else { + let loaded_at = t.loaded_at; + let last_record_lsn = t.get_last_record_lsn(); + + let current_exact_logical_size = { + let span = info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id); + let res = span + .in_scope(|| t.get_current_logical_size(ctx)) + .context("get_current_logical_size"); + match res? { + // Only send timeline logical size when it is fully calculated. 
+ (size, is_exact) if is_exact => Some(size), + (_, _) => None, + } + }; + + Ok(Some(TimelineSnapshot { + loaded_at, + last_record_lsn, + current_exact_logical_size, + })) + } + } + + /// Produce the timeline consumption metrics into the `metrics` argument. + fn to_metrics( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + now: DateTime, + metrics: &mut Vec<(PageserverConsumptionMetricsKey, (EventType, u64))>, + cache: &HashMap, + ) { + let timeline_written_size = u64::from(self.last_record_lsn); + + let (key, written_size_now) = + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(now, timeline_written_size); + + // last_record_lsn can only go up, right now at least, TODO: #2592 or related + // features might change this. + + let written_size_delta_key = + PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id); + + // use this when available, because in a stream of incremental values, it will be + // accurate where as when last_record_lsn stops moving, we will only cache the last + // one of those. + let last_stop_time = cache + .get(written_size_delta_key.key()) + .map(|(until, _val)| { + until + .incremental_timerange() + .expect("never create EventType::Absolute for written_size_delta") + .end + }); + + // by default, use the last sent written_size as the basis for + // calculating the delta. if we don't yet have one, use the load time value. + let prev = cache + .get(&key) + .map(|(prev_at, prev)| { + // use the prev time from our last incremental update, or default to latest + // absolute update on the first round. + let prev_at = prev_at + .absolute_time() + .expect("never create EventType::Incremental for written_size"); + let prev_at = last_stop_time.unwrap_or(prev_at); + (*prev_at, *prev) + }) + .unwrap_or_else(|| { + // if we don't have a previous point of comparison, compare to the load time + // lsn. 
+ let (disk_consistent_lsn, loaded_at) = &self.loaded_at; + (DateTime::from(*loaded_at), disk_consistent_lsn.0) + }); + + // written_size_bytes_delta + metrics.extend( + if let Some(delta) = written_size_now.1.checked_sub(prev.1) { + let up_to = written_size_now + .0 + .absolute_time() + .expect("never create EventType::Incremental for written_size"); + let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta); + Some(key_value) + } else { + None + }, + ); + + // written_size + metrics.push((key, written_size_now)); + + if let Some(size) = self.current_exact_logical_size { + metrics.push( + PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id) + .at(now, size), + ); + } + } +} + /// Caclculate synthetic size for each active tenant pub async fn calculate_synthetic_size_worker( synthetic_size_calculation_interval: Duration, @@ -536,3 +605,149 @@ pub async fn calculate_synthetic_size_worker( } } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use std::time::SystemTime; + use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, + }; + + use crate::consumption_metrics::PageserverConsumptionMetricsKey; + + use super::TimelineSnapshot; + use chrono::{DateTime, Utc}; + + #[test] + fn startup_collected_timeline_metrics_before_advancing() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + + let mut metrics = Vec::new(); + let cache = HashMap::new(); + + let initdb_lsn = Lsn(0x10000); + let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2); + + let snap = TimelineSnapshot { + loaded_at: (disk_consistent_lsn, SystemTime::now()), + last_record_lsn: disk_consistent_lsn, + current_exact_logical_size: Some(0x42000), + }; + + let now = DateTime::::from(SystemTime::now()); + + snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache); + + assert_eq!( + metrics, + &[ + PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id) + 
.from_previous_up_to(snap.loaded_at.1.into(), now, 0), + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(now, disk_consistent_lsn.0), + PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id) + .at(now, 0x42000) + ] + ); + } + + #[test] + fn startup_collected_timeline_metrics_second_round() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + + let [now, before, init] = time_backwards(); + + let now = DateTime::::from(now); + let before = DateTime::::from(before); + + let initdb_lsn = Lsn(0x10000); + let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2); + + let mut metrics = Vec::new(); + let cache = + HashMap::from([ + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(before, disk_consistent_lsn.0), + ]); + + let snap = TimelineSnapshot { + loaded_at: (disk_consistent_lsn, init), + last_record_lsn: disk_consistent_lsn, + current_exact_logical_size: Some(0x42000), + }; + + snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache); + + assert_eq!( + metrics, + &[ + PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id) + .from_previous_up_to(before, now, 0), + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(now, disk_consistent_lsn.0), + PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id) + .at(now, 0x42000) + ] + ); + } + + #[test] + fn startup_collected_timeline_metrics_nth_round_at_same_lsn() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + + let [now, just_before, before, init] = time_backwards(); + + let now = DateTime::::from(now); + let just_before = DateTime::::from(just_before); + let before = DateTime::::from(before); + + let initdb_lsn = Lsn(0x10000); + let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2); + + let mut metrics = Vec::new(); + let cache = HashMap::from([ + // at t=before was the last time the last_record_lsn changed + 
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(before, disk_consistent_lsn.0), + // end time of this event is used for the next ones + PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id) + .from_previous_up_to(before, just_before, 0), + ]); + + let snap = TimelineSnapshot { + loaded_at: (disk_consistent_lsn, init), + last_record_lsn: disk_consistent_lsn, + current_exact_logical_size: Some(0x42000), + }; + + snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache); + + assert_eq!( + metrics, + &[ + PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id) + .from_previous_up_to(just_before, now, 0), + PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id) + .at(now, disk_consistent_lsn.0), + PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id) + .at(now, 0x42000) + ] + ); + } + + fn time_backwards() -> [std::time::SystemTime; N] { + let mut times = [std::time::SystemTime::UNIX_EPOCH; N]; + times[0] = std::time::SystemTime::now(); + for behind in 1..N { + times[behind] = times[0] - std::time::Duration::from_secs(behind as u64); + } + + times + } +}