From f731e9b3de1089f4bc5d6fb683395eb428b516fe Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Thu, 29 Dec 2022 12:11:04 +0200 Subject: [PATCH] Fix serialization of billing metrics (#3215) Fixes: - serialize TenantId and TimelineId as strings, - skip TimelineId if none - serialize `metric_type` field as `type` - add `idempotency_key` field to uniquely identify metrics --- pageserver/src/billing_metrics.rs | 50 +++++++++++++++++++++++-------- pageserver/src/bin/pageserver.rs | 1 + 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/pageserver/src/billing_metrics.rs b/pageserver/src/billing_metrics.rs index 73e27618db..3a6b83773d 100644 --- a/pageserver/src/billing_metrics.rs +++ b/pageserver/src/billing_metrics.rs @@ -6,6 +6,7 @@ use anyhow; use tracing::*; +use utils::id::NodeId; use utils::id::TimelineId; use crate::task_mgr; @@ -14,12 +15,14 @@ use pageserver_api::models::TenantState; use utils::id::TenantId; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; use std::fmt; use std::str::FromStr; use std::time::Duration; use chrono::{DateTime, Utc}; +use rand::Rng; use reqwest::Url; /// BillingMetric struct that defines the format for one metric entry @@ -30,27 +33,36 @@ use reqwest::Url; /// "metric": "remote_storage_size", /// "type": "absolute", /// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d", -/// "timeline_id": "00000000000000000000000000000000", -/// "time": ..., +/// "timeline_id": "a03ebb4f5922a1c56ff7485cc8854143", +/// "time": "2022-12-28T11:07:19.317310284Z", +/// "idempotency_key": "2022-12-28 11:07:19.317310324 UTC-1-4019", /// "value": 12345454, /// } /// ``` +#[serde_as] #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] pub struct BillingMetric { pub metric: BillingMetricKind, + #[serde(rename = "type")] pub metric_type: &'static str, + #[serde_as(as = "DisplayFromStr")] pub tenant_id: TenantId, + #[serde_as(as = "Option")] + #[serde(skip_serializing_if = "Option::is_none")] pub timeline_id: Option, pub time: DateTime, + pub idempotency_key: String, pub value: u64, } impl BillingMetric { - pub fn new_absolute( + pub fn new_absolute( metric: BillingMetricKind, tenant_id: TenantId, timeline_id: Option, value: u64, + node_id: NodeId, + rng: &mut R, ) -> Self { Self { metric, @@ -58,6 +70,8 @@ impl BillingMetric { tenant_id, timeline_id, time: Utc::now(), + // key that allows metric collector to distinguish unique events + idempotency_key: format!("{}-{}-{:04}", Utc::now(), node_id, rng.gen_range(0..=9999)), value, } } @@ -123,6 +137,7 @@ struct EventChunk<'a> { pub async fn collect_metrics( metric_collection_endpoint: &Url, metric_collection_interval: Duration, + node_id: NodeId, ) -> anyhow::Result<()> { let mut ticker = tokio::time::interval(metric_collection_interval); @@ -139,7 +154,7 @@ pub async fn collect_metrics( return Ok(()); }, _ = ticker.tick() => { - collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint).await?; + collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await?; } } } @@ -153,6 +168,7 @@ pub async fn collect_metrics_task( client: &reqwest::Client, cached_metrics: &mut HashMap, metric_collection_endpoint: &reqwest::Url, + node_id: NodeId, ) -> anyhow::Result<()> { let mut current_metrics: Vec<(BillingMetricsKey, u64)> = Vec::new(); trace!( @@ -241,15 +257,23 @@ pub async fn collect_metrics_task( for chunk in chunks { chunk_to_send.clear(); - // enrich metrics with timestamp and metric_kind before sending - chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| { - BillingMetric::new_absolute( - curr_key.metric, - curr_key.tenant_id, - curr_key.timeline_id, - *curr_val, - ) - })); + + // this code block is needed to convince compiler + // that rng is not reused aroung await point + { + // enrich metrics with timestamp and metric_kind before sending + let mut rng = rand::thread_rng(); + chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| { + BillingMetric::new_absolute( + curr_key.metric, + curr_key.tenant_id, + curr_key.timeline_id, + *curr_val, + node_id, + &mut rng, + ) + })); + } let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: &chunk_to_send, diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 5246541375..4b71874bdf 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -341,6 +341,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> { pageserver::billing_metrics::collect_metrics( metric_collection_endpoint, conf.metric_collection_interval, + conf.id, ) .instrument(info_span!("metrics_collection")) .await?;