consumption_metrics: send timeline_written_size_delta (#4822)

We want to have timeline_written_size_delta which is defined as
difference to the previously sent `timeline_written_size` from the
current `timeline_written_size`.

Solution is to send it. On the first round `disk_consistent_lsn` is used
which is captured during `load` time. After that an incremental "event"
is sent on every collection. Incremental "events" are not part of
deduplication.

I've added some infrastructure to allow somewhat typesafe
`EventType::Absolute` and `EventType::Incremental` factories per
metrics, now that we have our first `EventType::Incremental` usage.
This commit is contained in:
Joonas Koivunen
2023-07-31 22:10:19 +03:00
committed by GitHub
parent ddbe170454
commit 326189d950
3 changed files with 243 additions and 54 deletions

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
@@ -17,6 +17,32 @@ pub enum EventType {
},
}
impl EventType {
pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
use EventType::*;
match self {
Absolute { time } => Some(time),
_ => None,
}
}
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
// these can most likely be thought of as Range or RangeFull
use EventType::*;
match self {
Incremental {
start_time,
stop_time,
} => Some(start_time..stop_time),
_ => None,
}
}
pub fn is_incremental(&self) -> bool {
matches!(self, EventType::Incremental { .. })
}
}
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra> {
#[serde(flatten)]

View File

@@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use anyhow;
use chrono::Utc;
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use pageserver_api::models::TenantState;
use reqwest::Url;
@@ -18,12 +18,6 @@ use std::time::Duration;
use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
const WRITTEN_SIZE: &str = "written_size";
const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size";
const RESIDENT_SIZE: &str = "resident_size";
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
@@ -44,6 +38,121 @@ pub struct PageserverConsumptionMetricsKey {
pub metric: &'static str,
}
impl PageserverConsumptionMetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(PageserverConsumptionMetricsKey);
impl AbsoluteValueFactory {
fn now(self, val: u64) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
let key = self.0;
let time = Utc::now();
(key, (EventType::Absolute { time }, val))
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(PageserverConsumptionMetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &PageserverConsumptionMetricsKey {
&self.0
}
}
// the static part of a PageserverConsumptionMetricsKey
impl PageserverConsumptionMetricsKey {
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
}
.absolute_values()
}
/// Values will be the difference of the latest written_size (last_record_lsn) to what we
/// previously sent.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size_bytes_delta",
}
.incremental_values()
}
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
}
.absolute_values()
}
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
}
.absolute_values()
}
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
}
.absolute_values()
}
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
}
.absolute_values()
}
}
/// Main thread that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
@@ -79,7 +188,7 @@ pub async fn collect_metrics(
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
let mut cached_metrics = HashMap::new();
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
loop {
@@ -121,13 +230,13 @@ pub async fn collect_metrics(
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
pub async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new();
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
@@ -166,27 +275,80 @@ pub async fn collect_metrics_iteration(
if timeline.is_active() {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: WRITTEN_SIZE,
let (key, written_size_now) =
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline.timeline_id)
.now(timeline_written_size);
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
// features might change this.
let written_size_delta_key = PageserverConsumptionMetricsKey::written_size_delta(
tenant_id,
timeline.timeline_id,
);
// use this when available, because in a stream of incremental values, it will be
// accurate where as when last_record_lsn stops moving, we will only cache the last
// one of those.
let last_stop_time =
cached_metrics
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cached_metrics
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &timeline.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
// written_size_delta_bytes
current_metrics.extend(
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let up_to = written_size_now
.0
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let key_value =
written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
Some(key_value)
} else {
None
},
timeline_written_size,
));
);
// written_size
current_metrics.push((key, written_size_now));
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
// Only send timeline logical size when it is fully calculated.
Ok((size, is_exact)) if is_exact => {
current_metrics.push((
PageserverConsumptionMetricsKey {
current_metrics.push(
PageserverConsumptionMetricsKey::timeline_logical_size(
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
size,
));
timeline.timeline_id,
)
.now(size),
);
}
Ok((_, _)) => {}
Err(err) => {
@@ -205,14 +367,10 @@ pub async fn collect_metrics_iteration(
match tenant.get_remote_size().await {
Ok(tenant_remote_size) => {
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: REMOTE_STORAGE_SIZE,
},
tenant_remote_size,
));
current_metrics.push(
PageserverConsumptionMetricsKey::remote_storage_size(tenant_id)
.now(tenant_remote_size),
);
}
Err(err) => {
error!(
@@ -222,14 +380,9 @@ pub async fn collect_metrics_iteration(
}
}
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: RESIDENT_SIZE,
},
tenant_resident_size,
));
current_metrics.push(
PageserverConsumptionMetricsKey::resident_size(tenant_id).now(tenant_resident_size),
);
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
@@ -237,23 +390,27 @@ pub async fn collect_metrics_iteration(
if tenant_synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: SYNTHETIC_STORAGE_SIZE,
},
tenant_synthetic_size,
));
current_metrics.push(
PageserverConsumptionMetricsKey::synthetic_size(tenant_id)
.now(tenant_synthetic_size),
);
}
}
// Filter metrics, unless we want to send all metrics, including cached ones.
// See: https://github.com/neondatabase/neon/issues/3485
if !send_cached {
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
Some(val) => val != curr_val,
None => true,
current_metrics.retain(|(curr_key, (kind, curr_val))| {
if kind.is_incremental() {
// incremental values (currently only written_size_delta) should not get any cache
// deduplication because they will be used by upstream for "is still alive."
true
} else {
match cached_metrics.get(curr_key) {
Some((_, val)) => val != curr_val,
None => true,
}
}
});
}
@@ -272,8 +429,8 @@ pub async fn collect_metrics_iteration(
chunk_to_send.clear();
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event {
kind: EventType::Absolute { time: Utc::now() },
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
idempotency_key: idempotency_key(node_id.to_string()),
value: *curr_val,

View File

@@ -294,6 +294,10 @@ pub struct Timeline {
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
}
pub struct WalReceiverInfo {
@@ -1404,6 +1408,8 @@ impl Timeline {
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
loaded_at: (disk_consistent_lsn, SystemTime::now()),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),