From b37da32c6f56f31f39661c9364a7a662df59dbbc Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 3 Sep 2024 10:05:24 +0200 Subject: [PATCH] pageserver: reuse idempotency keys across metrics sinks (#8876) ## Problem Metrics event idempotency keys differ across S3 and Vector. The events should be identical. Resolves #8605. ## Summary of changes Pre-generate the idempotency keys and pass the same set into both metrics sinks. Co-authored-by: John Spray --- pageserver/src/consumption_metrics.rs | 24 +++++++-- pageserver/src/consumption_metrics/upload.rs | 52 +++++++++++--------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index f94d945d46..64a267e0e4 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -1,6 +1,8 @@ //! Periodically collect consumption metrics for all active tenants //! and push them to a HTTP endpoint. use crate::config::PageServerConf; +use crate::consumption_metrics::metrics::MetricsKey; +use crate::consumption_metrics::upload::KeyGen as _; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::size::CalculateSyntheticSizeError; @@ -8,6 +10,7 @@ use crate::tenant::tasks::BackgroundLoopKind; use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant}; use camino::Utf8PathBuf; use consumption_metrics::EventType; +use itertools::Itertools as _; use pageserver_api::models::TenantState; use remote_storage::{GenericRemoteStorage, RemoteStorageConfig}; use reqwest::Url; @@ -19,9 +22,8 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::NodeId; -mod metrics; -use crate::consumption_metrics::metrics::MetricsKey; mod disk_cache; +mod metrics; mod upload; const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60); @@ -143,6 +145,12 @@ async fn collect_metrics( // these are point in time, with variable "now" let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await; + // Pre-generate event idempotency keys, to reuse them across the bucket + // and HTTP sinks. + let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate()) + .take(metrics.len()) + .collect_vec(); + let metrics = Arc::new(metrics); // why not race cancellation here? because we are one of the last tasks, and if we are @@ -161,8 +169,14 @@ async fn collect_metrics( } if let Some(bucket_client) = &bucket_client { - let res = - upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await; + let res = upload::upload_metrics_bucket( + bucket_client, + &cancel, + &node_id, + &metrics, + &idempotency_keys, + ) + .await; if let Err(e) = res { tracing::error!("failed to upload to S3: {e:#}"); } @@ -174,9 +188,9 @@ async fn collect_metrics( &client, metric_collection_endpoint, &cancel, - &node_id, &metrics, &mut cached_metrics, + &idempotency_keys, ) .await; if let Err(e) = res { diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs index 4e8283c3e4..0325ee403a 100644 --- a/pageserver/src/consumption_metrics/upload.rs +++ b/pageserver/src/consumption_metrics/upload.rs @@ -24,16 +24,16 @@ pub(super) async fn upload_metrics_http( client: &reqwest::Client, metric_collection_endpoint: &reqwest::Url, cancel: &CancellationToken, - node_id: &str, metrics: &[RawMetric], cached_metrics: &mut Cache, + idempotency_keys: &[IdempotencyKey<'_>], ) -> anyhow::Result<()> { let mut uploaded = 0; let mut failed = 0; let started_at = std::time::Instant::now(); - let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id); + let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys); while let Some(res) = iter.next() { let (chunk, body) = res?; @@ -87,6 +87,7 @@ pub(super) async fn upload_metrics_bucket( cancel: &CancellationToken, node_id: &str, metrics: &[RawMetric], + idempotency_keys: &[IdempotencyKey<'_>], ) -> anyhow::Result<()> { if metrics.is_empty() { // Skip uploads if we have no metrics, so that readers don't have to handle the edge case @@ -106,7 +107,7 @@ pub(super) async fn upload_metrics_bucket( // Serialize and write into compressed buffer let started_at = std::time::Instant::now(); - for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) { + for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) { let (_chunk, body) = res?; gzip_writer.write_all(&body).await?; } @@ -134,29 +135,31 @@ pub(super) async fn upload_metrics_bucket( Ok(()) } -// The return type is quite ugly, but we gain testability in isolation -fn serialize_in_chunks<'a, F>( +/// Serializes the input metrics as JSON in chunks of chunk_size. The provided +/// idempotency keys are injected into the corresponding metric events (reused +/// across different metrics sinks), and must have the same length as input. +fn serialize_in_chunks<'a>( chunk_size: usize, input: &'a [RawMetric], - factory: F, + idempotency_keys: &'a [IdempotencyKey<'a>], ) -> impl ExactSizeIterator> + 'a -where - F: KeyGen<'a> + 'a, { use bytes::BufMut; - struct Iter<'a, F> { + assert_eq!(input.len(), idempotency_keys.len()); + + struct Iter<'a> { inner: std::slice::Chunks<'a, RawMetric>, + idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>, chunk_size: usize, // write to a BytesMut so that we can cheaply clone the frozen Bytes for retries buffer: bytes::BytesMut, // chunk amount of events are reused to produce the serialized document scratch: Vec>, - factory: F, } - impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> { + impl<'a> Iterator for Iter<'a> { type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>; fn next(&mut self) -> Option { @@ -167,17 +170,14 @@ where self.scratch.extend( chunk .iter() - .map(|raw_metric| raw_metric.as_event(&self.factory.generate())), + .zip(&mut self.idempotency_keys) + .map(|(raw_metric, key)| raw_metric.as_event(key)), ); } else { // next rounds: update_in_place to reuse allocations assert_eq!(self.scratch.len(), self.chunk_size); - self.scratch - .iter_mut() - .zip(chunk.iter()) - .for_each(|(slot, raw_metric)| { - raw_metric.update_in_place(slot, &self.factory.generate()) - }); + itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys) + .for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key)); } let res = serde_json::to_writer( @@ -198,18 +198,19 @@ where } } - impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {} + impl<'a> ExactSizeIterator for Iter<'a> {} let buffer = bytes::BytesMut::new(); let inner = input.chunks(chunk_size); + let idempotency_keys = idempotency_keys.iter(); let scratch = Vec::new(); Iter { inner, + idempotency_keys, chunk_size, buffer, scratch, - factory, } } @@ -268,7 +269,7 @@ impl RawMetricExt for RawMetric { } } -trait KeyGen<'a>: Copy { +pub(crate) trait KeyGen<'a> { fn generate(&self) -> IdempotencyKey<'a>; } @@ -389,7 +390,10 @@ mod tests { let examples = metric_samples(); assert!(examples.len() > 1); - let factory = FixedGen::new(Utc::now(), "1", 42); + let now = Utc::now(); + let idempotency_keys = (0..examples.len()) + .map(|i| FixedGen::new(now, "1", i as u16).generate()) + .collect::>(); // need to use Event here because serde_json::Value uses default hashmap, not linked // hashmap @@ -398,13 +402,13 @@ mod tests { events: Vec>, } - let correct = serialize_in_chunks(examples.len(), &examples, factory) + let correct = serialize_in_chunks(examples.len(), &examples, &idempotency_keys) .map(|res| res.unwrap().1) .flat_map(|body| serde_json::from_slice::(&body).unwrap().events) .collect::>(); for chunk_size in 1..examples.len() { - let actual = serialize_in_chunks(chunk_size, &examples, factory) + let actual = serialize_in_chunks(chunk_size, &examples, &idempotency_keys) .map(|res| res.unwrap().1) .flat_map(|body| serde_json::from_slice::(&body).unwrap().events) .collect::>();