Rename billing_metrics to consumption_metrics.

Use more appropriate term, because not all of these metrics are used for billing.
This commit is contained in:
Anastasia Lubennikova
2022-12-29 12:28:58 +02:00
parent c0290467fa
commit 894ac30734
3 changed files with 29 additions and 29 deletions

View File

@@ -338,7 +338,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
"consumption metrics collection",
true,
async move {
pageserver::billing_metrics::collect_metrics(
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.id,

View File

@@ -25,7 +25,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use reqwest::Url;
/// BillingMetric struct that defines the format for one metric entry
/// ConsumptionMetric struct that defines the format for one metric entry
/// i.e.
///
/// ```json
@@ -41,8 +41,8 @@ use reqwest::Url;
/// ```
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct BillingMetric {
pub metric: BillingMetricKind,
pub struct ConsumptionMetric {
pub metric: ConsumptionMetricKind,
#[serde(rename = "type")]
pub metric_type: &'static str,
#[serde_as(as = "DisplayFromStr")]
@@ -55,9 +55,9 @@ pub struct BillingMetric {
pub value: u64,
}
impl BillingMetric {
impl ConsumptionMetric {
pub fn new_absolute<R: Rng + ?Sized>(
metric: BillingMetricKind,
metric: ConsumptionMetricKind,
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
value: u64,
@@ -79,7 +79,7 @@ impl BillingMetric {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BillingMetricKind {
pub enum ConsumptionMetricKind {
/// Amount of WAL produced , by a timeline, i.e. last_record_lsn
/// This is an absolute, per-timeline metric.
WrittenSize,
@@ -96,7 +96,7 @@ pub enum BillingMetricKind {
RemoteStorageSize,
}
impl FromStr for BillingMetricKind {
impl FromStr for ConsumptionMetricKind {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
@@ -110,27 +110,27 @@ impl FromStr for BillingMetricKind {
}
}
impl fmt::Display for BillingMetricKind {
impl fmt::Display for ConsumptionMetricKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
BillingMetricKind::WrittenSize => "written_size",
BillingMetricKind::SyntheticStorageSize => "synthetic_storage_size",
BillingMetricKind::ResidentSize => "resident_size",
BillingMetricKind::RemoteStorageSize => "remote_storage_size",
ConsumptionMetricKind::WrittenSize => "written_size",
ConsumptionMetricKind::SyntheticStorageSize => "synthetic_storage_size",
ConsumptionMetricKind::ResidentSize => "resident_size",
ConsumptionMetricKind::RemoteStorageSize => "remote_storage_size",
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BillingMetricsKey {
pub struct ConsumptionMetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: BillingMetricKind,
metric: ConsumptionMetricKind,
}
#[derive(serde::Serialize)]
struct EventChunk<'a> {
events: &'a [BillingMetric],
events: &'a [ConsumptionMetric],
}
/// Main thread that serves metrics collection
@@ -145,7 +145,7 @@ pub async fn collect_metrics(
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<BillingMetricsKey, u64> = HashMap::new();
let mut cached_metrics: HashMap<ConsumptionMetricsKey, u64> = HashMap::new();
loop {
tokio::select! {
@@ -166,11 +166,11 @@ pub async fn collect_metrics(
/// Cache metrics to avoid sending the same metrics multiple times.
pub async fn collect_metrics_task(
client: &reqwest::Client,
cached_metrics: &mut HashMap<BillingMetricsKey, u64>,
cached_metrics: &mut HashMap<ConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
) -> anyhow::Result<()> {
let mut current_metrics: Vec<(BillingMetricsKey, u64)> = Vec::new();
let mut current_metrics: Vec<(ConsumptionMetricsKey, u64)> = Vec::new();
trace!(
"starting collect_metrics_task. metric_collection_endpoint: {}",
metric_collection_endpoint
@@ -194,10 +194,10 @@ pub async fn collect_metrics_task(
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
current_metrics.push((
BillingMetricsKey {
ConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: BillingMetricKind::WrittenSize,
metric: ConsumptionMetricKind::WrittenSize,
},
timeline_written_size,
));
@@ -217,19 +217,19 @@ pub async fn collect_metrics_task(
);
current_metrics.push((
BillingMetricsKey {
ConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: BillingMetricKind::ResidentSize,
metric: ConsumptionMetricKind::ResidentSize,
},
tenant_resident_size,
));
current_metrics.push((
BillingMetricsKey {
ConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: BillingMetricKind::RemoteStorageSize,
metric: ConsumptionMetricKind::RemoteStorageSize,
},
tenant_remote_size,
));
@@ -253,7 +253,7 @@ pub async fn collect_metrics_task(
const CHUNK_SIZE: usize = 1000;
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<BillingMetric> = Vec::with_capacity(1000);
let mut chunk_to_send: Vec<ConsumptionMetric> = Vec::with_capacity(1000);
for chunk in chunks {
chunk_to_send.clear();
@@ -264,7 +264,7 @@ pub async fn collect_metrics_task(
// enrich metrics with timestamp and metric_kind before sending
let mut rng = rand::thread_rng();
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| {
BillingMetric::new_absolute(
ConsumptionMetric::new_absolute(
curr_key.metric,
curr_key.tenant_id,
curr_key.timeline_id,
@@ -278,7 +278,7 @@ pub async fn collect_metrics_task(
let chunk_json = serde_json::value::to_raw_value(&EventChunk {
events: &chunk_to_send,
})
.expect("BillingMetric should not fail serialization");
.expect("ConsumptionMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())

View File

@@ -1,7 +1,7 @@
mod auth;
pub mod basebackup;
pub mod billing_metrics;
pub mod config;
pub mod consumption_metrics;
pub mod http;
pub mod import_datadir;
pub mod keyspace;