mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
We want to have timeline_written_size_delta which is defined as difference to the previously sent `timeline_written_size` from the current `timeline_written_size`. Solution is to send it. On the first round `disk_consistent_lsn` is used which is captured during `load` time. After that an incremental "event" is sent on every collection. Incremental "events" are not part of deduplication. I've added some infrastructure to allow somewhat typesafe `EventType::Absolute` and `EventType::Incremental` factories per metrics, now that we have our first `EventType::Incremental` usage.
77 lines
1.8 KiB
Rust
77 lines
1.8 KiB
Rust
//!
|
|
//! Shared code for consumption metics collection
|
|
//!
|
|
use chrono::{DateTime, Utc};
|
|
use rand::Rng;
|
|
use serde::Serialize;
|
|
|
|
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
|
|
#[serde(tag = "type")]
|
|
pub enum EventType {
|
|
#[serde(rename = "absolute")]
|
|
Absolute { time: DateTime<Utc> },
|
|
#[serde(rename = "incremental")]
|
|
Incremental {
|
|
start_time: DateTime<Utc>,
|
|
stop_time: DateTime<Utc>,
|
|
},
|
|
}
|
|
|
|
impl EventType {
|
|
pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
|
|
use EventType::*;
|
|
match self {
|
|
Absolute { time } => Some(time),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
|
|
// these can most likely be thought of as Range or RangeFull
|
|
use EventType::*;
|
|
match self {
|
|
Incremental {
|
|
start_time,
|
|
stop_time,
|
|
} => Some(start_time..stop_time),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
pub fn is_incremental(&self) -> bool {
|
|
matches!(self, EventType::Incremental { .. })
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
|
pub struct Event<Extra> {
|
|
#[serde(flatten)]
|
|
#[serde(rename = "type")]
|
|
pub kind: EventType,
|
|
|
|
pub metric: &'static str,
|
|
pub idempotency_key: String,
|
|
pub value: u64,
|
|
|
|
#[serde(flatten)]
|
|
pub extra: Extra,
|
|
}
|
|
|
|
pub fn idempotency_key(node_id: String) -> String {
|
|
format!(
|
|
"{}-{}-{:04}",
|
|
Utc::now(),
|
|
node_id,
|
|
rand::thread_rng().gen_range(0..=9999)
|
|
)
|
|
}
|
|
|
|
pub const CHUNK_SIZE: usize = 1000;
|
|
|
|
// Just a wrapper around a slice of events
|
|
// to serialize it as `{"events" : [ ] }
|
|
#[derive(serde::Serialize)]
|
|
pub struct EventChunk<'a, T> {
|
|
pub events: &'a [T],
|
|
}
|