From 4abc8e5282037c85a922ae113e0677c50841b309 Mon Sep 17 00:00:00 2001
From: Folke Behrens
Date: Sat, 30 Nov 2024 11:11:37 +0100
Subject: [PATCH] Merge the consumption metric pushes (#9939)

#8564

## Problem

The main and backup consumption metric pushes are completely independent,
resulting in different event time windows and different idempotency keys.

## Summary of changes

* Merge the push tasks, but keep chunks the same size.
---
 Cargo.lock                          |   1 +
 libs/consumption_metrics/src/lib.rs |   5 +-
 proxy/Cargo.toml                    |   1 +
 proxy/src/bin/proxy.rs              |   4 -
 proxy/src/usage_metrics.rs          | 351 ++++++++++++++--------
 5 files changed, 181 insertions(+), 181 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 313222cf3c..5ce27a7d45 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4501,6 +4501,7 @@ dependencies = [
  "ecdsa 0.16.9",
  "env_logger",
  "fallible-iterator",
+ "flate2",
  "framed-websockets",
  "futures",
  "hashbrown 0.14.5",
diff --git a/libs/consumption_metrics/src/lib.rs b/libs/consumption_metrics/src/lib.rs
index fbe2e6830f..448134f31a 100644
--- a/libs/consumption_metrics/src/lib.rs
+++ b/libs/consumption_metrics/src/lib.rs
@@ -103,11 +103,12 @@ impl<'a> IdempotencyKey<'a> {
     }
 }
 
+/// Split into chunks of 1000 metrics to avoid exceeding the max request size.
 pub const CHUNK_SIZE: usize = 1000;
 
 // Just a wrapper around a slice of events
 // to serialize it as `{"events" : [ ] }`
-#[derive(serde::Serialize, Deserialize)]
-pub struct EventChunk<'a, T: Clone> {
+#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq)]
+pub struct EventChunk<'a, T: Clone + PartialEq> {
     pub events: std::borrow::Cow<'a, [T]>,
 }
diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml
index 0d774d529d..f5934c8a89 100644
--- a/proxy/Cargo.toml
+++ b/proxy/Cargo.toml
@@ -112,6 +112,7 @@ workspace_hack.workspace = true
 [dev-dependencies]
 camino-tempfile.workspace = true
 fallible-iterator.workspace = true
+flate2.workspace = true
 tokio-tungstenite.workspace = true
 pbkdf2 = { workspace = true, features = ["simple", "std"] }
 rcgen.workspace = true
diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs
index a935378162..b772a987ee 100644
--- a/proxy/src/bin/proxy.rs
+++ b/proxy/src/bin/proxy.rs
@@ -517,10 +517,6 @@ async fn main() -> anyhow::Result<()> {
     if let Some(metrics_config) = &config.metric_collection {
         // TODO: Add gc regardless of the metric collection being enabled.
         maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
-        client_tasks.spawn(usage_metrics::task_backup(
-            &metrics_config.backup_metric_collection_config,
-            cancellation_token.clone(),
-        ));
     }
 
     if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {
diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs
index c5e8588623..65e74466f2 100644
--- a/proxy/src/usage_metrics.rs
+++ b/proxy/src/usage_metrics.rs
@@ -1,19 +1,18 @@
 //! Periodically collect proxy consumption metrics
 //! and push them to an HTTP endpoint.
+use std::borrow::Cow;
 use std::convert::Infallible;
-use std::pin::pin;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use async_compression::tokio::write::GzipEncoder;
 use bytes::Bytes;
 use chrono::{DateTime, Datelike, Timelike, Utc};
 use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
 use dashmap::mapref::entry::Entry;
 use dashmap::DashMap;
-use futures::future::select;
 use once_cell::sync::Lazy;
 use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
 use serde::{Deserialize, Serialize};
@@ -23,7 +22,7 @@ use tracing::{error, info, instrument, trace, warn};
 use utils::backoff;
 use uuid::{NoContext, Timestamp};
 
-use crate::config::{MetricBackupCollectionConfig, MetricCollectionConfig};
+use crate::config::MetricCollectionConfig;
 use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
 use crate::http;
 use crate::intern::{BranchIdInt, EndpointIdInt};
@@ -58,55 +57,21 @@ trait MetricCounterReporter {
     fn move_metrics(&self) -> (u64, usize);
 }
 
-#[derive(Debug)]
-struct MetricBackupCounter {
-    transmitted: AtomicU64,
-    opened_connections: AtomicUsize,
-}
-
-impl MetricCounterRecorder for MetricBackupCounter {
-    fn record_egress(&self, bytes: u64) {
-        self.transmitted.fetch_add(bytes, Ordering::AcqRel);
-    }
-
-    fn record_connection(&self, count: usize) {
-        self.opened_connections.fetch_add(count, Ordering::AcqRel);
-    }
-}
-
-impl MetricCounterReporter for MetricBackupCounter {
-    fn get_metrics(&mut self) -> (u64, usize) {
-        (
-            *self.transmitted.get_mut(),
-            *self.opened_connections.get_mut(),
-        )
-    }
-    fn move_metrics(&self) -> (u64, usize) {
-        (
-            self.transmitted.swap(0, Ordering::AcqRel),
-            self.opened_connections.swap(0, Ordering::AcqRel),
-        )
-    }
-}
-
 #[derive(Debug)]
 pub(crate) struct MetricCounter {
     transmitted: AtomicU64,
     opened_connections: AtomicUsize,
-    backup: Arc<MetricBackupCounter>,
 }
 
 impl MetricCounterRecorder for MetricCounter {
     /// Record that some bytes were sent from the proxy to the client
     fn record_egress(&self, bytes: u64) {
-        self.transmitted.fetch_add(bytes, Ordering::AcqRel);
-        self.backup.record_egress(bytes);
+        self.transmitted.fetch_add(bytes, Ordering::Relaxed);
     }
 
     /// Record that some connections were opened
     fn record_connection(&self, count: usize) {
-        self.opened_connections.fetch_add(count, Ordering::AcqRel);
-        self.backup.record_connection(count);
+        self.opened_connections.fetch_add(count, Ordering::Relaxed);
     }
 }
 
@@ -119,8 +84,8 @@ impl MetricCounterReporter for MetricCounter {
     }
     fn move_metrics(&self) -> (u64, usize) {
         (
-            self.transmitted.swap(0, Ordering::AcqRel),
-            self.opened_connections.swap(0, Ordering::AcqRel),
+            self.transmitted.swap(0, Ordering::Relaxed),
+            self.opened_connections.swap(0, Ordering::Relaxed),
         )
     }
 }
@@ -173,26 +138,11 @@ type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
 #[derive(Default)]
 pub(crate) struct Metrics {
     endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
-    backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
 }
 
 impl Metrics {
     /// Register a new byte metrics counter for this endpoint
     pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
-        let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
-            entry.clone()
-        } else {
-            self.backup_endpoints
-                .entry(ids.clone())
-                .or_insert_with(|| {
-                    Arc::new(MetricBackupCounter {
-                        transmitted: AtomicU64::new(0),
-                        opened_connections: AtomicUsize::new(0),
-                    })
-                })
-                .clone()
-        };
-
         let entry = if let Some(entry) = self.endpoints.get(&ids) {
            entry.clone()
        } else {
@@ -202,7 +152,6 @@ impl Metrics {
                     Arc::new(MetricCounter {
                         transmitted: AtomicU64::new(0),
                         opened_connections: AtomicUsize::new(0),
-                        backup: backup.clone(),
                     })
                 })
                 .clone()
@@ -227,19 +176,36 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
     );
     let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
 
