fix: consumption metrics on restart (#5323)

Write collected metrics to disk to recover previously sent metrics on
restart.

Recover the previously collected metrics during startup, send them over
at right time
  - send cached synthetic size before actual is calculated
  - when `last_record_lsn` rolls back on startup
      - stay at last sent `written_size` metric
      - send `written_size_delta_bytes` metric as 0

Add test support: stateful verification of events in python tests.

Fixes: #5206
Cc: #5175 (loggings, will be enhanced in follow-up)
This commit is contained in:
Joonas Koivunen
2023-09-16 11:24:42 +03:00
committed by GitHub
parent a7f4ee02a3
commit f902777202
5 changed files with 757 additions and 189 deletions

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]

View File

@@ -80,11 +80,11 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
[dev-dependencies]
criterion.workspace = true
hex-literal.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
[[bench]]

View File

@@ -518,6 +518,9 @@ fn start_pageserver(
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
@@ -544,6 +547,7 @@ fn start_pageserver(
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))

View File

@@ -14,6 +14,7 @@ use reqwest::Url;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio_util::sync::CancellationToken;
@@ -21,10 +22,12 @@ use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use anyhow::Context;
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
#[derive(Serialize, Debug, Clone, Copy)]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
struct Ids {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
@@ -64,11 +67,17 @@ enum Name {
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
timeline_id: Option<TimelineId>,
metric: &'static str,
metric: Name,
}
impl MetricsKey {
@@ -88,6 +97,10 @@ impl AbsoluteValueFactory {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
@@ -129,7 +142,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
metric: Name::WrittenSize,
}
.absolute_values()
}
@@ -144,9 +157,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
// the name here is correctly about data not size, because that is what is wanted by
// downstream pipeline
metric: "written_data_bytes_delta",
metric: Name::WrittenSizeDelta,
}
.incremental_values()
}
@@ -161,7 +172,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
metric: Name::LogicalSize,
}
.absolute_values()
}
@@ -173,7 +184,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
metric: Name::RemoteSize,
}
.absolute_values()
}
@@ -185,7 +196,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
metric: Name::ResidentSize,
}
.absolute_values()
}
@@ -197,7 +208,7 @@ impl MetricsKey {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
metric: Name::SyntheticSize,
}
.absolute_values()
}
@@ -224,6 +235,7 @@ pub async fn collect_metrics(
_cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: PathBuf,
ctx: RequestContext,
) -> anyhow::Result<()> {
if _cached_metric_collection_interval != Duration::ZERO {
@@ -231,8 +243,6 @@ pub async fn collect_metrics(
"cached_metric_collection_interval is no longer used, please set it to zero."
)
}
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
// spin up background worker that caclulates tenant sizes
let worker_ctx =
@@ -252,61 +262,183 @@ pub async fn collect_metrics(
},
);
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
// define client here to reuse it for all requests
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics = HashMap::new();
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("collect_metrics received cancellation request");
return Ok(());
},
tick_at = ticker.tick() => {
let node_id = node_id.to_string();
let cancel = task_mgr::shutdown_token();
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx).await;
let (mut cached_metrics, oldest_metric_captured_at) =
match read_metrics_from_disk(final_path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let oldest_metric_captured_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
let cached = found_some
.into_iter()
.collect::<HashMap<MetricsKey, (EventType, u64)>>();
(cached, oldest_metric_captured_at)
}
Err(e) => {
let root = e.root_cause();
let maybe_ioerr = root.downcast_ref::<std::io::Error>();
let is_not_found =
maybe_ioerr.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound);
if !is_not_found {
tracing::info!(
"failed to read any previous metrics from {final_path:?}: {e:#}"
);
}
(HashMap::new(), None)
}
};
if let Some(oldest_metric_captured_at) = oldest_metric_captured_at {
// FIXME: chrono methods panic
let oldest_metric_captured_at: SystemTime = oldest_metric_captured_at.into();
let now = SystemTime::now();
let error = match now.duration_since(oldest_metric_captured_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
let deadline = std::time::Instant::now() + sleep_for;
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = tokio::time::sleep_until(deadline.into()) => {},
}
let now = std::time::Instant::now();
// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?oldest_metric_captured_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
oldest_metric_captured_at.duration_since(now).ok()
}
};
if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
error_ms = error.as_millis(),
"startup scheduling error due to restart"
)
}
}
}
// reminder: ticker is ready immediatedly
let mut ticker = tokio::time::interval(metric_collection_interval);
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
iteration(
&client,
metric_collection_endpoint,
&cancel,
&mut cached_metrics,
&node_id,
&final_path,
&ctx,
)
.await;
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
/// One iteration of metrics collection
///
/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
/// Cache metrics to avoid sending the same metrics multiple times.
///
/// This function handles all errors internally
/// and doesn't break iteration if just one tenant fails.
///
/// TODO
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
async fn collect_metrics_iteration(
async fn iteration(
client: &reqwest::Client,
cached_metrics: &mut Cache,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
cancel: &CancellationToken,
cached_metrics: &mut Cache,
node_id: &str,
final_path: &Arc<PathBuf>,
ctx: &RequestContext,
) {
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
// these are point in time, with variable "now"
let metrics = collect_all_metrics(cached_metrics, ctx).await;
if metrics.is_empty() {
return;
}
let metrics = Arc::new(metrics);
let flush = async {
match flush_metrics_to_disk(&metrics, final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
}
};
let upload = async {
let res = upload_metrics(
client,
metric_collection_endpoint,
cancel,
node_id,
&metrics,
cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
};
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
}
async fn collect_all_metrics(cached_metrics: &Cache, ctx: &RequestContext) -> Vec<RawMetric> {
let started_at = std::time::Instant::now();
// get list of tenants
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
error!("failed to list tenants: {:?}", err);
return;
return vec![];
}
};
@@ -321,85 +453,29 @@ async fn collect_metrics_iteration(
}
});
let current_metrics = collect(tenants, cached_metrics, ctx).await;
let res = collect(tenants, cached_metrics, ctx).await;
if current_metrics.is_empty() {
trace!("no new metrics to send");
return;
}
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
total = res.len(),
"collected metrics"
);
let cancel = task_mgr::shutdown_token();
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<Event<Ids, &'static str>> = Vec::with_capacity(CHUNK_SIZE);
let node_id = node_id.to_string();
let mut buffer = bytes::BytesMut::new();
for chunk in chunks {
chunk_to_send.clear();
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
idempotency_key: idempotency_key(&node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
use bytes::BufMut;
// FIXME: this is a new panic we did not have before. not really essential, but panics from
// this task currently restart pageserver.
serde_json::to_writer(
(&mut buffer).writer(),
&EventChunk {
events: (&chunk_to_send).into(),
},
)
.expect("serialization must not fail and bytesmut grows");
let body = buffer.split().freeze();
let res = upload(client, metric_collection_endpoint, body, &cancel).await;
if res.is_ok() {
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
// no need to log, backoff::retry and upload have done it, just give up uploading.
}
}
res
}
async fn collect<S>(
tenants: S,
cache: &HashMap<MetricsKey, (EventType, u64)>,
ctx: &RequestContext,
) -> Vec<(MetricsKey, (EventType, u64))>
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
let mut current_metrics: Vec<RawMetric> = Vec::new();
let mut tenants = std::pin::pin!(tenants);
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines() {
// collect per-timeline metrics only for active timelines
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
@@ -425,16 +501,158 @@ where
tenant_resident_size += timeline.resident_physical_size();
}
TenantSnapshot::collect(&tenant, tenant_resident_size).to_metrics(
tenant_id,
Utc::now(),
&mut current_metrics,
);
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}
current_metrics
}
async fn flush_metrics_to_disk(
current_metrics: &Arc<Vec<(MetricsKey, (EventType, u64))>>,
final_path: &Arc<PathBuf>,
) -> anyhow::Result<()> {
use std::io::Write;
anyhow::ensure!(
final_path.parent().is_some(),
"path must have parent: {final_path:?}"
);
let span = tracing::Span::current();
tokio::task::spawn_blocking({
let current_metrics = current_metrics.clone();
let final_path = final_path.clone();
move || {
let _e = span.entered();
let mut tempfile =
tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
// write out all of the raw metrics, to be read out later on restart as cached values
{
let mut writer = std::io::BufWriter::new(&mut tempfile);
serde_json::to_writer(&mut writer, &*current_metrics)
.context("serialize metrics")?;
writer
.into_inner()
.map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
}
tempfile.flush()?;
tempfile.as_file().sync_all()?;
drop(tempfile.persist(&*final_path)?);
let f = std::fs::File::open(final_path.parent().unwrap())?;
f.sync_all()?;
anyhow::Ok(())
}
})
.await
.with_context(|| format!("write metrics to {final_path:?} join error"))
.and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
}
async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result<Vec<RawMetric>> {
// do not add context to each error, callsite will log with full path
let span = tracing::Span::current();
tokio::task::spawn_blocking(move || {
let _e = span.entered();
let mut file = std::fs::File::open(&*path)?;
let reader = std::io::BufReader::new(&mut file);
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
})
.await
.context("read metrics join error")
.and_then(|x| x)
}
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
async fn upload_metrics(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
cached_metrics: &mut Cache,
) -> anyhow::Result<()> {
use bytes::BufMut;
let mut uploaded = 0;
let mut failed = 0;
let started_at = std::time::Instant::now();
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
let mut buffer = bytes::BytesMut::new();
let mut chunk_to_send = Vec::new();
for chunk in metrics.chunks(CHUNK_SIZE) {
chunk_to_send.clear();
// FIXME: this should always overwrite and truncate to chunk.len()
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
// FIXME: finally write! this to the prev allocation
idempotency_key: idempotency_key(node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
serde_json::to_writer(
(&mut buffer).writer(),
&EventChunk {
events: (&chunk_to_send).into(),
},
)?;
let body = buffer.split().freeze();
let event_bytes = body.len();
let res = upload(client, metric_collection_endpoint, body, cancel)
.instrument(tracing::info_span!(
"upload",
%event_bytes,
uploaded,
total = metrics.len(),
))
.await;
match res {
Ok(()) => {
for (curr_key, curr_val) in chunk {
cached_metrics.insert(*curr_key, *curr_val);
}
uploaded += chunk.len();
}
Err(_) => {
// failure(s) have already been logged
//
// however this is an inconsistency: if we crash here, we will start with the
// values as uploaded. in practice, the rejections no longer happen.
failed += chunk.len();
}
}
}
let elapsed = started_at.elapsed();
tracing::info!(
uploaded,
failed,
elapsed_ms = elapsed.as_millis(),
"done sending metrics"
);
Ok(())
}
enum UploadError {
Rejected(reqwest::StatusCode),
Reqwest(reqwest::Error),
@@ -545,16 +763,34 @@ impl TenantSnapshot {
}
}
fn to_metrics(&self, tenant_id: TenantId, now: DateTime<Utc>, metrics: &mut Vec<RawMetric>) {
fn to_metrics(
&self,
tenant_id: TenantId,
now: DateTime<Utc>,
cached: &Cache,
metrics: &mut Vec<RawMetric>,
) {
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
let synthetic_size = if self.synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
Some(MetricsKey::synthetic_size(tenant_id).at(now, self.synthetic_size))
} else {
None
let synthetic_size = {
let factory = MetricsKey::synthetic_size(tenant_id);
let mut synthetic_size = self.synthetic_size;
if synthetic_size == 0 {
if let Some((_, value)) = cached.get(factory.key()) {
// use the latest value from previous session
synthetic_size = *value;
}
}
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
Some(factory.at(now, synthetic_size))
} else {
None
}
};
metrics.extend(
@@ -585,8 +821,6 @@ impl TimelineSnapshot {
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Option<Self>> {
use anyhow::Context;
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values
// though at the caller.
@@ -660,25 +894,30 @@ impl TimelineSnapshot {
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
// written_size_bytes_delta
metrics.extend(
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let up_to = written_size_now
.0
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
Some(key_value)
} else {
None
},
);
let up_to = now;
// written_size
metrics.push((key, written_size_now));
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
// written_size_delta
metrics.push(key_value);
// written_size
metrics.push((key, written_size_now));
} else {
// the cached value was ahead of us, report zero until we've caught up
metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
// the cached value was ahead of us, report the same until we've caught up
metrics.push((key, (written_size_now.0, prev.1)));
}
if let Some(size) = self.current_exact_logical_size {
metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
{
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
let current_or_previous = self
.current_exact_logical_size
.or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
if let Some(size) = current_or_previous {
metrics.push(factory.at(now, size));
}
}
}
}
@@ -738,9 +977,7 @@ mod tests {
lsn::Lsn,
};
use crate::consumption_metrics::MetricsKey;
use super::TimelineSnapshot;
use super::*;
use chrono::{DateTime, Utc};
#[test]
@@ -927,6 +1164,164 @@ mod tests {
}
}
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
// should never go backwards
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
]
);
}
#[test]
fn post_restart_current_exact_logical_size_uses_cached() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, at_restart] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let cache = HashMap::from([
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
assert_eq!(
metrics,
&[MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 100)]
);
}
#[test]
fn post_restart_synthetic_size_uses_cached_if_available() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
let now = DateTime::<Utc>::from(now);
let cached =
HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
MetricsKey::synthetic_size(tenant_id).at(now, 1000),
]
);
}
#[test]
fn post_restart_synthetic_size_is_not_sent_when_not_cached() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let now = DateTime::<Utc>::from(now);
let cached = HashMap::new();
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
// no synthetic size here
]
);
}
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
times[0] = std::time::SystemTime::now();

