diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs
index 3a1b396d63..418885a1b0 100644
--- a/libs/consumption_metrics/src/lib.rs
+++ b/libs/consumption_metrics/src/lib.rs
@@ -57,7 +57,7 @@ pub struct Event<Extra> {
     pub extra: Extra,
 }
 
-pub fn idempotency_key(node_id: String) -> String {
+pub fn idempotency_key(node_id: &str) -> String {
     format!(
         "{}-{}-{:04}",
         Utc::now(),
@@ -71,6 +71,6 @@ pub const CHUNK_SIZE: usize = 1000;
 // Just a wrapper around a slice of events
 // to serialize it as `{"events" : [ ] }
 #[derive(serde::Serialize)]
-pub struct EventChunk<'a, T> {
-    pub events: &'a [T],
+pub struct EventChunk<'a, T: Clone> {
+    pub events: std::borrow::Cow<'a, [T]>,
 }
diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs
index e462d59291..326802e7e4 100644
--- a/pageserver/src/consumption_metrics.rs
+++ b/pageserver/src/consumption_metrics.rs
@@ -23,7 +23,7 @@ use utils::lsn::Lsn;
 const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 
 #[serde_as]
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Clone, Copy)]
 struct Ids {
     #[serde_as(as = "DisplayFromStr")]
     tenant_id: TenantId,
@@ -34,13 +34,13 @@ struct Ids {
 
 /// Key that uniquely identifies the object, this metric describes.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct PageserverConsumptionMetricsKey {
-    pub tenant_id: TenantId,
-    pub timeline_id: Option<TimelineId>,
-    pub metric: &'static str,
+struct MetricsKey {
+    tenant_id: TenantId,
+    timeline_id: Option<TimelineId>,
+    metric: &'static str,
 }
 
-impl PageserverConsumptionMetricsKey {
+impl MetricsKey {
     const fn absolute_values(self) -> AbsoluteValueFactory {
         AbsoluteValueFactory(self)
     }
@@ -50,21 +50,17 @@ impl PageserverConsumptionMetricsKey {
 }
 
 /// Helper type which each individual metric kind can return to produce only absolute values.
-struct AbsoluteValueFactory(PageserverConsumptionMetricsKey);
+struct AbsoluteValueFactory(MetricsKey);
 
 impl AbsoluteValueFactory {
-    fn at(
-        self,
-        time: DateTime<Utc>,
-        val: u64,
-    ) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
+    fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
         let key = self.0;
         (key, (EventType::Absolute { time }, val))
     }
 }
 
 /// Helper type which each individual metric kind can return to produce only incremental values.
-struct IncrementalValueFactory(PageserverConsumptionMetricsKey);
+struct IncrementalValueFactory(MetricsKey);
 
 impl IncrementalValueFactory {
     #[allow(clippy::wrong_self_convention)]
@@ -73,7 +69,7 @@ impl IncrementalValueFactory {
         prev_end: DateTime<Utc>,
         up_to: DateTime<Utc>,
         val: u64,
-    ) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
+    ) -> (MetricsKey, (EventType, u64)) {
         let key = self.0;
         // cannot assert prev_end < up_to because these are realtime clock based
         (
@@ -88,15 +84,18 @@ impl IncrementalValueFactory {
         )
     }
 
-    fn key(&self) -> &PageserverConsumptionMetricsKey {
+    fn key(&self) -> &MetricsKey {
         &self.0
     }
 }
 
-// the static part of a PageserverConsumptionMetricsKey
-impl PageserverConsumptionMetricsKey {
+// the static part of a MetricsKey
+impl MetricsKey {
+    /// Absolute value of [`Timeline::get_last_record_lsn`].
+    ///
+    /// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
     const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: Some(timeline_id),
             metric: "written_size",
@@ -104,13 +103,14 @@ impl PageserverConsumptionMetricsKey {
         .absolute_values()
     }
 
-    /// Values will be the difference of the latest written_size (last_record_lsn) to what we
-    /// previously sent.
+    /// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
+    /// previously sent, starting from the previously sent incremental time range ending at the
+    /// latest absolute measurement.
     const fn written_size_delta(
         tenant_id: TenantId,
         timeline_id: TimelineId,
     ) -> IncrementalValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: Some(timeline_id),
             metric: "written_size_bytes_delta",
@@ -118,11 +118,14 @@ impl PageserverConsumptionMetricsKey {
         .incremental_values()
     }
 
+    /// Exact [`Timeline::get_current_logical_size`].
+    ///
+    /// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
     const fn timeline_logical_size(
         tenant_id: TenantId,
         timeline_id: TimelineId,
     ) -> AbsoluteValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: Some(timeline_id),
             metric: "timeline_logical_size",
@@ -130,8 +133,11 @@ impl PageserverConsumptionMetricsKey {
         .absolute_values()
     }
 
+    /// [`Tenant::remote_size`]
+    ///
+    /// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
     const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: None,
             metric: "remote_storage_size",
@@ -139,8 +145,11 @@ impl PageserverConsumptionMetricsKey {
         .absolute_values()
     }
 
+    /// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
+    ///
+    /// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
     const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: None,
             metric: "resident_size",
@@ -148,8 +157,11 @@ impl PageserverConsumptionMetricsKey {
         .absolute_values()
     }
 
+    /// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
+    ///
+    /// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
     const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
-        PageserverConsumptionMetricsKey {
+        MetricsKey {
             tenant_id,
             timeline_id: None,
             metric: "synthetic_storage_size",
@@ -233,15 +245,15 @@ pub async fn collect_metrics(
 ///
 /// TODO
 /// - refactor this function (chunking+sending part) to reuse it in proxy module;
-pub async fn collect_metrics_iteration(
+async fn collect_metrics_iteration(
     client: &reqwest::Client,
-    cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
+    cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
     metric_collection_endpoint: &reqwest::Url,
     node_id: NodeId,
     ctx: &RequestContext,
     send_cached: bool,
 ) {
-    let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new();
+    let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
     trace!(
         "starting collect_metrics_iteration. metric_collection_endpoint: {}",
         metric_collection_endpoint
@@ -300,40 +312,23 @@ pub async fn collect_metrics_iteration(
                 }
             }
 
-            let timeline_resident_size = timeline.get_resident_physical_size();
-            tenant_resident_size += timeline_resident_size;
+            tenant_resident_size += timeline.resident_physical_size();
         }
 
-        match tenant.get_remote_size().await {
-            Ok(tenant_remote_size) => {
-                current_metrics.push(
-                    PageserverConsumptionMetricsKey::remote_storage_size(tenant_id)
-                        .at(Utc::now(), tenant_remote_size),
-                );
-            }
-            Err(err) => {
-                error!(
-                    "failed to get remote size for tenant {}: {err:?}",
-                    tenant_id
-                );
-            }
-        }
+        current_metrics
+            .push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
 
-        current_metrics.push(
-            PageserverConsumptionMetricsKey::resident_size(tenant_id)
-                .at(Utc::now(), tenant_resident_size),
-        );
+        current_metrics
+            .push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
 
         // Note that this metric is calculated in a separate bgworker
         // Here we only use cached value, which may lag behind the real latest one
-        let tenant_synthetic_size = tenant.get_cached_synthetic_size();
+        let synthetic_size = tenant.cached_synthetic_size();
 
-        if tenant_synthetic_size != 0 {
+        if synthetic_size != 0 {
             // only send non-zeroes because otherwise these show up as errors in logs
-            current_metrics.push(
-                PageserverConsumptionMetricsKey::synthetic_size(tenant_id)
-                    .at(Utc::now(), tenant_synthetic_size),
-            );
+            current_metrics
+                .push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
         }
     }
 
@@ -365,6 +360,8 @@
 
     let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
 
+    let node_id = node_id.to_string();
+
     for chunk in chunks {
         chunk_to_send.clear();
 
@@ -372,7 +369,7 @@
         chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
             kind: *when,
             metric: curr_key.metric,
-            idempotency_key: idempotency_key(node_id.to_string()),
+            idempotency_key: idempotency_key(&node_id),
             value: *curr_val,
             extra: Ids {
                 tenant_id: curr_key.tenant_id,
@@ -380,17 +377,14 @@
             },
         }));
 
-        let chunk_json = serde_json::value::to_raw_value(&EventChunk {
-            events: &chunk_to_send,
-        })
-        .expect("PageserverConsumptionMetric should not fail serialization");
-
         const MAX_RETRIES: u32 = 3;
 
         for attempt in 0..MAX_RETRIES {
             let res = client
                 .post(metric_collection_endpoint.clone())
-                .json(&chunk_json)
+                .json(&EventChunk {
+                    events: (&chunk_to_send).into(),
+                })
                 .send()
                 .await;
@@ -482,20 +476,18 @@ impl TimelineSnapshot {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         now: DateTime<Utc>,
-        metrics: &mut Vec<(PageserverConsumptionMetricsKey, (EventType, u64))>,
-        cache: &HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
+        metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
+        cache: &HashMap<MetricsKey, (EventType, u64)>,
     ) {
         let timeline_written_size = u64::from(self.last_record_lsn);
 
         let (key, written_size_now) =
-            PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                .at(now, timeline_written_size);
+            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
 
         // last_record_lsn can only go up, right now at least, TODO: #2592 or related
         // features might change this.
-        let written_size_delta_key =
-            PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id);
+        let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
 
         // use this when available, because in a stream of incremental values, it will be
         // accurate where as when last_record_lsn stops moving, we will only cache the last
@@ -547,10 +539,7 @@ impl TimelineSnapshot {
         metrics.push((key, written_size_now));
 
         if let Some(size) = self.current_exact_logical_size {
-            metrics.push(
-                PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
-                    .at(now, size),
-            );
+            metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
         }
     }
 }
@@ -569,7 +558,7 @@ pub async fn calculate_synthetic_size_worker(
             _ = task_mgr::shutdown_watcher() => {
                 return Ok(());
             },
-            tick_at = ticker.tick() => {
+            tick_at = ticker.tick() => {
 
                 let tenants = match mgr::list_tenants().await {
                     Ok(tenants) => tenants,
@@ -616,7 +605,7 @@ mod tests {
         lsn::Lsn,
     };
 
-    use crate::consumption_metrics::PageserverConsumptionMetricsKey;
+    use crate::consumption_metrics::MetricsKey;
     use super::TimelineSnapshot;
     use chrono::{DateTime, Utc};
@@ -645,12 +634,13 @@
         assert_eq!(
             metrics,
             &[
-                PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
-                    .from_previous_up_to(snap.loaded_at.1.into(), now, 0),
-                PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                    .at(now, disk_consistent_lsn.0),
-                PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
-                    .at(now, 0x42000)
+                MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+                    snap.loaded_at.1.into(),
+                    now,
+                    0
+                ),
+                MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
+                MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
             ]
         );
     }
@@ -669,11 +659,9 @@
         let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
 
         let mut metrics = Vec::new();
-        let cache =
-            HashMap::from([
-                PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                    .at(before, disk_consistent_lsn.0),
-            ]);
+        let cache = HashMap::from([
+            MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
+        ]);
 
         let snap = TimelineSnapshot {
             loaded_at: (disk_consistent_lsn, init),
@@ -686,12 +674,10 @@
         assert_eq!(
             metrics,
             &[
-                PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
+                MetricsKey::written_size_delta(tenant_id, timeline_id)
                     .from_previous_up_to(before, now, 0),
-                PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                    .at(now, disk_consistent_lsn.0),
-                PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
-                    .at(now, 0x42000)
+                MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
+                MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
             ]
         );
     }
@@ -713,11 +699,13 @@
         let mut metrics = Vec::new();
         let cache = HashMap::from([
             // at t=before was the last time the last_record_lsn changed
-            PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                .at(before, disk_consistent_lsn.0),
+            MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
             // end time of this event is used for the next ones
-            PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
-                .from_previous_up_to(before, just_before, 0),
+            MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+                before,
+                just_before,
+                0,
+            ),
         ]);
 
         let snap = TimelineSnapshot {
@@ -731,12 +719,13 @@
         assert_eq!(
             metrics,
             &[
-                PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
-                    .from_previous_up_to(just_before, now, 0),
-                PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
-                    .at(now, disk_consistent_lsn.0),
-                PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
-                    .at(now, 0x42000)
+                MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
+                    just_before,
+                    now,
+                    0
+                ),
+                MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
+                MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
             ]
         );
     }
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 1b287ee10f..9fd6005330 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -644,20 +644,19 @@ impl Tenant {
         Ok(())
     }
 
-    /// get size of all remote timelines
+    /// Get sum of all remote timelines sizes
     ///
     /// This function relies on the index_part instead of listing the remote storage
-    ///
-    pub async fn get_remote_size(&self) -> anyhow::Result<u64> {
+    pub fn remote_size(&self) -> u64 {
         let mut size = 0;
 
-        for timeline in self.list_timelines().iter() {
+        for timeline in self.list_timelines() {
             if let Some(remote_client) = &timeline.remote_client {
                 size += remote_client.get_remote_physical_size();
             }
         }
 
-        Ok(size)
+        size
     }
 
     #[instrument(skip_all, fields(timeline_id=%timeline_id))]
@@ -2889,7 +2888,7 @@ impl Tenant {
             .set(size);
     }
 
-    pub fn get_cached_synthetic_size(&self) -> u64 {
+    pub fn cached_synthetic_size(&self) -> u64 {
         self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
     }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 628865ea2b..647c9b7b0e 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -528,7 +528,7 @@ impl Timeline {
         size
     }
 
-    pub fn get_resident_physical_size(&self) -> u64 {
+    pub fn resident_physical_size(&self) -> u64 {
         self.metrics.resident_physical_size_gauge.get()
     }
 
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index c4be7e1f08..d63f902ac1 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -11,7 +11,6 @@ const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
 
 const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 
-///
 /// Key that uniquely identifies the object, this metric describes.
 /// Currently, endpoint_id is enough, but this may change later,
 /// so keep it in a named struct.
@@ -19,8 +18,7 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 /// Both the proxy and the ingestion endpoint will live in the same region (or cell)
 /// so while the project-id is unique across regions the whole pipeline will work correctly
 /// because we enrich the event with project_id in the control-plane endpoint.
-///
-#[derive(Eq, Hash, PartialEq, Serialize, Debug)]
+#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
 pub struct Ids {
     pub endpoint_id: String,
     pub branch_id: String,
@@ -149,7 +147,7 @@ async fn collect_metrics_iteration(
                 stop_time: *curr_time,
             },
             metric: PROXY_IO_BYTES_PER_CLIENT,
-            idempotency_key: idempotency_key(hostname.to_owned()),
+            idempotency_key: idempotency_key(hostname),
             value,
             extra: Ids {
                 endpoint_id: curr_key.endpoint_id.clone(),
@@ -167,12 +165,11 @@ async fn collect_metrics_iteration(
     // Send metrics.
     // Split into chunks of 1000 metrics to avoid exceeding the max request size
     for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
-        let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: chunk })
-            .expect("ProxyConsumptionMetric should not fail serialization");
-
         let res = client
             .post(metric_collection_endpoint.clone())
-            .json(&chunk_json)
+            .json(&EventChunk {
+                events: chunk.into(),
+            })
             .send()
             .await;
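
Note on the `EventChunk` change: switching `events` from `&'a [T]` to `Cow<'a, [T]>` (with the `T: Clone` bound that `ToOwned` on `[T]` requires) is what lets both call sites drop the `serde_json::value::to_raw_value(..).expect(..)` step and hand the chunk straight to reqwest's `.json(..)`: a `&Vec<Event<Ids>>` (pageserver) and a `&[Event<Ids>]` slice (proxy) both convert via `.into()` into a borrowed `Cow`, so no events are cloned. A minimal sketch of the serialization behaviour, outside the diff; `DummyEvent` is a hypothetical stand-in for the real `Event<Ids>` payload:

```rust
use std::borrow::Cow;

// Hypothetical stand-in for `Event<Ids>`; only here to keep the sketch self-contained.
#[derive(serde::Serialize, Clone)]
struct DummyEvent {
    metric: &'static str,
    value: u64,
}

// Same shape as the patched `EventChunk`.
#[derive(serde::Serialize)]
struct EventChunk<'a, T: Clone> {
    events: Cow<'a, [T]>,
}

fn main() {
    let buffered: Vec<DummyEvent> = vec![DummyEvent { metric: "written_size", value: 42 }];

    // Pageserver path: `(&chunk_to_send).into()` borrows the reusable Vec.
    let from_vec = EventChunk { events: (&buffered).into() };

    // Proxy path: `chunk.into()` borrows a `&[T]` produced by `.chunks(..)`.
    let from_slice = EventChunk { events: buffered.as_slice().into() };

    // Both serialize to the same `{"events": [..]}` wrapper, with no clone
    // and no intermediate RawValue.
    let expected = r#"{"events":[{"metric":"written_size","value":42}]}"#;
    assert_eq!(serde_json::to_string(&from_vec).unwrap(), expected);
    assert_eq!(serde_json::to_string(&from_slice).unwrap(), expected);
}
```

`Cow::Owned` only enters the picture if a caller explicitly passes owned data; both paths above stay `Cow::Borrowed`, so the serialized bytes match what the old `&'a [T]` field produced.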