refactor: tidy consumption metrics (#4860)

Tidying up that I've been wanting to do for some time.

Follow-up to #4857.
Joonas Koivunen
2023-08-01 18:14:16 +03:00
committed by GitHub
parent 78fa2b13e5
commit 3a00a5deb2
5 changed files with 102 additions and 117 deletions


@@ -57,7 +57,7 @@ pub struct Event<Extra> {
pub extra: Extra,
}
pub fn idempotency_key(node_id: String) -> String {
pub fn idempotency_key(node_id: &str) -> String {
format!(
"{}-{}-{:04}",
Utc::now(),
@@ -71,6 +71,6 @@ pub const CHUNK_SIZE: usize = 1000;
// Just a wrapper around a slice of events
// to serialize it as `{"events" : [ ] }
#[derive(serde::Serialize)]
pub struct EventChunk<'a, T> {
pub events: &'a [T],
pub struct EventChunk<'a, T: Clone> {
pub events: std::borrow::Cow<'a, [T]>,
}
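
Aside, for context (a sketch, not part of this diff): moving `events` from `&'a [T]` to `Cow<'a, [T]>` is what lets the call sites further down drop the intermediate `serde_json::value::to_raw_value` step and pass a borrowed chunk straight to reqwest's `.json(...)` via `(&chunk_to_send).into()`. Minimal illustration with a simplified `Event` (the real one also carries kind, metric, idempotency key and extras), assuming serde with the derive feature plus serde_json:

use std::borrow::Cow;

#[derive(serde::Serialize, Clone)]
struct Event {
    value: u64,
}

#[derive(serde::Serialize)]
struct EventChunk<'a, T: Clone> {
    events: Cow<'a, [T]>,
}

fn main() {
    let collected = vec![Event { value: 1 }, Event { value: 2 }];

    // Borrowed: Cow::Borrowed serializes in place, nothing is cloned.
    let borrowed = EventChunk { events: Cow::from(&collected[..]) };
    // Owned data fits the same type, should a caller ever need to hand over a Vec.
    let owned = EventChunk { events: Cow::from(vec![Event { value: 3 }]) };

    println!("{}", serde_json::to_string(&borrowed).unwrap()); // {"events":[{"value":1},{"value":2}]}
    println!("{}", serde_json::to_string(&owned).unwrap()); // {"events":[{"value":3}]}
}

The `T: Clone` bound comes from `Cow<'a, [T]>` needing `[T]: ToOwned`.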


@@ -23,7 +23,7 @@ use utils::lsn::Lsn;
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone, Copy)]
struct Ids {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
@@ -34,13 +34,13 @@ struct Ids {
/// Key that uniquely identifies the object, this metric describes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageserverConsumptionMetricsKey {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub metric: &'static str,
struct MetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: &'static str,
}
impl PageserverConsumptionMetricsKey {
impl MetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
@@ -50,21 +50,17 @@ impl PageserverConsumptionMetricsKey {
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(PageserverConsumptionMetricsKey);
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
fn at(
self,
time: DateTime<Utc>,
val: u64,
) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(PageserverConsumptionMetricsKey);
struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
@@ -73,7 +69,7 @@ impl IncrementalValueFactory {
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
@@ -88,15 +84,18 @@ impl IncrementalValueFactory {
)
}
fn key(&self) -> &PageserverConsumptionMetricsKey {
fn key(&self) -> &MetricsKey {
&self.0
}
}
// the static part of a PageserverConsumptionMetricsKey
impl PageserverConsumptionMetricsKey {
// the static part of a MetricsKey
impl MetricsKey {
/// Absolute value of [`Timeline::get_last_record_lsn`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
@@ -104,13 +103,14 @@ impl PageserverConsumptionMetricsKey {
.absolute_values()
}
/// Values will be the difference of the latest written_size (last_record_lsn) to what we
/// previously sent.
/// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
/// previously sent, starting from the previously sent incremental time range ending at the
/// latest absolute measurement.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size_bytes_delta",
@@ -118,11 +118,14 @@ impl PageserverConsumptionMetricsKey {
.incremental_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
@@ -130,8 +133,11 @@ impl PageserverConsumptionMetricsKey {
.absolute_values()
}
/// [`Tenant::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
@@ -139,8 +145,11 @@ impl PageserverConsumptionMetricsKey {
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
@@ -148,8 +157,11 @@ impl PageserverConsumptionMetricsKey {
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
@@ -233,15 +245,15 @@ pub async fn collect_metrics(
///
/// TODO
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
pub async fn collect_metrics_iteration(
async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new();
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
@@ -300,40 +312,23 @@ pub async fn collect_metrics_iteration(
}
}
let timeline_resident_size = timeline.get_resident_physical_size();
tenant_resident_size += timeline_resident_size;
tenant_resident_size += timeline.resident_physical_size();
}
match tenant.get_remote_size().await {
Ok(tenant_remote_size) => {
current_metrics.push(
PageserverConsumptionMetricsKey::remote_storage_size(tenant_id)
.at(Utc::now(), tenant_remote_size),
);
}
Err(err) => {
error!(
"failed to get remote size for tenant {}: {err:?}",
tenant_id
);
}
}
current_metrics
.push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
current_metrics.push(
PageserverConsumptionMetricsKey::resident_size(tenant_id)
.at(Utc::now(), tenant_resident_size),
);
current_metrics
.push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
let tenant_synthetic_size = tenant.get_cached_synthetic_size();
let synthetic_size = tenant.cached_synthetic_size();
if tenant_synthetic_size != 0 {
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics.push(
PageserverConsumptionMetricsKey::synthetic_size(tenant_id)
.at(Utc::now(), tenant_synthetic_size),
);
current_metrics
.push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
}
}
@@ -365,6 +360,8 @@ pub async fn collect_metrics_iteration(
let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
let node_id = node_id.to_string();
for chunk in chunks {
chunk_to_send.clear();
@@ -372,7 +369,7 @@ pub async fn collect_metrics_iteration(
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
idempotency_key: idempotency_key(node_id.to_string()),
idempotency_key: idempotency_key(&node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
@@ -380,17 +377,14 @@ pub async fn collect_metrics_iteration(
},
}));
let chunk_json = serde_json::value::to_raw_value(&EventChunk {
events: &chunk_to_send,
})
.expect("PageserverConsumptionMetric should not fail serialization");
const MAX_RETRIES: u32 = 3;
for attempt in 0..MAX_RETRIES {
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.json(&EventChunk {
events: (&chunk_to_send).into(),
})
.send()
.await;
@@ -482,20 +476,18 @@ impl TimelineSnapshot {
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
metrics: &mut Vec<(PageserverConsumptionMetricsKey, (EventType, u64))>,
cache: &HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
cache: &HashMap<MetricsKey, (EventType, u64)>,
) {
let timeline_written_size = u64::from(self.last_record_lsn);
let (key, written_size_now) =
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(now, timeline_written_size);
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
// features might change this.
let written_size_delta_key =
PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id);
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
// use this when available, because in a stream of incremental values, it will be
// accurate where as when last_record_lsn stops moving, we will only cache the last
@@ -547,10 +539,7 @@ impl TimelineSnapshot {
metrics.push((key, written_size_now));
if let Some(size) = self.current_exact_logical_size {
metrics.push(
PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
.at(now, size),
);
metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
}
}
}
@@ -569,7 +558,7 @@ pub async fn calculate_synthetic_size_worker(
_ = task_mgr::shutdown_watcher() => {
return Ok(());
},
tick_at = ticker.tick() => {
tick_at = ticker.tick() => {
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
@@ -616,7 +605,7 @@ mod tests {
lsn::Lsn,
};
use crate::consumption_metrics::PageserverConsumptionMetricsKey;
use crate::consumption_metrics::MetricsKey;
use super::TimelineSnapshot;
use chrono::{DateTime, Utc};
@@ -645,12 +634,13 @@ mod tests {
assert_eq!(
metrics,
&[
PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(snap.loaded_at.1.into(), now, 0),
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
.at(now, 0x42000)
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -669,11 +659,9 @@ mod tests {
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache =
HashMap::from([
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(before, disk_consistent_lsn.0),
]);
let cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
@@ -686,12 +674,10 @@ mod tests {
assert_eq!(
metrics,
&[
PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
.at(now, 0x42000)
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -713,11 +699,13 @@ mod tests {
let mut metrics = Vec::new();
let cache = HashMap::from([
// at t=before was the last time the last_record_lsn changed
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(before, disk_consistent_lsn.0),
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, just_before, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before,
just_before,
0,
),
]);
let snap = TimelineSnapshot {
@@ -731,12 +719,13 @@ mod tests {
assert_eq!(
metrics,
&[
PageserverConsumptionMetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(just_before, now, 0),
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
PageserverConsumptionMetricsKey::timeline_logical_size(tenant_id, timeline_id)
.at(now, 0x42000)
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
just_before,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
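
Reading of the tests above, summarised as a sketch (names are illustrative, not the commit's): the start of each `written_size_bytes_delta` window is the end of the previously sent delta when one is cached, otherwise the timestamp of the previously sent absolute `written_size`, otherwise the timeline's load time; the value is how much the absolute counter grew since the cached absolute sample.

use chrono::{DateTime, Utc};

fn written_size_delta_window(
    now: DateTime<Utc>,
    loaded_at: DateTime<Utc>,
    written_now: u64,
    prev_written: Option<(DateTime<Utc>, u64)>, // cached "written_size" sample, if any
    prev_delta_end: Option<DateTime<Utc>>,      // stop_time of the cached delta, if any
) -> (DateTime<Utc>, DateTime<Utc>, u64) {
    let (prev_time, prev_val) = prev_written.unwrap_or((loaded_at, written_now));
    // Prefer the cached delta's end so consecutive windows line up exactly.
    let start = prev_delta_end.unwrap_or(prev_time);
    (start, now, written_now.saturating_sub(prev_val))
}

fn main() {
    let (loaded_at, before, just_before, now) = (Utc::now(), Utc::now(), Utc::now(), Utc::now());
    // Empty cache: window runs from load time, delta is zero.
    assert_eq!(written_size_delta_window(now, loaded_at, 100, None, None), (loaded_at, now, 0));
    // Cached absolute only: window starts at its timestamp.
    assert_eq!(
        written_size_delta_window(now, loaded_at, 100, Some((before, 100)), None),
        (before, now, 0)
    );
    // Cached delta as well: its end wins as the new start.
    assert_eq!(
        written_size_delta_window(now, loaded_at, 100, Some((before, 100)), Some(just_before)),
        (just_before, now, 0)
    );
}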


@@ -644,20 +644,19 @@ impl Tenant {
Ok(())
}
/// get size of all remote timelines
/// Get sum of all remote timelines sizes
///
/// This function relies on the index_part instead of listing the remote storage
///
pub async fn get_remote_size(&self) -> anyhow::Result<u64> {
pub fn remote_size(&self) -> u64 {
let mut size = 0;
for timeline in self.list_timelines().iter() {
for timeline in self.list_timelines() {
if let Some(remote_client) = &timeline.remote_client {
size += remote_client.get_remote_physical_size();
}
}
Ok(size)
size
}
#[instrument(skip_all, fields(timeline_id=%timeline_id))]
@@ -2889,7 +2888,7 @@ impl Tenant {
.set(size);
}
pub fn get_cached_synthetic_size(&self) -> u64 {
pub fn cached_synthetic_size(&self) -> u64 {
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
}
}
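
Aside on `get_remote_size` becoming `remote_size` (sketch below is illustrative, with stub types): per the doc comment it only sums `index_part`-backed sizes the remote clients already hold, so nothing awaits and nothing can fail, hence the dropped `async`, `anyhow::Result`, and caller-side error logging. An iterator-style equivalent:

use std::sync::Arc;

// Stub types standing in for the pageserver's Tenant / Timeline / remote client.
struct RemoteClient { remote_physical_size: u64 }
impl RemoteClient {
    fn get_remote_physical_size(&self) -> u64 { self.remote_physical_size }
}
struct Timeline { remote_client: Option<RemoteClient> }
struct Tenant { timelines: Vec<Arc<Timeline>> }

impl Tenant {
    fn list_timelines(&self) -> Vec<Arc<Timeline>> { self.timelines.clone() }

    // Iterator-style equivalent of the new remote_size: no async, no Result.
    fn remote_size(&self) -> u64 {
        self.list_timelines()
            .into_iter()
            .filter_map(|tl| tl.remote_client.as_ref().map(|rc| rc.get_remote_physical_size()))
            .sum()
    }
}

fn main() {
    let tenant = Tenant {
        timelines: vec![
            Arc::new(Timeline { remote_client: Some(RemoteClient { remote_physical_size: 10 }) }),
            Arc::new(Timeline { remote_client: None }),
        ],
    };
    assert_eq!(tenant.remote_size(), 10);
}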


@@ -528,7 +528,7 @@ impl Timeline {
size
}
pub fn get_resident_physical_size(&self) -> u64 {
pub fn resident_physical_size(&self) -> u64 {
self.metrics.resident_physical_size_gauge.get()
}


@@ -11,7 +11,6 @@ const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
///
/// Key that uniquely identifies the object, this metric describes.
/// Currently, endpoint_id is enough, but this may change later,
/// so keep it in a named struct.
@@ -19,8 +18,7 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
///
#[derive(Eq, Hash, PartialEq, Serialize, Debug)]
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
pub struct Ids {
pub endpoint_id: String,
pub branch_id: String,
@@ -149,7 +147,7 @@ async fn collect_metrics_iteration(
stop_time: *curr_time,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname.to_owned()),
idempotency_key: idempotency_key(hostname),
value,
extra: Ids {
endpoint_id: curr_key.endpoint_id.clone(),
@@ -167,12 +165,11 @@ async fn collect_metrics_iteration(
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
let chunk_json = serde_json::value::to_raw_value(&EventChunk { events: chunk })
.expect("ProxyConsumptionMetric should not fail serialization");
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk_json)
.json(&EventChunk {
events: chunk.into(),
})
.send()
.await;