+    // Even if the remote storage is not configured, we still want to clear the metrics.
+    let storage = if let Some(config) = config
+        .backup_metric_collection_config
+        .remote_storage_config
+        .as_ref()
+    {
+        Some(
+            GenericRemoteStorage::from_config(config)
+                .await
+                .context("remote storage init")?,
+        )
+    } else {
+        None
+    };
+
     let mut prev = Utc::now();
     let mut ticker = tokio::time::interval(config.interval);
     loop {
         ticker.tick().await;
 
         let now = Utc::now();
         collect_metrics_iteration(
             &USAGE_METRICS.endpoints,
             &http_client,
             &config.endpoint,
+            storage.as_ref(),
+            config.backup_metric_collection_config.chunk_size,
             &hostname,
             prev,
             now,
         )
         .await;
         prev = now;
@@ -284,7 +250,6 @@ fn create_event_chunks<'a>(
     now: DateTime<Utc>,
     chunk_size: usize,
 ) -> impl Iterator<Item = EventChunk<'a, Event<Ids, String>>> + 'a {
-    // Split into chunks of 1000 metrics to avoid exceeding the max request size
     metrics_to_send
         .chunks(chunk_size)
         .map(move |chunk| EventChunk {
@@ -303,11 +268,14 @@ fn create_event_chunks<'a>(
     })
 }
 
+#[expect(clippy::too_many_arguments)]
 #[instrument(skip_all)]
 async fn collect_metrics_iteration(
     endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
     client: &http::ClientWithMiddleware,
     metric_collection_endpoint: &reqwest::Url,
+    storage: Option<&GenericRemoteStorage>,
+    outer_chunk_size: usize,
     hostname: &str,
     prev: DateTime<Utc>,
     now: DateTime<Utc>,
@@ -323,17 +291,54 @@ async fn collect_metrics_iteration(
         trace!("no new metrics to send");
     }
 
+    let cancel = CancellationToken::new();
+    let path_prefix = create_remote_path_prefix(now);
+
     // Send metrics.
-    for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
+    for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
+        tokio::join!(
+            upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
+            async {
+                if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await
+                {
+                    error!("failed to upload consumption events to remote storage: {e:?}");
+                }
+            }
+        );
+    }
+}
+
+fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
+    format!(
+        "year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
+        year = now.year(),
+        month = now.month(),
+        day = now.day(),
+        hour = now.hour(),
+        minute = now.minute(),
+        second = now.second(),
+    )
+}
+
+async fn upload_main_events_chunked(
+    client: &http::ClientWithMiddleware,
+    metric_collection_endpoint: &reqwest::Url,
+    chunk: &EventChunk<'_, Event>,
+    subchunk_size: usize,
+) {
+    // Split into smaller chunks to avoid exceeding the max request size
+    for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
+        events: Cow::Borrowed(c),
+    }) {
         let res = client
             .post(metric_collection_endpoint.clone())
-            .json(&chunk)
+            .json(&subchunk)
             .send()
             .await;
 
         let res = match res {
             Ok(x) => x,
             Err(err) => {
+                // TODO: retry?
                 error!("failed to send metrics: {:?}", err);
                 continue;
             }
         };
@@ -341,7 +346,7 @@ async fn collect_metrics_iteration(
         if !res.status().is_success() {
             error!("metrics endpoint refused the sent metrics: {:?}", res);
-            for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
+            for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
                 // Report if the metric value is suspiciously large
                 warn!("potentially abnormal metric value: {:?}", metric);
             }
         }
@@ -348,113 +353,34 @@ async fn collect_metrics_iteration(
     }
 }
 