View File

@@ -3,9 +3,11 @@
# Use mock HTTP server to receive metrics and verify that they look sane.
#
import json
import time
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Iterator, Set
from typing import Any, Dict, Iterator, Set
import pytest
from fixtures.log_helper import log
@@ -18,6 +20,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -35,8 +38,6 @@ def test_metric_collection(
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_kinds_checked: Set[str] = set([])
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
@@ -57,6 +58,7 @@ def test_metric_collection(
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
@@ -75,8 +77,8 @@ def test_metric_collection(
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
@@ -118,60 +120,227 @@ def test_metric_collection(
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
# later tests will be added to ensure that the timeseries are sane.
timeout = 5
# these strings in the upload queue allow synchronizing with the uploads
# and the main test execution
uploads.put("ready")
# note that this verifier graph should live across restarts as long as the
# cache file lives
v = MetricsVerifier()
while True:
# discard earlier than "ready"
log.info("waiting for upload")
events = uploads.get(timeout=timeout)
import json
if events == "ready":
events = uploads.get(timeout=timeout)
httpserver.check()
httpserver.stop()
# if anything comes after this, we'll just ignore it
stringified = json.dumps(events, indent=2)
log.info(f"inspecting: {stringified}")
v.ingest(events)
break
else:
stringified = json.dumps(events, indent=2)
log.info(f"discarding: {stringified}")
v.ingest(events)
# verify that metrics look minimally sane
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
# this can also be zero, depending on when we get the value
"written_data_bytes_delta": lambda value: value >= 0,
}
if "synthetic_storage_size" not in v.accepted_event_names():
log.info("waiting for synthetic storage size to be calculated and uploaded...")
metric_kinds_checked = set()
metric_kinds_seen = set()
rounds = 0
while "synthetic_storage_size" not in v.accepted_event_names():
events = uploads.get(timeout=timeout)
v.ingest(events)
rounds += 1
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
# once we have it in verifiers, it will assert that future batches will contain it
for event in events:
assert event["tenant_id"] == str(tenant_id)
metric_name = event["metric"]
metric_kinds_seen.add(metric_name)
env.pageserver.stop()
time.sleep(1)
uploads.put("ready")
env.pageserver.start()
check = checks.get(metric_name)
# calm down mypy
if check is not None:
value = event["value"]
log.info(f"checking {metric_name} value {value}")
assert check(value), f"{metric_name} isn't valid"
metric_kinds_checked.add(metric_name)
while True:
events = uploads.get(timeout=timeout)
expected_checks = set(checks.keys())
assert (
metric_kinds_checked == checks.keys()
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
assert metric_kinds_seen == metric_kinds_checked
if events == "ready":
events = uploads.get(timeout=timeout * 3)
v.ingest(events)
events = uploads.get(timeout=timeout)
v.ingest(events)
break
else:
v.ingest(events)
httpserver.check()
class MetricsVerifier:
"""
A graph of per tenant per timeline verifiers, allowing one for each
metric
"""
def __init__(self):
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
pass
def ingest(self, events):
stringified = json.dumps(events, indent=2)
log.info(f"ingesting: {stringified}")
for event in events:
id = TenantId(event["tenant_id"])
if id not in self.tenants:
self.tenants[id] = TenantMetricsVerifier(id)
self.tenants[id].ingest(event)
for t in self.tenants.values():
t.post_batch()
def accepted_event_names(self) -> Set[str]:
names: Set[str] = set()
for t in self.tenants.values():
names = names.union(t.accepted_event_names())
return names
class TenantMetricsVerifier:
def __init__(self, id: TenantId):
self.id = id
self.timelines: Dict[TimelineId, TimelineMetricsVerifier] = {}
self.state: Dict[str, Any] = {}
def ingest(self, event):
assert TenantId(event["tenant_id"]) == self.id
if "timeline_id" in event:
id = TimelineId(event["timeline_id"])
if id not in self.timelines:
self.timelines[id] = TimelineMetricsVerifier(self.id, id)
self.timelines[id].ingest(event)
else:
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self):
for v in self.state.values():
v.post_batch(self)
for tl in self.timelines.values():
tl.post_batch(self)
def accepted_event_names(self) -> Set[str]:
names = set(self.state.keys())
for t in self.timelines.values():
names = names.union(t.accepted_event_names())
return names
class TimelineMetricsVerifier:
def __init__(self, tenant_id: TenantId, timeline_id: TimelineId):
self.id = timeline_id
self.state: Dict[str, Any] = {}
def ingest(self, event):
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self, parent):
for v in self.state.values():
v.post_batch(self)
def accepted_event_names(self) -> Set[str]:
return set(self.state.keys())
class CannotVerifyAnything:
"""We can only assert types, but rust already has types, so no need."""
def __init__(self):
pass
def ingest(self, event, parent):
pass
def post_batch(self, parent):
pass
class WrittenDataVerifier:
def __init__(self):
self.values = []
pass
def ingest(self, event, parent):
self.values.append(event["value"])
def post_batch(self, parent):
pass
class WrittenDataDeltaVerifier:
def __init__(self):
self.value = None
self.sum = 0
self.timerange = None
pass
def ingest(self, event, parent):
assert event["type"] == "incremental"
self.value = event["value"]
self.sum += event["value"]
start = event["start_time"]
stop = event["stop_time"]
timerange = (start, stop)
if self.timerange is not None:
# this holds across restarts
assert self.timerange[1] == timerange[0], "time ranges should be continious"
self.timerange = timerange
def post_batch(self, parent):
absolute = parent.state["written_size"]
if len(absolute.values) == 1:
# in tests this comes up as initdb execution, so we can have 0 or
# about 30MB on the first event. it is not consistent.
assert self.value is not None
else:
assert self.value == absolute.values[-1] - absolute.values[-2]
# sounds like this should hold, but it will not for branches -- probably related to timing
# assert self.sum == absolute.latest
class SyntheticSizeVerifier:
def __init__(self):
self.prev = None
self.value = None
pass
def ingest(self, event, parent):
assert isinstance(parent, TenantMetricsVerifier)
assert event["type"] == "absolute"
value = event["value"]
self.value = value
def post_batch(self, parent):
if self.prev is not None:
# this is assuming no one goes and deletes the cache file
assert (
self.value is not None
), "after calculating first synthetic size, cached or more recent should be sent"
self.prev = self.value
self.value = None
PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"resident_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
}
def proxy_metrics_handler(request: Request) -> Response: