mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
refactor(consumption_metrics): prereq refactorings, tests (#5315)
Split off from #5297. There should be no functional changes here: - refactor tenant metric "production" like previously timeline, allows unit testing, though not interesting enough yet to test - introduce type aliases for tuples - extra refactoring for `collect`, was initially thinking it was useful but will do a inline later - shorter binding names - support for future allocation reuse quests with IdempotencyKey - move code out of tokio::select to make it rustfmt-able - generification, allow later replacement of `&'static str` with enum - add tests that assert sent event contents exactly
This commit is contained in:
@@ -27,7 +27,8 @@ impl EventType {
|
||||
}
|
||||
|
||||
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
|
||||
// these can most likely be thought of as Range or RangeFull
|
||||
// these can most likely be thought of as Range or RangeFull, at least pageserver creates
|
||||
// incremental ranges where the stop and next start are equal.
|
||||
use EventType::*;
|
||||
match self {
|
||||
Incremental {
|
||||
@@ -41,15 +42,25 @@ impl EventType {
|
||||
pub fn is_incremental(&self) -> bool {
|
||||
matches!(self, EventType::Incremental { .. })
|
||||
}
|
||||
|
||||
/// Returns the absolute time, or for incremental ranges, the stop time.
|
||||
pub fn recorded_at(&self) -> &DateTime<Utc> {
|
||||
use EventType::*;
|
||||
|
||||
match self {
|
||||
Absolute { time } => time,
|
||||
Incremental { stop_time, .. } => stop_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
pub struct Event<Extra> {
|
||||
pub struct Event<Extra, Metric: Serialize> {
|
||||
#[serde(flatten)]
|
||||
#[serde(rename = "type")]
|
||||
pub kind: EventType,
|
||||
|
||||
pub metric: &'static str,
|
||||
pub metric: Metric,
|
||||
pub idempotency_key: String,
|
||||
pub value: u64,
|
||||
|
||||
@@ -58,12 +69,38 @@ pub struct Event<Extra> {
|
||||
}
|
||||
|
||||
pub fn idempotency_key(node_id: &str) -> String {
|
||||
format!(
|
||||
"{}-{}-{:04}",
|
||||
Utc::now(),
|
||||
node_id,
|
||||
rand::thread_rng().gen_range(0..=9999)
|
||||
)
|
||||
IdempotencyKey::generate(node_id).to_string()
|
||||
}
|
||||
|
||||
/// Downstream users will use these to detect upload retries.
|
||||
pub struct IdempotencyKey<'a> {
|
||||
now: chrono::DateTime<Utc>,
|
||||
node_id: &'a str,
|
||||
nonce: u16,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for IdempotencyKey<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> IdempotencyKey<'a> {
|
||||
pub fn generate(node_id: &'a str) -> Self {
|
||||
IdempotencyKey {
|
||||
now: Utc::now(),
|
||||
node_id,
|
||||
nonce: rand::thread_rng().gen_range(0..=9999),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
|
||||
IdempotencyKey {
|
||||
now,
|
||||
node_id,
|
||||
nonce,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub const CHUNK_SIZE: usize = 1000;
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::tenant::{mgr, LogicalSizeCalculationCause};
|
||||
use anyhow;
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
|
||||
use futures::stream::StreamExt;
|
||||
use pageserver_api::models::TenantState;
|
||||
use reqwest::Url;
|
||||
use serde::Serialize;
|
||||
@@ -32,7 +33,37 @@ struct Ids {
|
||||
timeline_id: Option<TimelineId>,
|
||||
}
|
||||
|
||||
/// Key that uniquely identifies the object, this metric describes.
|
||||
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
|
||||
/// instead of static str.
|
||||
// Do not rename any of these without first consulting with data team and partner
|
||||
// management.
|
||||
// FIXME: write those tests before refactoring to this!
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
|
||||
enum Name {
|
||||
/// Timeline last_record_lsn, absolute
|
||||
#[serde(rename = "written_size")]
|
||||
WrittenSize,
|
||||
/// Timeline last_record_lsn, incremental
|
||||
#[serde(rename = "written_data_bytes_delta")]
|
||||
WrittenSizeDelta,
|
||||
/// Timeline logical size
|
||||
#[serde(rename = "timeline_logical_size")]
|
||||
LogicalSize,
|
||||
/// Tenant remote size
|
||||
#[serde(rename = "remote_storage_size")]
|
||||
RemoteSize,
|
||||
/// Tenant resident size
|
||||
#[serde(rename = "resident_size")]
|
||||
ResidentSize,
|
||||
/// Tenant synthetic size
|
||||
#[serde(rename = "synthetic_storage_size")]
|
||||
SyntheticSize,
|
||||
}
|
||||
|
||||
/// Key that uniquely identifies the object this metric describes.
|
||||
///
|
||||
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
|
||||
/// elsewhere.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct MetricsKey {
|
||||
tenant_id: TenantId,
|
||||
@@ -53,7 +84,7 @@ impl MetricsKey {
|
||||
struct AbsoluteValueFactory(MetricsKey);
|
||||
|
||||
impl AbsoluteValueFactory {
|
||||
fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
|
||||
fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
|
||||
let key = self.0;
|
||||
(key, (EventType::Absolute { time }, val))
|
||||
}
|
||||
@@ -69,7 +100,7 @@ impl IncrementalValueFactory {
|
||||
prev_end: DateTime<Utc>,
|
||||
up_to: DateTime<Utc>,
|
||||
val: u64,
|
||||
) -> (MetricsKey, (EventType, u64)) {
|
||||
) -> RawMetric {
|
||||
let key = self.0;
|
||||
// cannot assert prev_end < up_to because these are realtime clock based
|
||||
(
|
||||
@@ -172,6 +203,20 @@ impl MetricsKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// Basically a key-value pair, but usually in a Vec except for [`Cache`].
|
||||
///
|
||||
/// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
|
||||
/// Difference is basically the missing idempotency key, which lives only for the duration of
|
||||
/// upload attempts.
|
||||
type RawMetric = (MetricsKey, (EventType, u64));
|
||||
|
||||
/// Caches the [`RawMetric`]s
|
||||
///
|
||||
/// In practice, during startup, last sent values are stored here to be used in calculating new
|
||||
/// ones. After successful uploading, the cached values are updated to cache. This used to be used
|
||||
/// for deduplication, but that is no longer needed.
|
||||
type Cache = HashMap<MetricsKey, (EventType, u64)>;
|
||||
|
||||
/// Main thread that serves metrics collection
|
||||
pub async fn collect_metrics(
|
||||
metric_collection_endpoint: &Url,
|
||||
@@ -249,13 +294,12 @@ pub async fn collect_metrics(
|
||||
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
|
||||
async fn collect_metrics_iteration(
|
||||
client: &reqwest::Client,
|
||||
cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
|
||||
cached_metrics: &mut Cache,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
node_id: NodeId,
|
||||
ctx: &RequestContext,
|
||||
send_cached: bool,
|
||||
) {
|
||||
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
|
||||
trace!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
metric_collection_endpoint
|
||||
@@ -270,69 +314,18 @@ async fn collect_metrics_iteration(
|
||||
}
|
||||
};
|
||||
|
||||
// iterate through list of Active tenants and collect metrics
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
mgr::get_tenant(id, true)
|
||||
.await
|
||||
.ok()
|
||||
.map(|tenant| (id, tenant))
|
||||
}
|
||||
});
|
||||
|
||||
let tenant = match mgr::get_tenant(tenant_id, true).await {
|
||||
Ok(tenant) => tenant,
|
||||
Err(err) => {
|
||||
// It is possible that tenant was deleted between
|
||||
// `list_tenants` and `get_tenant`, so just warn about it.
|
||||
warn!("failed to get tenant {tenant_id:?}: {err:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
match TimelineSnapshot::collect(&timeline, ctx) {
|
||||
Ok(Some(snap)) => {
|
||||
snap.to_metrics(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
Utc::now(),
|
||||
&mut current_metrics,
|
||||
cached_metrics,
|
||||
);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tenant_resident_size += timeline.resident_physical_size();
|
||||
}
|
||||
|
||||
current_metrics
|
||||
.push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
|
||||
|
||||
current_metrics
|
||||
.push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
|
||||
|
||||
// Note that this metric is calculated in a separate bgworker
|
||||
// Here we only use cached value, which may lag behind the real latest one
|
||||
let synthetic_size = tenant.cached_synthetic_size();
|
||||
|
||||
if synthetic_size != 0 {
|
||||
// only send non-zeroes because otherwise these show up as errors in logs
|
||||
current_metrics
|
||||
.push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
|
||||
}
|
||||
}
|
||||
let mut current_metrics = collect(tenants, cached_metrics, ctx).await;
|
||||
|
||||
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||
// See: https://github.com/neondatabase/neon/issues/3485
|
||||
@@ -360,7 +353,7 @@ async fn collect_metrics_iteration(
|
||||
// Split into chunks of 1000 metrics to avoid exceeding the max request size
|
||||
let chunks = current_metrics.chunks(CHUNK_SIZE);
|
||||
|
||||
let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
|
||||
let mut chunk_to_send: Vec<Event<Ids, &'static str>> = Vec::with_capacity(CHUNK_SIZE);
|
||||
|
||||
let node_id = node_id.to_string();
|
||||
|
||||
@@ -422,6 +415,102 @@ async fn collect_metrics_iteration(
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect<S>(
|
||||
tenants: S,
|
||||
cache: &HashMap<MetricsKey, (EventType, u64)>,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<(MetricsKey, (EventType, u64))>
|
||||
where
|
||||
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
|
||||
{
|
||||
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
|
||||
|
||||
let mut tenants = std::pin::pin!(tenants);
|
||||
|
||||
while let Some((tenant_id, tenant)) = tenants.next().await {
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
match TimelineSnapshot::collect(&timeline, ctx) {
|
||||
Ok(Some(snap)) => {
|
||||
snap.to_metrics(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
Utc::now(),
|
||||
&mut current_metrics,
|
||||
cache,
|
||||
);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
tenant_resident_size += timeline.resident_physical_size();
|
||||
}
|
||||
|
||||
TenantSnapshot::collect(&tenant, tenant_resident_size).to_metrics(
|
||||
tenant_id,
|
||||
Utc::now(),
|
||||
&mut current_metrics,
|
||||
);
|
||||
}
|
||||
|
||||
current_metrics
|
||||
}
|
||||
|
||||
/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
|
||||
struct TenantSnapshot {
|
||||
resident_size: u64,
|
||||
remote_size: u64,
|
||||
synthetic_size: u64,
|
||||
}
|
||||
|
||||
impl TenantSnapshot {
|
||||
/// Collect tenant status to have metrics created out of it.
|
||||
///
|
||||
/// `resident_size` is calculated of the timelines we had access to for other metrics, so we
|
||||
/// cannot just list timelines here.
|
||||
fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
|
||||
TenantSnapshot {
|
||||
resident_size,
|
||||
remote_size: t.remote_size(),
|
||||
// Note that this metric is calculated in a separate bgworker
|
||||
// Here we only use cached value, which may lag behind the real latest one
|
||||
synthetic_size: t.cached_synthetic_size(),
|
||||
}
|
||||
}
|
||||
|
||||
fn to_metrics(&self, tenant_id: TenantId, now: DateTime<Utc>, metrics: &mut Vec<RawMetric>) {
|
||||
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
|
||||
|
||||
let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
|
||||
|
||||
let synthetic_size = if self.synthetic_size != 0 {
|
||||
// only send non-zeroes because otherwise these show up as errors in logs
|
||||
Some(MetricsKey::synthetic_size(tenant_id).at(now, self.synthetic_size))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
metrics.extend(
|
||||
[Some(remote_size), Some(resident_size), synthetic_size]
|
||||
.into_iter()
|
||||
.flatten(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal type to make timeline metric production testable.
|
||||
///
|
||||
/// As this value type contains all of the information needed from a timeline to produce the
|
||||
@@ -478,22 +567,13 @@ impl TimelineSnapshot {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
now: DateTime<Utc>,
|
||||
metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
|
||||
cache: &HashMap<MetricsKey, (EventType, u64)>,
|
||||
metrics: &mut Vec<RawMetric>,
|
||||
cache: &Cache,
|
||||
) {
|
||||
let timeline_written_size = u64::from(self.last_record_lsn);
|
||||
|
||||
let (key, written_size_now) =
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
|
||||
|
||||
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
|
||||
// features might change this.
|
||||
|
||||
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
|
||||
|
||||
// use this when available, because in a stream of incremental values, it will be
|
||||
// accurate where as when last_record_lsn stops moving, we will only cache the last
|
||||
// one of those.
|
||||
let last_stop_time = cache
|
||||
.get(written_size_delta_key.key())
|
||||
.map(|(until, _val)| {
|
||||
@@ -503,6 +583,9 @@ impl TimelineSnapshot {
|
||||
.end
|
||||
});
|
||||
|
||||
let (key, written_size_now) =
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
|
||||
|
||||
// by default, use the last sent written_size as the basis for
|
||||
// calculating the delta. if we don't yet have one, use the load time value.
|
||||
let prev = cache
|
||||
@@ -547,53 +630,47 @@ impl TimelineSnapshot {
|
||||
}
|
||||
|
||||
/// Caclculate synthetic size for each active tenant
|
||||
pub async fn calculate_synthetic_size_worker(
|
||||
async fn calculate_synthetic_size_worker(
|
||||
synthetic_size_calculation_interval: Duration,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("starting calculate_synthetic_size_worker");
|
||||
|
||||
// reminder: this ticker is ready right away
|
||||
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
|
||||
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
return Ok(());
|
||||
},
|
||||
tick_at = ticker.tick() => {
|
||||
let tick_at = tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => return Ok(()),
|
||||
tick_at = ticker.tick() => tick_at,
|
||||
};
|
||||
|
||||
let tenants = match mgr::list_tenants().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(e) => {
|
||||
warn!("cannot get tenant list: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// iterate through list of Active tenants and collect metrics
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
let tenants = match mgr::list_tenants().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(e) => {
|
||||
warn!("cannot get tenant list: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
|
||||
{
|
||||
if let Err(e) = tenant.calculate_synthetic_size(
|
||||
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
|
||||
ctx).await {
|
||||
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
|
||||
}
|
||||
}
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
|
||||
if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
|
||||
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
|
||||
}
|
||||
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
synthetic_size_calculation_interval,
|
||||
"consumption_metrics_synthetic_size_worker",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
synthetic_size_calculation_interval,
|
||||
"consumption_metrics_synthetic_size_worker",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -732,6 +809,70 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn metric_image_stability() {
|
||||
// it is important that these strings stay as they are
|
||||
|
||||
let tenant_id = TenantId::from_array([0; 16]);
|
||||
let timeline_id = TimelineId::from_array([0xff; 16]);
|
||||
|
||||
let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
|
||||
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
|
||||
|
||||
let [now, before] = [DateTime::<Utc>::from(now), DateTime::from(before)];
|
||||
|
||||
let examples = [
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id)
|
||||
.from_previous_up_to(before, now, 0),
|
||||
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::resident_size(tenant_id).at(now, 0),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
(
|
||||
line!(),
|
||||
MetricsKey::synthetic_size(tenant_id).at(now, 1),
|
||||
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
|
||||
),
|
||||
];
|
||||
|
||||
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
|
||||
|
||||
for (line, (key, (kind, value)), expected) in examples {
|
||||
let e = consumption_metrics::Event {
|
||||
kind,
|
||||
metric: key.metric,
|
||||
idempotency_key: idempotency_key.to_string(),
|
||||
value,
|
||||
extra: super::Ids {
|
||||
tenant_id: key.tenant_id,
|
||||
timeline_id: key.timeline_id,
|
||||
},
|
||||
};
|
||||
let actual = serde_json::to_string(&e).unwrap();
|
||||
assert_eq!(expected, actual, "example from line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
|
||||
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
|
||||
times[0] = std::time::SystemTime::now();
|
||||
|
||||
@@ -121,7 +121,7 @@ async fn collect_metrics_iteration(
|
||||
|
||||
let current_metrics = gather_proxy_io_bytes_per_client();
|
||||
|
||||
let metrics_to_send: Vec<Event<Ids>> = current_metrics
|
||||
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
|
||||
.iter()
|
||||
.filter_map(|(curr_key, (curr_val, curr_time))| {
|
||||
let mut start_time = *curr_time;
|
||||
|
||||
Reference in New Issue
Block a user