refactor(pageserver): use JSON field encoding for consumption metrics cache (#9470)
In https://github.com/neondatabase/neon/issues/9032, I would like to eventually add a `generation` field to the consumption metrics cache. The current encoding is not backward compatible, and it is hard to add another field to the cache. Therefore, this patch refactors the format to store "field -> value" pairs, which makes it easier to maintain backward/forward compatibility.

## Summary of changes

* Add `NewRawMetric` as the new format.
* Add an upgrade path: when opening the disk cache, the code first inspects the `version` field and decides how to decode the rest.
* Refactor the metrics generation code and tests.
* Add tests for the upgrade path and compatibility with the old format.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
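The following sketch illustrates the format change and upgrade path summarized above. It is a simplified, self-contained example rather than the pageserver code: the `key` and `kind` fields are plain strings standing in for `MetricsKey` and `EventType`, the function name `read_any` is invented for illustration, and the only dependencies assumed are `serde` (with the `derive` feature) and `serde_json`.

```rust
use serde::{Deserialize, Serialize};

// Old on-disk entry: a positional tuple, which serde serializes as nested JSON
// arrays. Adding a field would shift positions and break old readers.
type OldRawMetric = (String, (String, u64));

// New on-disk entry: named fields, so a later field such as `generation` can be
// added without disturbing existing ones. (Strings here are illustrative stand-ins.)
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct NewRawMetric {
    key: String,
    kind: String,
    value: u64,
}

// Versioned root object wrapped around the metrics list.
#[derive(Serialize, Deserialize)]
struct NewMetricsRoot {
    version: usize,
    metrics: Vec<NewRawMetric>,
}

// Decode either format: inspect `version` first, otherwise fall back to the old
// tuple encoding and convert it in memory.
fn read_any(json_value: serde_json::Value) -> serde_json::Result<Vec<NewRawMetric>> {
    if json_value.get("version").and_then(|v| v.as_u64()) == Some(2) {
        return Ok(serde_json::from_value::<NewMetricsRoot>(json_value)?.metrics);
    }
    let old: Vec<OldRawMetric> = serde_json::from_value(json_value)?;
    Ok(old
        .into_iter()
        .map(|(key, (kind, value))| NewRawMetric { key, kind, value })
        .collect())
}

fn main() {
    // v1 cache file: bare array of positional tuples.
    let v1 = serde_json::json!([["written_size", ["absolute", 42]]]);
    // v2 cache file: versioned root with field-named entries.
    let v2 = serde_json::json!({
        "version": 2,
        "metrics": [{ "key": "written_size", "kind": "absolute", "value": 42 }]
    });
    assert_eq!(read_any(v1).unwrap(), read_any(v2).unwrap());
}
```

In the patch itself, the reader lives in `disk_cache.rs` as `read_metrics_from_serde_value`, gated by `NewMetricsRoot::is_v2_metrics`, while the writer always emits the versioned root via `NewMetricsRefRoot::new`, so an old cache file is upgraded in memory on first read and written back in the v2 format on the next flush.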
@@ -14,6 +14,7 @@ use itertools::Itertools as _;
 use pageserver_api::models::TenantState;
 use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
 use reqwest::Url;
+use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -35,12 +36,62 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 /// upload attempts.
 type RawMetric = (MetricsKey, (EventType, u64));
 
+/// The new serializable metrics format
+#[derive(Serialize, Deserialize)]
+struct NewMetricsRoot {
+    version: usize,
+    metrics: Vec<NewRawMetric>,
+}
+
+impl NewMetricsRoot {
+    pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
+        if let Some(ver) = json_value.get("version") {
+            if let Some(2) = ver.as_u64() {
+                return true;
+            }
+        }
+        false
+    }
+}
+
+/// The new serializable metrics format
+#[derive(Serialize)]
+struct NewMetricsRefRoot<'a> {
+    version: usize,
+    metrics: &'a [NewRawMetric],
+}
+
+impl<'a> NewMetricsRefRoot<'a> {
+    fn new(metrics: &'a [NewRawMetric]) -> Self {
+        Self {
+            version: 2,
+            metrics,
+        }
+    }
+}
+
+/// The new serializable metrics format
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+struct NewRawMetric {
+    key: MetricsKey,
+    kind: EventType,
+    value: u64,
+    // TODO: add generation field and check against generations
+}
+
+impl NewRawMetric {
+    #[cfg(test)]
+    fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
+        (self.key, self.clone())
+    }
+}
+
 /// Caches the [`RawMetric`]s
 ///
 /// In practice, during startup, last sent values are stored here to be used in calculating new
 /// ones. After successful uploading, the cached values are updated to cache. This used to be used
 /// for deduplication, but that is no longer needed.
-type Cache = HashMap<MetricsKey, (EventType, u64)>;
+type Cache = HashMap<MetricsKey, NewRawMetric>;
 
 pub async fn run(
     conf: &'static PageServerConf,
@@ -231,11 +282,14 @@ async fn restore_and_reschedule(
     // collect_all_metrics
     let earlier_metric_at = found_some
         .iter()
-        .map(|(_, (et, _))| et.recorded_at())
+        .map(|item| item.kind.recorded_at())
         .copied()
         .next();
 
-    let cached = found_some.into_iter().collect::<Cache>();
+    let cached = found_some
+        .into_iter()
+        .map(|item| (item.key, item))
+        .collect::<Cache>();
 
     (cached, earlier_metric_at)
 }
@@ -2,11 +2,33 @@ use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
 use std::sync::Arc;
 
-use super::RawMetric;
+use crate::consumption_metrics::NewMetricsRefRoot;
+
+use super::{NewMetricsRoot, NewRawMetric, RawMetric};
 
+pub(super) fn read_metrics_from_serde_value(
+    json_value: serde_json::Value,
+) -> anyhow::Result<Vec<NewRawMetric>> {
+    if NewMetricsRoot::is_v2_metrics(&json_value) {
+        let root = serde_json::from_value::<NewMetricsRoot>(json_value)?;
+        Ok(root.metrics)
+    } else {
+        let all_metrics = serde_json::from_value::<Vec<RawMetric>>(json_value)?;
+        let all_metrics = all_metrics
+            .into_iter()
+            .map(|(key, (event_type, value))| NewRawMetric {
+                key,
+                kind: event_type,
+                value,
+            })
+            .collect();
+        Ok(all_metrics)
+    }
+}
+
 pub(super) async fn read_metrics_from_disk(
     path: Arc<Utf8PathBuf>,
-) -> anyhow::Result<Vec<RawMetric>> {
+) -> anyhow::Result<Vec<NewRawMetric>> {
     // do not add context to each error, callsite will log with full path
     let span = tracing::Span::current();
     tokio::task::spawn_blocking(move || {
@@ -20,7 +42,8 @@ pub(super) async fn read_metrics_from_disk(
 
         let mut file = std::fs::File::open(&*path)?;
         let reader = std::io::BufReader::new(&mut file);
-        anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
+        let json_value = serde_json::from_reader::<_, serde_json::Value>(reader)?;
+        read_metrics_from_serde_value(json_value)
     })
     .await
     .context("read metrics join error")
@@ -63,7 +86,7 @@ fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
 }
 
 pub(super) async fn flush_metrics_to_disk(
-    current_metrics: &Arc<Vec<RawMetric>>,
+    current_metrics: &Arc<Vec<NewRawMetric>>,
     path: &Arc<Utf8PathBuf>,
 ) -> anyhow::Result<()> {
     use std::io::Write;
@@ -93,8 +116,11 @@ pub(super) async fn flush_metrics_to_disk(
     // write out all of the raw metrics, to be read out later on restart as cached values
     {
         let mut writer = std::io::BufWriter::new(&mut tempfile);
-        serde_json::to_writer(&mut writer, &*current_metrics)
-            .context("serialize metrics")?;
+        serde_json::to_writer(
+            &mut writer,
+            &NewMetricsRefRoot::new(current_metrics.as_ref()),
+        )
+        .context("serialize metrics")?;
         writer
             .into_inner()
             .map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
@@ -9,7 +9,7 @@ use utils::{
     lsn::Lsn,
 };
 
-use super::{Cache, RawMetric};
+use super::{Cache, NewRawMetric};
 
 /// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
 /// instead of static str.
@@ -64,11 +64,21 @@ impl MetricsKey {
 struct AbsoluteValueFactory(MetricsKey);
 
 impl AbsoluteValueFactory {
-    const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
+    #[cfg(test)]
+    const fn at_old_format(self, time: DateTime<Utc>, val: u64) -> super::RawMetric {
         let key = self.0;
         (key, (EventType::Absolute { time }, val))
     }
 
+    const fn at(self, time: DateTime<Utc>, val: u64) -> NewRawMetric {
+        let key = self.0;
+        NewRawMetric {
+            key,
+            kind: EventType::Absolute { time },
+            value: val,
+        }
+    }
+
     fn key(&self) -> &MetricsKey {
         &self.0
     }
@@ -84,7 +94,28 @@ impl IncrementalValueFactory {
         prev_end: DateTime<Utc>,
         up_to: DateTime<Utc>,
         val: u64,
-    ) -> RawMetric {
+    ) -> NewRawMetric {
         let key = self.0;
         // cannot assert prev_end < up_to because these are realtime clock based
         let when = EventType::Incremental {
+            start_time: prev_end,
+            stop_time: up_to,
+        };
+        NewRawMetric {
+            key,
+            kind: when,
+            value: val,
+        }
+    }
+
+    #[allow(clippy::wrong_self_convention)]
+    #[cfg(test)]
+    const fn from_until_old_format(
+        self,
+        prev_end: DateTime<Utc>,
+        up_to: DateTime<Utc>,
+        val: u64,
+    ) -> super::RawMetric {
+        let key = self.0;
+        // cannot assert prev_end < up_to because these are realtime clock based
+        let when = EventType::Incremental {
@@ -185,7 +216,7 @@ pub(super) async fn collect_all_metrics(
     tenant_manager: &Arc<TenantManager>,
     cached_metrics: &Cache,
     ctx: &RequestContext,
-) -> Vec<RawMetric> {
+) -> Vec<NewRawMetric> {
     use pageserver_api::models::TenantState;
 
     let started_at = std::time::Instant::now();
@@ -220,11 +251,11 @@ pub(super) async fn collect_all_metrics(
     res
 }
 
-async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
+async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
 where
     S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
 {
-    let mut current_metrics: Vec<RawMetric> = Vec::new();
+    let mut current_metrics: Vec<NewRawMetric> = Vec::new();
 
     let mut tenants = std::pin::pin!(tenants);
 
@@ -291,7 +322,7 @@ impl TenantSnapshot {
         tenant_id: TenantId,
         now: DateTime<Utc>,
         cached: &Cache,
-        metrics: &mut Vec<RawMetric>,
+        metrics: &mut Vec<NewRawMetric>,
     ) {
         let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
 
@@ -302,9 +333,9 @@ impl TenantSnapshot {
         let mut synthetic_size = self.synthetic_size;
 
         if synthetic_size == 0 {
-            if let Some((_, value)) = cached.get(factory.key()) {
-                // use the latest value from previous session
-                synthetic_size = *value;
+            if let Some(item) = cached.get(factory.key()) {
+                // use the latest value from previous session, TODO: check generation number
+                synthetic_size = item.value;
             }
         }
 
@@ -381,37 +412,36 @@ impl TimelineSnapshot {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         now: DateTime<Utc>,
-        metrics: &mut Vec<RawMetric>,
+        metrics: &mut Vec<NewRawMetric>,
         cache: &Cache,
     ) {
         let timeline_written_size = u64::from(self.last_record_lsn);
 
         let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
 
-        let last_stop_time = cache
-            .get(written_size_delta_key.key())
-            .map(|(until, _val)| {
-                until
-                    .incremental_timerange()
-                    .expect("never create EventType::Absolute for written_size_delta")
-                    .end
-            });
+        let last_stop_time = cache.get(written_size_delta_key.key()).map(|item| {
+            item.kind
+                .incremental_timerange()
+                .expect("never create EventType::Absolute for written_size_delta")
+                .end
+        });
 
-        let (key, written_size_now) =
+        let written_size_now =
            MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
 
         // by default, use the last sent written_size as the basis for
         // calculating the delta. if we don't yet have one, use the load time value.
-        let prev = cache
-            .get(&key)
-            .map(|(prev_at, prev)| {
+        let prev: (DateTime<Utc>, u64) = cache
+            .get(&written_size_now.key)
+            .map(|item| {
                 // use the prev time from our last incremental update, or default to latest
                 // absolute update on the first round.
-                let prev_at = prev_at
+                let prev_at = item
+                    .kind
                     .absolute_time()
                     .expect("never create EventType::Incremental for written_size");
                 let prev_at = last_stop_time.unwrap_or(prev_at);
-                (*prev_at, *prev)
+                (*prev_at, item.value)
             })
            .unwrap_or_else(|| {
                 // if we don't have a previous point of comparison, compare to the load time
@@ -422,24 +452,28 @@ impl TimelineSnapshot {
 
         let up_to = now;
 
-        if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
+        if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
             let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
             // written_size_delta
             metrics.push(key_value);
             // written_size
-            metrics.push((key, written_size_now));
+            metrics.push(written_size_now);
         } else {
             // the cached value was ahead of us, report zero until we've caught up
             metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
             // the cached value was ahead of us, report the same until we've caught up
-            metrics.push((key, (written_size_now.0, prev.1)));
+            metrics.push(NewRawMetric {
+                key: written_size_now.key,
+                kind: written_size_now.kind,
+                value: prev.1,
+            });
         }
 
         {
             let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
             let current_or_previous = self
                 .current_exact_logical_size
-                .or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
+                .or_else(|| cache.get(factory.key()).map(|item| item.value));
 
             if let Some(size) = current_or_previous {
                 metrics.push(factory.at(now, size));
@@ -452,4 +486,4 @@ impl TimelineSnapshot {
 mod tests;
 
 #[cfg(test)]
-pub(crate) use tests::metric_examples;
+pub(crate) use tests::{metric_examples, metric_examples_old};
@@ -1,3 +1,5 @@
+use crate::consumption_metrics::RawMetric;
+
 use super::*;
 use std::collections::HashMap;
 
@@ -50,9 +52,9 @@ fn startup_collected_timeline_metrics_second_round() {
     let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
 
     let mut metrics = Vec::new();
-    let cache = HashMap::from([
-        MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
-    ]);
+    let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
+        .at(before, disk_consistent_lsn.0)
+        .to_kv_pair()]);
 
     let snap = TimelineSnapshot {
         loaded_at: (disk_consistent_lsn, init),
@@ -89,9 +91,13 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
     let mut metrics = Vec::new();
     let cache = HashMap::from([
         // at t=before was the last time the last_record_lsn changed
-        MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
+        MetricsKey::written_size(tenant_id, timeline_id)
+            .at(before, disk_consistent_lsn.0)
+            .to_kv_pair(),
         // end time of this event is used for the next ones
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until(before, just_before, 0)
+            .to_kv_pair(),
     ]);
 
     let snap = TimelineSnapshot {
@@ -138,13 +144,17 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     };
 
     let mut cache = HashMap::from([
-        MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
-        MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
-            way_before,
-            before_restart,
-            // not taken into account, but the timestamps are important
-            999_999_999,
-        ),
+        MetricsKey::written_size(tenant_id, timeline_id)
+            .at(before_restart, 100)
+            .to_kv_pair(),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until(
+                way_before,
+                before_restart,
+                // not taken into account, but the timestamps are important
+                999_999_999,
+            )
+            .to_kv_pair(),
     ]);
 
     let mut metrics = Vec::new();
@@ -163,7 +173,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
     );
 
     // now if we cache these metrics, and re-run while "still in recovery"
-    cache.extend(metrics.drain(..));
+    cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
 
     // "still in recovery", because our snapshot did not change
     snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
@@ -194,14 +204,14 @@ fn post_restart_current_exact_logical_size_uses_cached() {
         current_exact_logical_size: None,
     };
 
-    let cache = HashMap::from([
-        MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
-    ]);
+    let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
+        .at(before_restart, 100)
+        .to_kv_pair()]);
 
     let mut metrics = Vec::new();
     snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
 
-    metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
+    metrics.retain(|item| item.key.metric == Name::LogicalSize);
 
     assert_eq!(
         metrics,
@@ -224,7 +234,9 @@ fn post_restart_synthetic_size_uses_cached_if_available() {
     let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
     let now = DateTime::<Utc>::from(now);
 
-    let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
+    let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id)
+        .at(before_restart, 1000)
+        .to_kv_pair()]);
 
     let mut metrics = Vec::new();
     ts.to_metrics(tenant_id, now, &cached, &mut metrics);
@@ -278,12 +290,29 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
     times
 }
 
-pub(crate) const fn metric_examples(
+pub(crate) const fn metric_examples_old(
     tenant_id: TenantId,
     timeline_id: TimelineId,
     now: DateTime<Utc>,
     before: DateTime<Utc>,
 ) -> [RawMetric; 6] {
     [
+        MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
+        MetricsKey::written_size_delta(tenant_id, timeline_id)
+            .from_until_old_format(before, now, 0),
+        MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
+        MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
+        MetricsKey::resident_size(tenant_id).at_old_format(now, 0),
+        MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
+    ]
+}
+
+pub(crate) const fn metric_examples(
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    now: DateTime<Utc>,
+    before: DateTime<Utc>,
+) -> [NewRawMetric; 6] {
+    [
         MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
         MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
@@ -7,7 +7,7 @@ use tokio::io::AsyncWriteExt;
 use tokio_util::sync::CancellationToken;
 use tracing::Instrument;
 
-use super::{metrics::Name, Cache, MetricsKey, RawMetric};
+use super::{metrics::Name, Cache, MetricsKey, NewRawMetric, RawMetric};
 use utils::id::{TenantId, TimelineId};
 
 /// How the metrics from pageserver are identified.
@@ -24,7 +24,7 @@ pub(super) async fn upload_metrics_http(
     client: &reqwest::Client,
     metric_collection_endpoint: &reqwest::Url,
     cancel: &CancellationToken,
-    metrics: &[RawMetric],
+    metrics: &[NewRawMetric],
     cached_metrics: &mut Cache,
     idempotency_keys: &[IdempotencyKey<'_>],
 ) -> anyhow::Result<()> {
@@ -53,8 +53,8 @@ pub(super) async fn upload_metrics_http(
 
         match res {
             Ok(()) => {
-                for (curr_key, curr_val) in chunk {
-                    cached_metrics.insert(*curr_key, *curr_val);
+                for item in chunk {
+                    cached_metrics.insert(item.key, item.clone());
                 }
                 uploaded += chunk.len();
             }
@@ -86,7 +86,7 @@ pub(super) async fn upload_metrics_bucket(
     client: &GenericRemoteStorage,
     cancel: &CancellationToken,
     node_id: &str,
-    metrics: &[RawMetric],
+    metrics: &[NewRawMetric],
     idempotency_keys: &[IdempotencyKey<'_>],
 ) -> anyhow::Result<()> {
     if metrics.is_empty() {
@@ -140,16 +140,16 @@ pub(super) async fn upload_metrics_bucket(
 /// across different metrics sinks), and must have the same length as input.
 fn serialize_in_chunks<'a>(
     chunk_size: usize,
-    input: &'a [RawMetric],
+    input: &'a [NewRawMetric],
     idempotency_keys: &'a [IdempotencyKey<'a>],
-) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
+) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
 {
     use bytes::BufMut;
 
     assert_eq!(input.len(), idempotency_keys.len());
 
     struct Iter<'a> {
-        inner: std::slice::Chunks<'a, RawMetric>,
+        inner: std::slice::Chunks<'a, NewRawMetric>,
         idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
         chunk_size: usize,
 
@@ -160,7 +160,7 @@ fn serialize_in_chunks<'a>(
     }
 
     impl<'a> Iterator for Iter<'a> {
-        type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
+        type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
 
         fn next(&mut self) -> Option<Self::Item> {
             let chunk = self.inner.next()?;
@@ -269,6 +269,58 @@ impl RawMetricExt for RawMetric {
     }
 }
 
+impl RawMetricExt for NewRawMetric {
+    fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.key;
+
+        let kind = self.kind;
+        let value = self.value;
+
+        Event {
+            kind,
+            metric,
+            idempotency_key: key.to_string(),
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        }
+    }
+
+    fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
+        use std::fmt::Write;
+
+        let MetricsKey {
+            metric,
+            tenant_id,
+            timeline_id,
+        } = self.key;
+
+        let kind = self.kind;
+        let value = self.value;
+
+        *event = Event {
+            kind,
+            metric,
+            idempotency_key: {
+                event.idempotency_key.clear();
+                write!(event.idempotency_key, "{key}").unwrap();
+                std::mem::take(&mut event.idempotency_key)
+            },
+            value,
+            extra: Ids {
+                tenant_id,
+                timeline_id,
+            },
+        };
+    }
+}
+
 pub(crate) trait KeyGen<'a> {
     fn generate(&self) -> IdempotencyKey<'a>;
 }
@@ -381,6 +433,10 @@ async fn upload(
 
 #[cfg(test)]
 mod tests {
+    use crate::consumption_metrics::{
+        disk_cache::read_metrics_from_serde_value, NewMetricsRefRoot,
+    };
+
     use super::*;
     use chrono::{DateTime, Utc};
     use once_cell::sync::Lazy;
@@ -473,23 +529,49 @@ mod tests {
         let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
         let examples = examples.into_iter().zip(metric_samples());
 
-        for ((line, expected), (key, (kind, value))) in examples {
+        for ((line, expected), item) in examples {
             let e = consumption_metrics::Event {
-                kind,
-                metric: key.metric,
+                kind: item.kind,
+                metric: item.key.metric,
                 idempotency_key: idempotency_key.to_string(),
-                value,
+                value: item.value,
                 extra: Ids {
-                    tenant_id: key.tenant_id,
-                    timeline_id: key.timeline_id,
+                    tenant_id: item.key.tenant_id,
+                    timeline_id: item.key.timeline_id,
                 },
             };
             let actual = serde_json::to_string(&e).unwrap();
-            assert_eq!(expected, actual, "example for {kind:?} from line {line}");
+            assert_eq!(
+                expected, actual,
+                "example for {:?} from line {line}",
+                item.kind
+            );
         }
     }
 
-    fn metric_samples() -> [RawMetric; 6] {
+    #[test]
+    fn disk_format_upgrade() {
+        let old_samples_json = serde_json::to_value(metric_samples_old()).unwrap();
+        let new_samples =
+            serde_json::to_value(NewMetricsRefRoot::new(metric_samples().as_ref())).unwrap();
+        let upgraded_samples = read_metrics_from_serde_value(old_samples_json).unwrap();
+        let new_samples = read_metrics_from_serde_value(new_samples).unwrap();
+        assert_eq!(upgraded_samples, new_samples);
+    }
+
+    fn metric_samples_old() -> [RawMetric; 6] {
+        let tenant_id = TenantId::from_array([0; 16]);
+        let timeline_id = TimelineId::from_array([0xff; 16]);
+
+        let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
+            .unwrap()
+            .into();
+        let [now, before] = [*SAMPLES_NOW, before];
+
+        super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
+    }
+
+    fn metric_samples() -> [NewRawMetric; 6] {
         let tenant_id = TenantId::from_array([0; 16]);
         let timeline_id = TimelineId::from_array([0xff; 16]);
 