Merge the consumption metric pushes (#9939)

#8564

## Problem

The main and backup consumption metric pushes are completely independent, resulting in different event time windows and different idempotency keys.

## Summary of changes

* Merge the push tasks into one, but keep the chunk sizes the same (see the sketch below).
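
A minimal sketch of the merged flow, assuming simplified stand-in types (`Event`, `push_http`, and `upload_backup` are illustrative names, not the proxy's real API; the real code derives keys via `consumption_metrics::idempotency_key`): a single collection loop builds each event chunk once and fans it out to the HTTP endpoint and the remote-storage backup concurrently, so both destinations see the same `[prev, now)` window and the same idempotency keys.

```rust
use std::time::Duration;

use chrono::{DateTime, Utc};

// Illustrative event type: the fields that must now be identical
// for both the HTTP push and the backup upload.
#[derive(Clone, Debug)]
struct Event {
    start_time: DateTime<Utc>,
    stop_time: DateTime<Utc>,
    idempotency_key: String,
    value: u64,
}

// Stand-in for the POST to the metric collection endpoint.
async fn push_http(chunk: &[Event]) {
    println!("HTTP push: {} events", chunk.len());
}

// Stand-in for the gzip + remote-storage upload.
async fn upload_backup(chunk: &[Event]) {
    println!("backup upload: {} events", chunk.len());
}

#[tokio::main]
async fn main() {
    let mut prev = Utc::now();
    let mut ticker = tokio::time::interval(Duration::from_secs(60));
    loop {
        ticker.tick().await;
        let now = Utc::now();

        // Build the chunk exactly once: a single [prev, now) window and a
        // single idempotency key per event, shared by both destinations.
        let chunk = vec![Event {
            start_time: prev,
            stop_time: now,
            idempotency_key: format!("{now}-example-key"), // illustrative only
            value: 42,
        }];

        // Fan the same chunk out to both sinks concurrently; neither push
        // blocks the other, and failures are logged independently.
        tokio::join!(push_http(&chunk), upload_backup(&chunk));

        prev = now;
    }
}
```

Previously each task had its own ticker and its own counters, so the two payloads were built at different instants with independently generated keys; building the chunk once removes that divergence.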
Author: Folke Behrens
Date: 2024-11-30 11:11:37 +01:00
Committed by: GitHub
Parent: aa4ec11af9
Commit: 4abc8e5282
5 changed files with 181 additions and 181 deletions


@@ -112,6 +112,7 @@ workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
fallible-iterator.workspace = true
flate2.workspace = true
tokio-tungstenite.workspace = true
pbkdf2 = { workspace = true, features = ["simple", "std"] }
rcgen.workspace = true


@@ -517,10 +517,6 @@ async fn main() -> anyhow::Result<()> {
if let Some(metrics_config) = &config.metric_collection {
// TODO: Add gc regardless of the metric collection being enabled.
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
client_tasks.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token.clone(),
));
}
if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {


@@ -1,19 +1,18 @@
//! Periodically collect proxy consumption metrics
//! and push them to a HTTP endpoint.
use std::borrow::Cow;
use std::convert::Infallible;
use std::pin::pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Datelike, Timelike, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::future::select;
use once_cell::sync::Lazy;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use serde::{Deserialize, Serialize};
@@ -23,7 +22,7 @@ use tracing::{error, info, instrument, trace, warn};
use utils::backoff;
use uuid::{NoContext, Timestamp};
use crate::config::{MetricBackupCollectionConfig, MetricCollectionConfig};
use crate::config::MetricCollectionConfig;
use crate::context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD};
use crate::http;
use crate::intern::{BranchIdInt, EndpointIdInt};
@@ -58,55 +57,21 @@ trait MetricCounterReporter {
fn move_metrics(&self) -> (u64, usize);
}
#[derive(Debug)]
struct MetricBackupCounter {
transmitted: AtomicU64,
opened_connections: AtomicUsize,
}
impl MetricCounterRecorder for MetricBackupCounter {
fn record_egress(&self, bytes: u64) {
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
}
fn record_connection(&self, count: usize) {
self.opened_connections.fetch_add(count, Ordering::AcqRel);
}
}
impl MetricCounterReporter for MetricBackupCounter {
fn get_metrics(&mut self) -> (u64, usize) {
(
*self.transmitted.get_mut(),
*self.opened_connections.get_mut(),
)
}
fn move_metrics(&self) -> (u64, usize) {
(
self.transmitted.swap(0, Ordering::AcqRel),
self.opened_connections.swap(0, Ordering::AcqRel),
)
}
}
#[derive(Debug)]
pub(crate) struct MetricCounter {
transmitted: AtomicU64,
opened_connections: AtomicUsize,
backup: Arc<MetricBackupCounter>,
}
impl MetricCounterRecorder for MetricCounter {
/// Record that some bytes were sent from the proxy to the client
fn record_egress(&self, bytes: u64) {
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
self.backup.record_egress(bytes);
self.transmitted.fetch_add(bytes, Ordering::Relaxed);
}
/// Record that some connections were opened
fn record_connection(&self, count: usize) {
self.opened_connections.fetch_add(count, Ordering::AcqRel);
self.backup.record_connection(count);
self.opened_connections.fetch_add(count, Ordering::Relaxed);
}
}
@@ -119,8 +84,8 @@ impl MetricCounterReporter for MetricCounter {
}
fn move_metrics(&self) -> (u64, usize) {
(
self.transmitted.swap(0, Ordering::AcqRel),
self.opened_connections.swap(0, Ordering::AcqRel),
self.transmitted.swap(0, Ordering::Relaxed),
self.opened_connections.swap(0, Ordering::Relaxed),
)
}
}
@@ -173,26 +138,11 @@ type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
#[derive(Default)]
pub(crate) struct Metrics {
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
}
impl Metrics {
/// Register a new byte metrics counter for this endpoint
pub(crate) fn register(&self, ids: Ids) -> Arc<MetricCounter> {
let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
entry.clone()
} else {
self.backup_endpoints
.entry(ids.clone())
.or_insert_with(|| {
Arc::new(MetricBackupCounter {
transmitted: AtomicU64::new(0),
opened_connections: AtomicUsize::new(0),
})
})
.clone()
};
let entry = if let Some(entry) = self.endpoints.get(&ids) {
entry.clone()
} else {
@@ -202,7 +152,6 @@ impl Metrics {
Arc::new(MetricCounter {
transmitted: AtomicU64::new(0),
opened_connections: AtomicUsize::new(0),
backup: backup.clone(),
})
})
.clone()
@@ -227,6 +176,21 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
);
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
// Even if the remote storage is not configured, we still want to clear the metrics.
let storage = if let Some(config) = config
.backup_metric_collection_config
.remote_storage_config
.as_ref()
{
Some(
GenericRemoteStorage::from_config(config)
.await
.context("remote storage init")?,
)
} else {
None
};
let mut prev = Utc::now();
let mut ticker = tokio::time::interval(config.interval);
loop {
@@ -237,6 +201,8 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
&USAGE_METRICS.endpoints,
&http_client,
&config.endpoint,
storage.as_ref(),
config.backup_metric_collection_config.chunk_size,
&hostname,
prev,
now,
@@ -283,7 +249,6 @@ fn create_event_chunks<'a>(
now: DateTime<Utc>,
chunk_size: usize,
) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
// Split into chunks of 1000 metrics to avoid exceeding the max request size
metrics_to_send
.chunks(chunk_size)
.map(move |chunk| EventChunk {
@@ -303,11 +268,14 @@ fn create_event_chunks<'a>(
})
}
#[expect(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn collect_metrics_iteration(
endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
client: &http::ClientWithMiddleware,
metric_collection_endpoint: &reqwest::Url,
storage: Option<&GenericRemoteStorage>,
outer_chunk_size: usize,
hostname: &str,
prev: DateTime<Utc>,
now: DateTime<Utc>,
@@ -323,17 +291,54 @@ async fn collect_metrics_iteration(
trace!("no new metrics to send");
}
let cancel = CancellationToken::new();
let path_prefix = create_remote_path_prefix(now);
// Send metrics.
for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, outer_chunk_size) {
tokio::join!(
upload_main_events_chunked(client, metric_collection_endpoint, &chunk, CHUNK_SIZE),
async {
if let Err(e) = upload_backup_events(storage, &chunk, &path_prefix, &cancel).await {
error!("failed to upload consumption events to remote storage: {e:?}");
}
}
);
}
}
fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
format!(
"year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z",
year = now.year(),
month = now.month(),
day = now.day(),
hour = now.hour(),
minute = now.minute(),
second = now.second(),
)
}
async fn upload_main_events_chunked(
client: &http::ClientWithMiddleware,
metric_collection_endpoint: &reqwest::Url,
chunk: &EventChunk<'_, Event<Ids, &str>>,
subchunk_size: usize,
) {
// Split into smaller chunks to avoid exceeding the max request size
for subchunk in chunk.events.chunks(subchunk_size).map(|c| EventChunk {
events: Cow::Borrowed(c),
}) {
let res = client
.post(metric_collection_endpoint.clone())
.json(&chunk)
.json(&subchunk)
.send()
.await;
let res = match res {
Ok(x) => x,
Err(err) => {
// TODO: retry?
error!("failed to send metrics: {:?}", err);
continue;
}
@@ -341,7 +346,7 @@ async fn collect_metrics_iteration(
if !res.status().is_success() {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
for metric in subchunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
warn!("potentially abnormal metric value: {:?}", metric);
}
@@ -349,113 +354,34 @@ async fn collect_metrics_iteration(
}
}
pub async fn task_backup(
backup_config: &MetricBackupCollectionConfig,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
info!("metrics backup config: {backup_config:?}");
scopeguard::defer! {
info!("metrics backup has shut down");
}
// Even if the remote storage is not configured, we still want to clear the metrics.
let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
Some(
GenericRemoteStorage::from_config(config)
.await
.context("remote storage init")?,
)
} else {
None
};
let mut ticker = tokio::time::interval(backup_config.interval);
let mut prev = Utc::now();
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
loop {
select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
let now = Utc::now();
collect_metrics_backup_iteration(
&USAGE_METRICS.backup_endpoints,
storage.as_ref(),
&hostname,
prev,
now,
backup_config.chunk_size,
)
.await;
prev = now;
if cancellation_token.is_cancelled() {
info!("metrics backup has been cancelled");
break;
}
}
Ok(())
}
#[instrument(skip_all)]
async fn collect_metrics_backup_iteration(
endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
async fn upload_backup_events(
storage: Option<&GenericRemoteStorage>,
hostname: &str,
prev: DateTime<Utc>,
now: DateTime<Utc>,
chunk_size: usize,
) {
let year = now.year();
let month = now.month();
let day = now.day();
let hour = now.hour();
let minute = now.minute();
let second = now.second();
let cancel = CancellationToken::new();
info!("starting collect_metrics_backup_iteration");
let metrics_to_send = collect_and_clear_metrics(endpoints);
if metrics_to_send.is_empty() {
trace!("no new metrics to send");
}
// Send metrics.
for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
let real_now = Utc::now();
let id = uuid::Uuid::new_v7(Timestamp::from_unix(
NoContext,
real_now.second().into(),
real_now.nanosecond(),
));
let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
let remote_path = match RemotePath::from_string(&path) {
Ok(remote_path) => remote_path,
Err(e) => {
error!("failed to create remote path from str {path}: {:?}", e);
continue;
}
};
let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
if let Err(e) = res {
error!(
"failed to upload consumption events to remote storage: {:?}",
e
);
}
}
}
async fn upload_events_chunk(
storage: Option<&GenericRemoteStorage>,
chunk: EventChunk<'_, Event<Ids, &'static str>>,
remote_path: &RemotePath,
chunk: &EventChunk<'_, Event<Ids, &'static str>>,
path_prefix: &str,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let Some(storage) = storage else {
error!("no remote storage configured");
warn!("no remote storage configured");
return Ok(());
};
let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
let real_now = Utc::now();
let id = uuid::Uuid::new_v7(Timestamp::from_unix(
NoContext,
real_now.second().into(),
real_now.nanosecond(),
));
let path = format!("{path_prefix}_{id}.json.gz");
let remote_path = match RemotePath::from_string(&path) {
Ok(remote_path) => remote_path,
Err(e) => {
bail!("failed to create remote path from str {path}: {:?}", e);
}
};
// TODO: This is async compression from Vec to Vec. Rewrite as byte stream.
// Use sync compression in blocking threadpool.
let data = serde_json::to_vec(chunk).context("serialize metrics")?;
let mut encoder = GzipEncoder::new(Vec::new());
encoder.write_all(&data).await.context("compress metrics")?;
encoder.shutdown().await.context("compress metrics")?;
@@ -464,7 +390,7 @@ async fn upload_events_chunk(
|| async {
let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
storage
.upload(stream, compressed_data.len(), remote_path, None, cancel)
.upload(stream, compressed_data.len(), &remote_path, None, cancel)
.await
},
TimeoutOrCancel::caused_by_cancel,
@@ -482,9 +408,12 @@ async fn upload_events_chunk(
#[cfg(test)]
mod tests {
use std::fs;
use std::io::BufReader;
use std::sync::{Arc, Mutex};
use anyhow::Error;
use camino_tempfile::tempdir;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use http_body_util::BodyExt;
@@ -493,6 +422,7 @@ mod tests {
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use tokio::net::TcpListener;
use url::Url;
@@ -538,8 +468,34 @@ mod tests {
let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
let now = Utc::now();
let storage_test_dir = tempdir().unwrap();
let local_fs_path = storage_test_dir.path().join("usage_metrics");
fs::create_dir_all(&local_fs_path).unwrap();
let storage = GenericRemoteStorage::from_config(&RemoteStorageConfig {
storage: RemoteStorageKind::LocalFs {
local_path: local_fs_path.clone(),
},
timeout: Duration::from_secs(10),
small_timeout: Duration::from_secs(1),
})
.await
.unwrap();
let mut pushed_chunks: Vec<Report> = Vec::new();
let mut stored_chunks: Vec<Report> = Vec::new();
// no counters have been registered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
collect_metrics_iteration(
&metrics.endpoints,
&client,
&endpoint,
Some(&storage),
1000,
"foo",
now,
now,
)
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
@@ -551,39 +507,84 @@ mod tests {
});
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
collect_metrics_iteration(
&metrics.endpoints,
&client,
&endpoint,
Some(&storage),
1000,
"foo",
now,
now,
)
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
pushed_chunks.extend(r);
// record egress
counter.record_egress(1);
// egress should be observed
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
collect_metrics_iteration(
&metrics.endpoints,
&client,
&endpoint,
Some(&storage),
1000,
"foo",
now,
now,
)
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
pushed_chunks.extend(r);
// release counter
drop(counter);
// we do not observe the counter
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
collect_metrics_iteration(
&metrics.endpoints,
&client,
&endpoint,
Some(&storage),
1000,
"foo",
now,
now,
)
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered
assert!(metrics.endpoints.is_empty());
collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
.await;
assert!(!metrics.backup_endpoints.is_empty());
collect_metrics_backup_iteration(&metrics.backup_endpoints, None, "foo", now, now, 1000)
.await;
// backup counter is unregistered after the second iteration
assert!(metrics.backup_endpoints.is_empty());
let path_prefix = create_remote_path_prefix(now);
for entry in walkdir::WalkDir::new(&local_fs_path)
.into_iter()
.filter_map(|e| e.ok())
{
let path = local_fs_path.join(&path_prefix).to_string();
if entry.path().to_str().unwrap().starts_with(&path) {
let chunk = serde_json::from_reader(flate2::bufread::GzDecoder::new(
BufReader::new(fs::File::open(entry.into_path()).unwrap()),
))
.unwrap();
stored_chunks.push(chunk);
}
}
storage_test_dir.close().ok();
// sort by first event's idempotency key because the order of files is nondeterministic
pushed_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
stored_chunks.sort_by_cached_key(|c| c.events[0].idempotency_key.clone());
assert_eq!(pushed_chunks, stored_chunks);
}
}