Fix serialization of billing metrics (#3215)

Fixes:
- serialize `TenantId` and `TimelineId` as strings
- skip `timeline_id` when it is `None`
- serialize the `metric_type` field as `type`
- add an `idempotency_key` field to uniquely identify metrics
This commit is contained in:
Anastasia Lubennikova
2022-12-29 12:11:04 +02:00
committed by GitHub
parent bd7a9e6274
commit f731e9b3de
2 changed files with 38 additions and 13 deletions

View File

@@ -6,6 +6,7 @@
use anyhow;
use tracing::*;
use utils::id::NodeId;
use utils::id::TimelineId;
use crate::task_mgr;
@@ -14,12 +15,14 @@ use pageserver_api::models::TenantState;
use utils::id::TenantId;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use rand::Rng;
use reqwest::Url;
/// BillingMetric struct that defines the format for one metric entry
@@ -30,27 +33,36 @@ use reqwest::Url;
/// "metric": "remote_storage_size",
/// "type": "absolute",
/// "tenant_id": "5d07d9ce9237c4cd845ea7918c0afa7d",
/// "timeline_id": "00000000000000000000000000000000",
/// "time": ...,
/// "timeline_id": "a03ebb4f5922a1c56ff7485cc8854143",
/// "time": "2022-12-28T11:07:19.317310284Z",
/// "idempotency_key": "2022-12-28 11:07:19.317310324 UTC-1-4019",
/// "value": 12345454,
/// }
/// ```
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct BillingMetric {
/// Which metric this entry reports.
pub metric: BillingMetricKind,
/// Kind of measurement; serialized under the JSON key `type`
/// rather than the Rust field name.
#[serde(rename = "type")]
pub metric_type: &'static str,
/// Tenant the metric belongs to; (de)serialized as a string through
/// its `Display` / `FromStr` implementations.
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
/// Timeline the metric belongs to, if any. When present it is
/// (de)serialized as a string like `tenant_id`; when `None` the field
/// is omitted from the serialized output entirely.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_id: Option<TimelineId>,
/// Timestamp of the entry; `new_absolute` sets it to `Utc::now()`.
pub time: DateTime<Utc>,
/// Key that allows the metric collector to distinguish unique events.
/// `new_absolute` builds it as `<current UTC time>-<node id>-<random number
/// in 0..=9999, zero-padded to 4 digits>`.
pub idempotency_key: String,
/// The measured value of the metric.
pub value: u64,
}
impl BillingMetric {
pub fn new_absolute(
pub fn new_absolute<R: Rng + ?Sized>(
metric: BillingMetricKind,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
value: u64,
node_id: NodeId,
rng: &mut R,
) -> Self {
Self {
metric,
@@ -58,6 +70,8 @@ impl BillingMetric {
tenant_id,
timeline_id,
time: Utc::now(),
// key that allows metric collector to distinguish unique events
idempotency_key: format!("{}-{}-{:04}", Utc::now(), node_id, rng.gen_range(0..=9999)),
value,
}
}
@@ -123,6 +137,7 @@ struct EventChunk<'a> {
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
node_id: NodeId,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
@@ -139,7 +154,7 @@ pub async fn collect_metrics(
return Ok(());
},
_ = ticker.tick() => {
collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint).await?;
collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await?;
}
}
}
@@ -153,6 +168,7 @@ pub async fn collect_metrics_task(
client: &reqwest::Client,
cached_metrics: &mut HashMap<BillingMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
) -> anyhow::Result<()> {
let mut current_metrics: Vec<(BillingMetricsKey, u64)> = Vec::new();
trace!(
@@ -241,15 +257,23 @@ pub async fn collect_metrics_task(
for chunk in chunks {
chunk_to_send.clear();
// enrich metrics with timestamp and metric_kind before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| {
BillingMetric::new_absolute(
curr_key.metric,
curr_key.tenant_id,
curr_key.timeline_id,
*curr_val,
)
}));
// this code block is needed to convince the compiler
// that rng is not reused around an await point
{
// enrich metrics with timestamp and metric_kind before sending
let mut rng = rand::thread_rng();
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| {
BillingMetric::new_absolute(
curr_key.metric,
curr_key.tenant_id,
curr_key.timeline_id,
*curr_val,
node_id,
&mut rng,
)
}));
}
let chunk_json = serde_json::value::to_raw_value(&EventChunk {
events: &chunk_to_send,

View File

@@ -341,6 +341,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
pageserver::billing_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.id,
)
.instrument(info_span!("metrics_collection"))
.await?;