-pub async fn task_backup(
-    backup_config: &MetricBackupCollectionConfig,
-    cancellation_token: CancellationToken,
-) -> anyhow::Result<()> {
-    info!("metrics backup config: {backup_config:?}");
-    scopeguard::defer! {
-        info!("metrics backup has shut down");
-    }
-    // Even if the remote storage is not configured, we still want to clear the metrics.
-    let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
-        Some(
-            GenericRemoteStorage::from_config(config)
-                .await
-                .context("remote storage init")?,
-        )
-    } else {
-        None
-    };
-    let mut ticker = tokio::time::interval(backup_config.interval);
-    let mut prev = Utc::now();
-    let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
-    loop {
-        select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
-        let now = Utc::now();
-        collect_metrics_backup_iteration(
-            &USAGE_METRICS.backup_endpoints,
-            storage.as_ref(),
-            &hostname,
-            prev,
-            now,
-            backup_config.chunk_size,
-        )
-        .await;
-
-        prev = now;
-        if cancellation_token.is_cancelled() {
-            info!("metrics backup has been cancelled");
-            break;
-        }
-    }
-    Ok(())
-}
-
-#[instrument(skip_all)]
-async fn collect_metrics_backup_iteration(
-    endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
+async fn upload_backup_events(
     storage: Option<&GenericRemoteStorage>,
-    hostname: &str,
-    prev: DateTime<Utc>,
-    now: DateTime<Utc>,
-    chunk_size: usize,
-) {
-    let year = now.year();
-    let month = now.month();
-    let day = now.day();
-    let hour = now.hour();
-    let minute = now.minute();
-    let second = now.second();
-    let cancel = CancellationToken::new();
-
-    info!("starting collect_metrics_backup_iteration");
-
-    let metrics_to_send = collect_and_clear_metrics(endpoints);
-
-    if metrics_to_send.is_empty() {
-        trace!("no new metrics to send");
-    }
-
-    // Send metrics.
-    for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
-        let real_now = Utc::now();
-        let id = uuid::Uuid::new_v7(Timestamp::from_unix(
-            NoContext,
-            real_now.second().into(),
-            real_now.nanosecond(),
-        ));
-        let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
-        let remote_path = match RemotePath::from_string(&path) {
-            Ok(remote_path) => remote_path,
-            Err(e) => {
-                error!("failed to create remote path from str {path}: {:?}", e);
-                continue;
-            }
-        };
-
-        let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
-
-        if let Err(e) = res {
-            error!(
-                "failed to upload consumption events to remote storage: {:?}",
-                e
-            );
-        }
-    }
-}
-
-async fn upload_events_chunk(
-    storage: Option<&GenericRemoteStorage>,
-    chunk: EventChunk<'_, Event>,
-    remote_path: &RemotePath,
+    chunk: &EventChunk<'_, Event>,
+    path_prefix: &str,
     cancel: &CancellationToken,
 ) -> anyhow::Result<()> {
     let Some(storage) = storage else {
-        error!("no remote storage configured");
+        warn!("no remote storage configured");
        return Ok(());
     };
-    let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
+
+    let real_now = Utc::now();
+    let id = uuid::Uuid::new_v7(Timestamp::from_unix(
+        NoContext,
+        real_now.second().into(),
+        real_now.nanosecond(),
+    ));
+    let path = format!("{path_prefix}_{id}.json.gz");
+    let remote_path = match RemotePath::from_string(&path) {
+        Ok(remote_path) => remote_path,
+        Err(e) => {
+            bail!("failed to create remote path from str {path}: {:?}", e);
+        }
+    };
+
+    // TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
+    // Use sync compression in a blocking threadpool.
+    let data = serde_json::to_vec(chunk).context("serialize metrics")?;
     let mut encoder = GzipEncoder::new(Vec::new());
     encoder.write_all(&data).await.context("compress metrics")?;
     encoder.shutdown().await.context("compress metrics")?;
@@ -464,7 +390,7 @@ async fn upload_events_chunk(
         || async {
             let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
             storage
-                .upload(stream, compressed_data.len(), remote_path, None, cancel)
+                .upload(stream, compressed_data.len(), &remote_path, None, cancel)
                 .await
         },
         TimeoutOrCancel::caused_by_cancel,
@@ -482,9 +408,12 @@
 #[cfg(test)]
 mod tests {
+    use std::fs;
+    use std::io::BufReader;
     use std::sync::{Arc, Mutex};
     use std::time::Duration;
 
     use anyhow::Error;
+    use camino_tempfile::tempdir;
     use chrono::Utc;
     use consumption_metrics::{Event, EventChunk};
     use http_body_util::BodyExt;
@@ -493,6 +422,7 @@ mod tests {
     use hyper::service::service_fn;
     use hyper::{Request, Response};
     use hyper_util::rt::TokioIo;
+    use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
     use tokio::net::TcpListener;
     use url::Url;
 
@@ -538,8 +468,34 @@ mod tests {
         let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
         let now = Utc::now();
 
+        let storage_test_dir = tempdir().unwrap();
+        let local_fs_path = storage_test_dir.path().join("usage_metrics");
+        fs::create_dir_all(&local_fs_path).unwrap();
+        let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
+            storage: RemoteStorageKind::LocalFs {
+                local_path: local_fs_path.clone(),
+            },
+            timeout: Duration::from_secs(10),
+            small_timeout: Duration::from_secs(1),
+        })
+        .await
+        .unwrap();
+
+        let mut pushed_chunks: Vec<EventChunk<'static, Event<Ids, String>>> = Vec::new();
+        let mut stored_chunks: Vec<EventChunk<'static, Event<Ids, String>>> = Vec::new();
+
         // no counters have been registered
-        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
+        collect_metrics_iteration(
+            &metrics.endpoints,
+            &client,
+            &endpoint,
+            Some(&storage),
+            1000,
+            "foo",
+            now,
+            now,
+        )
+        .await;
         let r = std::mem::take(&mut *reports.lock().unwrap());
         assert!(r.is_empty());
 
@@ -551,39 +507,84 @@ mod tests {
         });
 
         // the counter should be observed despite 0 egress
-        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
+        collect_metrics_iteration(
+            &metrics.endpoints,
+            &client,
+            &endpoint,
+            Some(&storage),
+            1000,
+            "foo",
+            now,
+            now,
+        )
+        .await;
         let r = std::mem::take(&mut *reports.lock().unwrap());
         assert_eq!(r.len(), 1);
         assert_eq!(r[0].events.len(), 1);
         assert_eq!(r[0].events[0].value, 0);
+        pushed_chunks.extend(r);
 
         // record egress
         counter.record_egress(1);
 
         // egress should be observed
-        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
+        collect_metrics_iteration(
+            &metrics.endpoints,
+            &client,
+            &endpoint,
+            Some(&storage),
+            1000,
+            "foo",
+            now,
+            now,
+        )
+        .await;
         let r = std::mem::take(&mut *reports.lock().unwrap());
         assert_eq!(r.len(), 1);
         assert_eq!(r[0].events.len(), 1);
         assert_eq!(r[0].events[0].value, 1);
+        pushed_chunks.extend(r);
 
         // release counter
         drop(counter);
 
         // we do not observe the counter
-        collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
+        collect_metrics_iteration(
+            &metrics.endpoints,
+            &client,
+            &endpoint,
+            Some(&storage),
+            1000,
+            "foo",
+            now,
+            now,
+        )
+        .await;
         let r = std::mem::take(&mut *reports.lock().unwrap());
         assert!(r.is_empty());
 
         // counter is unregistered
         assert!(metrics.endpoints.is_empty());
 
-        collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
"foo", now, now, 1000) - .await; - assert!(!metrics.backup_endpoints.is_empty()); - collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000) - .await; - // backup counter is unregistered after the second iteration - assert!(metrics.backup_endpoints.is_empty()); + let path_prefix = create_remote_path_prefix(now); + for entry in walkdir::WalkDir::new(&local_fs_path) + .into_iter() + .filter_map(|e| e.ok()) + { + let path = local_fs_path.join(&path_prefix).to_string(); + if entry.path().to_str().unwrap().starts_with(&path) { + let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new( + BufReader::new(fs::File::open(entry.into_path()).unwrap()), + )) + .unwrap(); + stored_chunks.push(chunk); + } + } + storage_test_dir.close().ok(); + + // sort by first event's idempotency key because the order of files is nondeterministic + pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone()); + stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone()); + assert_eq!(pushed_chunks, stored_chunks); } }