mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
proxy: upload consumption events to S3 (#7213)
## Problem If vector is unavailable, we are missing consumption events. https://github.com/neondatabase/cloud/issues/9826 ## Summary of changes Added integration with the consumption bucket.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4199,6 +4199,7 @@ name = "proxy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"async-trait",
|
||||
"aws-config",
|
||||
"aws-sdk-iam",
|
||||
|
||||
@@ -10,6 +10,7 @@ testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-trait.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-iam.workspace = true
|
||||
|
||||
@@ -10,6 +10,7 @@ use proxy::auth;
|
||||
use proxy::auth::backend::MaybeOwned;
|
||||
use proxy::cancellation::CancelMap;
|
||||
use proxy::cancellation::CancellationHandler;
|
||||
use proxy::config::remote_storage_from_toml;
|
||||
use proxy::config::AuthenticationConfig;
|
||||
use proxy::config::CacheOptions;
|
||||
use proxy::config::HttpConfig;
|
||||
@@ -191,6 +192,19 @@ struct ProxyCliArgs {
|
||||
|
||||
#[clap(flatten)]
|
||||
parquet_upload: ParquetUploadArgs,
|
||||
|
||||
/// interval for backup metric collection
|
||||
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
|
||||
metric_backup_collection_interval: std::time::Duration,
|
||||
/// remote storage configuration for backup metric collection
|
||||
/// Encoded as toml (same format as pageservers), eg
|
||||
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
|
||||
#[clap(long, default_value = "{}")]
|
||||
metric_backup_collection_remote_storage: String,
|
||||
/// chunk size for backup metric collection
|
||||
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
|
||||
#[clap(long, default_value = "4194304")]
|
||||
metric_backup_collection_chunk_size: usize,
|
||||
}
|
||||
|
||||
#[derive(clap::Args, Clone, Copy, Debug)]
|
||||
@@ -372,12 +386,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
// maintenance tasks. these never return unless there's an error
|
||||
let mut maintenance_tasks = JoinSet::new();
|
||||
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token));
|
||||
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
|
||||
maintenance_tasks.spawn(http::health_server::task_main(http_listener));
|
||||
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));
|
||||
|
||||
if let Some(metrics_config) = &config.metric_collection {
|
||||
// TODO: Add gc regardles of the metric collection being enabled.
|
||||
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
|
||||
client_tasks.spawn(usage_metrics::task_backup(
|
||||
&metrics_config.backup_metric_collection_config,
|
||||
cancellation_token,
|
||||
));
|
||||
}
|
||||
|
||||
if let auth::BackendType::Console(api, _) = &config.auth_backend {
|
||||
@@ -434,6 +453,13 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
if args.allow_self_signed_compute {
|
||||
warn!("allowing self-signed compute certificates");
|
||||
}
|
||||
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
|
||||
interval: args.metric_backup_collection_interval,
|
||||
remote_storage_config: remote_storage_from_toml(
|
||||
&args.metric_backup_collection_remote_storage,
|
||||
)?,
|
||||
chunk_size: args.metric_backup_collection_chunk_size,
|
||||
};
|
||||
|
||||
let metric_collection = match (
|
||||
&args.metric_collection_endpoint,
|
||||
@@ -442,6 +468,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
(Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
|
||||
endpoint: endpoint.parse()?,
|
||||
interval: humantime::parse_duration(interval)?,
|
||||
backup_metric_collection_config,
|
||||
}),
|
||||
(None, None) => None,
|
||||
_ => bail!(
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::{
|
||||
};
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use itertools::Itertools;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::{
|
||||
crypto::ring::sign,
|
||||
pki_types::{CertificateDer, PrivateKeyDer},
|
||||
@@ -39,6 +40,7 @@ pub struct ProxyConfig {
|
||||
pub struct MetricCollectionConfig {
|
||||
pub endpoint: reqwest::Url,
|
||||
pub interval: Duration,
|
||||
pub backup_metric_collection_config: MetricBackupCollectionConfig,
|
||||
}
|
||||
|
||||
pub struct TlsConfig {
|
||||
@@ -311,6 +313,21 @@ impl CertResolver {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricBackupCollectionConfig {
|
||||
pub interval: Duration,
|
||||
pub remote_storage_config: OptRemoteStorageConfig,
|
||||
pub chunk_size: usize,
|
||||
}
|
||||
|
||||
/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
|
||||
/// runtime type errors from the value parser we use.
|
||||
pub type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
|
||||
|
||||
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
|
||||
RemoteStorageConfig::from_toml(&s.parse()?)
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
#[derive(Debug)]
|
||||
pub struct CacheOptions {
|
||||
|
||||
@@ -13,12 +13,14 @@ use parquet::{
|
||||
},
|
||||
record::RecordWriter,
|
||||
};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use tokio::{sync::mpsc, time};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, Span};
|
||||
use utils::backoff;
|
||||
|
||||
use crate::config::{remote_storage_from_toml, OptRemoteStorageConfig};
|
||||
|
||||
use super::{RequestMonitoring, LOG_CHAN};
|
||||
|
||||
#[derive(clap::Args, Clone, Debug)]
|
||||
@@ -50,21 +52,13 @@ pub struct ParquetUploadArgs {
|
||||
parquet_upload_compression: Compression,
|
||||
}
|
||||
|
||||
/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
|
||||
/// runtime type errors from the value parser we use.
|
||||
type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
|
||||
|
||||
fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
|
||||
RemoteStorageConfig::from_toml(&s.parse()?)
|
||||
}
|
||||
|
||||
// Occasional network issues and such can cause remote operations to fail, and
|
||||
// that's expected. If a upload fails, we log it at info-level, and retry.
|
||||
// But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
|
||||
// level instead, as repeated failures can mean a more serious problem. If it
|
||||
// fails more than FAILED_UPLOAD_RETRIES times, we give up
|
||||
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
|
||||
pub const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
pub const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
|
||||
|
||||
// the parquet crate leaves a lot to be desired...
|
||||
// what follows is an attempt to write parquet files with minimal allocs.
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
console::messages::MetricsAuxInfo,
|
||||
metrics::NUM_BYTES_PROXIED_COUNTER,
|
||||
stream::Stream,
|
||||
usage_metrics::{Ids, USAGE_METRICS},
|
||||
usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS},
|
||||
};
|
||||
use metrics::IntCounterPairGuard;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
@@ -44,6 +44,7 @@ use crate::metrics::HTTP_CONTENT_LENGTH;
|
||||
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::usage_metrics::MetricCounterRecorder;
|
||||
use crate::DbName;
|
||||
use crate::RoleName;
|
||||
|
||||
|
||||
@@ -1,20 +1,34 @@
|
||||
//! Periodically collect proxy consumption metrics
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::{config::MetricCollectionConfig, http, BranchId, EndpointId};
|
||||
use chrono::{DateTime, Utc};
|
||||
use crate::{
|
||||
config::{MetricBackupCollectionConfig, MetricCollectionConfig},
|
||||
context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
|
||||
http, BranchId, EndpointId,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Datelike, Timelike, Utc};
|
||||
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
|
||||
use dashmap::{mapref::entry::Entry, DashMap};
|
||||
use futures::future::select;
|
||||
use once_cell::sync::Lazy;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
pin::pin,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, instrument, trace};
|
||||
use utils::backoff;
|
||||
use uuid::{NoContext, Timestamp};
|
||||
|
||||
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
|
||||
|
||||
@@ -33,19 +47,93 @@ pub struct Ids {
|
||||
pub branch_id: BranchId,
|
||||
}
|
||||
|
||||
pub trait MetricCounterRecorder {
|
||||
/// Record that some bytes were sent from the proxy to the client
|
||||
fn record_egress(&self, bytes: u64);
|
||||
/// Record that some connections were opened
|
||||
fn record_connection(&self, count: usize);
|
||||
}
|
||||
|
||||
trait MetricCounterReporter {
|
||||
fn get_metrics(&mut self) -> (u64, usize);
|
||||
fn move_metrics(&self) -> (u64, usize);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricCounter {
|
||||
struct MetricBackupCounter {
|
||||
transmitted: AtomicU64,
|
||||
opened_connections: AtomicUsize,
|
||||
}
|
||||
|
||||
impl MetricCounter {
|
||||
/// Record that some bytes were sent from the proxy to the client
|
||||
pub fn record_egress(&self, bytes: u64) {
|
||||
impl MetricCounterRecorder for MetricBackupCounter {
|
||||
fn record_egress(&self, bytes: u64) {
|
||||
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
|
||||
}
|
||||
|
||||
fn record_connection(&self, count: usize) {
|
||||
self.opened_connections.fetch_add(count, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricCounterReporter for MetricBackupCounter {
|
||||
fn get_metrics(&mut self) -> (u64, usize) {
|
||||
(
|
||||
*self.transmitted.get_mut(),
|
||||
*self.opened_connections.get_mut(),
|
||||
)
|
||||
}
|
||||
fn move_metrics(&self) -> (u64, usize) {
|
||||
(
|
||||
self.transmitted.swap(0, Ordering::AcqRel),
|
||||
self.opened_connections.swap(0, Ordering::AcqRel),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricCounter {
|
||||
transmitted: AtomicU64,
|
||||
opened_connections: AtomicUsize,
|
||||
backup: Arc<MetricBackupCounter>,
|
||||
}
|
||||
|
||||
impl MetricCounterRecorder for MetricCounter {
|
||||
/// Record that some bytes were sent from the proxy to the client
|
||||
fn record_egress(&self, bytes: u64) {
|
||||
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
|
||||
self.backup.record_egress(bytes);
|
||||
}
|
||||
|
||||
/// Record that some connections were opened
|
||||
fn record_connection(&self, count: usize) {
|
||||
self.opened_connections.fetch_add(count, Ordering::AcqRel);
|
||||
self.backup.record_connection(count);
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricCounterReporter for MetricCounter {
|
||||
fn get_metrics(&mut self) -> (u64, usize) {
|
||||
(
|
||||
*self.transmitted.get_mut(),
|
||||
*self.opened_connections.get_mut(),
|
||||
)
|
||||
}
|
||||
fn move_metrics(&self) -> (u64, usize) {
|
||||
(
|
||||
self.transmitted.swap(0, Ordering::AcqRel),
|
||||
self.opened_connections.swap(0, Ordering::AcqRel),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
trait Clearable {
|
||||
/// extract the value that should be reported
|
||||
fn should_report(self: &Arc<Self>) -> Option<u64>;
|
||||
/// Determine whether the counter should be cleared from the global map.
|
||||
fn should_clear(self: &mut Arc<Self>) -> bool;
|
||||
}
|
||||
|
||||
impl<C: MetricCounterReporter> Clearable for C {
|
||||
fn should_report(self: &Arc<Self>) -> Option<u64> {
|
||||
// heuristic to see if the branch is still open
|
||||
// if a clone happens while we are observing, the heuristic will be incorrect.
|
||||
@@ -54,13 +142,12 @@ impl MetricCounter {
|
||||
// However, for the strong count to be 1 it must have occured that at one instant
|
||||
// all the endpoints were closed, so missing a report because the endpoints are closed is valid.
|
||||
let is_open = Arc::strong_count(self) > 1;
|
||||
let opened = self.opened_connections.swap(0, Ordering::AcqRel);
|
||||
|
||||
// update cached metrics eagerly, even if they can't get sent
|
||||
// (to avoid sending the same metrics twice)
|
||||
// see the relevant discussion on why to do so even if the status is not success:
|
||||
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
|
||||
let value = self.transmitted.swap(0, Ordering::AcqRel);
|
||||
let (value, opened) = self.move_metrics();
|
||||
|
||||
// Our only requirement is that we report in every interval if there was an open connection
|
||||
// if there were no opened connections since, then we don't need to report
|
||||
@@ -70,15 +157,12 @@ impl MetricCounter {
|
||||
Some(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine whether the counter should be cleared from the global map.
|
||||
fn should_clear(self: &mut Arc<Self>) -> bool {
|
||||
// we can't clear this entry if it's acquired elsewhere
|
||||
let Some(counter) = Arc::get_mut(self) else {
|
||||
return false;
|
||||
};
|
||||
let opened = *counter.opened_connections.get_mut();
|
||||
let value = *counter.transmitted.get_mut();
|
||||
let (opened, value) = counter.get_metrics();
|
||||
// clear if there's no data to report
|
||||
value == 0 && opened == 0
|
||||
}
|
||||
@@ -90,11 +174,26 @@ type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
|
||||
#[derive(Default)]
|
||||
pub struct Metrics {
|
||||
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
|
||||
backup_endpoints: DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
/// Register a new byte metrics counter for this endpoint
|
||||
pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
|
||||
let backup = if let Some(entry) = self.backup_endpoints.get(&ids) {
|
||||
entry.clone()
|
||||
} else {
|
||||
self.backup_endpoints
|
||||
.entry(ids.clone())
|
||||
.or_insert_with(|| {
|
||||
Arc::new(MetricBackupCounter {
|
||||
transmitted: AtomicU64::new(0),
|
||||
opened_connections: AtomicUsize::new(0),
|
||||
})
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
let entry = if let Some(entry) = self.endpoints.get(&ids) {
|
||||
entry.clone()
|
||||
} else {
|
||||
@@ -104,12 +203,13 @@ impl Metrics {
|
||||
Arc::new(MetricCounter {
|
||||
transmitted: AtomicU64::new(0),
|
||||
opened_connections: AtomicUsize::new(0),
|
||||
backup: backup.clone(),
|
||||
})
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
entry.opened_connections.fetch_add(1, Ordering::AcqRel);
|
||||
entry.record_connection(1);
|
||||
entry
|
||||
}
|
||||
}
|
||||
@@ -132,7 +232,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
|
||||
|
||||
let now = Utc::now();
|
||||
collect_metrics_iteration(
|
||||
&USAGE_METRICS,
|
||||
&USAGE_METRICS.endpoints,
|
||||
&http_client,
|
||||
&config.endpoint,
|
||||
&hostname,
|
||||
@@ -144,24 +244,12 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn collect_metrics_iteration(
|
||||
metrics: &Metrics,
|
||||
client: &http::ClientWithMiddleware,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
hostname: &str,
|
||||
prev: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
) {
|
||||
info!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
metric_collection_endpoint
|
||||
);
|
||||
|
||||
fn collect_and_clear_metrics<C: Clearable>(
|
||||
endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
|
||||
) -> Vec<(Ids, u64)> {
|
||||
let mut metrics_to_clear = Vec::new();
|
||||
|
||||
let metrics_to_send: Vec<(Ids, u64)> = metrics
|
||||
.endpoints
|
||||
let metrics_to_send: Vec<(Ids, u64)> = endpoints
|
||||
.iter()
|
||||
.filter_map(|counter| {
|
||||
let key = counter.key().clone();
|
||||
@@ -173,33 +261,71 @@ async fn collect_metrics_iteration(
|
||||
})
|
||||
.collect();
|
||||
|
||||
for metric in metrics_to_clear {
|
||||
match endpoints.entry(metric) {
|
||||
Entry::Occupied(mut counter) => {
|
||||
if counter.get_mut().should_clear() {
|
||||
counter.remove_entry();
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {}
|
||||
}
|
||||
}
|
||||
metrics_to_send
|
||||
}
|
||||
|
||||
fn create_event_chunks<'a>(
|
||||
metrics_to_send: &'a [(Ids, u64)],
|
||||
hostname: &'a str,
|
||||
prev: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
chunk_size: usize,
|
||||
) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
|
||||
// Split into chunks of 1000 metrics to avoid exceeding the max request size
|
||||
metrics_to_send
|
||||
.chunks(chunk_size)
|
||||
.map(move |chunk| EventChunk {
|
||||
events: chunk
|
||||
.iter()
|
||||
.map(|(ids, value)| Event {
|
||||
kind: EventType::Incremental {
|
||||
start_time: prev,
|
||||
stop_time: now,
|
||||
},
|
||||
metric: PROXY_IO_BYTES_PER_CLIENT,
|
||||
idempotency_key: idempotency_key(hostname),
|
||||
value: *value,
|
||||
extra: ids.clone(),
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn collect_metrics_iteration(
|
||||
endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
|
||||
client: &http::ClientWithMiddleware,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
hostname: &str,
|
||||
prev: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
) {
|
||||
info!(
|
||||
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
|
||||
metric_collection_endpoint
|
||||
);
|
||||
|
||||
let metrics_to_send = collect_and_clear_metrics(endpoints);
|
||||
|
||||
if metrics_to_send.is_empty() {
|
||||
trace!("no new metrics to send");
|
||||
}
|
||||
|
||||
// Send metrics.
|
||||
// Split into chunks of 1000 metrics to avoid exceeding the max request size
|
||||
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
|
||||
let events = chunk
|
||||
.iter()
|
||||
.map(|(ids, value)| Event {
|
||||
kind: EventType::Incremental {
|
||||
start_time: prev,
|
||||
stop_time: now,
|
||||
},
|
||||
metric: PROXY_IO_BYTES_PER_CLIENT,
|
||||
idempotency_key: idempotency_key(hostname),
|
||||
value: *value,
|
||||
extra: Ids {
|
||||
endpoint_id: ids.endpoint_id.clone(),
|
||||
branch_id: ids.branch_id.clone(),
|
||||
},
|
||||
})
|
||||
.collect();
|
||||
|
||||
for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, CHUNK_SIZE) {
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.json(&EventChunk { events })
|
||||
.json(&chunk)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
@@ -213,23 +339,142 @@ async fn collect_metrics_iteration(
|
||||
|
||||
if !res.status().is_success() {
|
||||
error!("metrics endpoint refused the sent metrics: {:?}", res);
|
||||
for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
|
||||
for metric in chunk.events.iter().filter(|e| e.value > (1u64 << 40)) {
|
||||
// Report if the metric value is suspiciously large
|
||||
error!("potentially abnormal metric value: {:?}", metric);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for metric in metrics_to_clear {
|
||||
match metrics.endpoints.entry(metric) {
|
||||
Entry::Occupied(mut counter) => {
|
||||
if counter.get_mut().should_clear() {
|
||||
counter.remove_entry();
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {}
|
||||
pub async fn task_backup(
|
||||
backup_config: &MetricBackupCollectionConfig,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("metrics backup config: {backup_config:?}");
|
||||
scopeguard::defer! {
|
||||
info!("metrics backup has shut down");
|
||||
}
|
||||
// Even if the remote storage is not configured, we still want to clear the metrics.
|
||||
let storage = backup_config
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.map(|config| GenericRemoteStorage::from_config(config).context("remote storage init"))
|
||||
.transpose()?;
|
||||
let mut ticker = tokio::time::interval(backup_config.interval);
|
||||
let mut prev = Utc::now();
|
||||
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
|
||||
loop {
|
||||
select(pin!(ticker.tick()), pin!(cancellation_token.cancelled())).await;
|
||||
let now = Utc::now();
|
||||
collect_metrics_backup_iteration(
|
||||
&USAGE_METRICS.backup_endpoints,
|
||||
&storage,
|
||||
&hostname,
|
||||
prev,
|
||||
now,
|
||||
backup_config.chunk_size,
|
||||
)
|
||||
.await;
|
||||
|
||||
prev = now;
|
||||
if cancellation_token.is_cancelled() {
|
||||
info!("metrics backup has been cancelled");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn collect_metrics_backup_iteration(
|
||||
endpoints: &DashMap<Ids, Arc<MetricBackupCounter>, FastHasher>,
|
||||
storage: &Option<GenericRemoteStorage>,
|
||||
hostname: &str,
|
||||
prev: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
chunk_size: usize,
|
||||
) {
|
||||
let year = now.year();
|
||||
let month = now.month();
|
||||
let day = now.day();
|
||||
let hour = now.hour();
|
||||
let minute = now.minute();
|
||||
let second = now.second();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
info!("starting collect_metrics_backup_iteration");
|
||||
|
||||
let metrics_to_send = collect_and_clear_metrics(endpoints);
|
||||
|
||||
if metrics_to_send.is_empty() {
|
||||
trace!("no new metrics to send");
|
||||
}
|
||||
|
||||
// Send metrics.
|
||||
for chunk in create_event_chunks(&metrics_to_send, hostname, prev, now, chunk_size) {
|
||||
let real_now = Utc::now();
|
||||
let id = uuid::Uuid::new_v7(Timestamp::from_unix(
|
||||
NoContext,
|
||||
real_now.second().into(),
|
||||
real_now.nanosecond(),
|
||||
));
|
||||
let path = format!("year={year:04}/month={month:02}/day={day:02}/{hour:02}:{minute:02}:{second:02}Z_{id}.json.gz");
|
||||
let remote_path = match RemotePath::from_string(&path) {
|
||||
Ok(remote_path) => remote_path,
|
||||
Err(e) => {
|
||||
error!("failed to create remote path from str {path}: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let res = upload_events_chunk(storage, chunk, &remote_path, &cancel).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
error!(
|
||||
"failed to upload consumption events to remote storage: {:?}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload_events_chunk(
|
||||
storage: &Option<GenericRemoteStorage>,
|
||||
chunk: EventChunk<'_, Event<Ids, &'static str>>,
|
||||
remote_path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let storage = match storage {
|
||||
Some(storage) => storage,
|
||||
None => {
|
||||
error!("no remote storage configured");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
|
||||
let mut encoder = GzipEncoder::new(Vec::new());
|
||||
encoder.write_all(&data).await.context("compress metrics")?;
|
||||
encoder.shutdown().await.context("compress metrics")?;
|
||||
let compressed_data: Bytes = encoder.get_ref().clone().into();
|
||||
backoff::retry(
|
||||
|| async {
|
||||
let stream = futures::stream::once(futures::future::ready(Ok(compressed_data.clone())));
|
||||
storage
|
||||
.upload(stream, data.len(), remote_path, None, cancel)
|
||||
.await
|
||||
},
|
||||
TimeoutOrCancel::caused_by_cancel,
|
||||
FAILED_UPLOAD_WARN_THRESHOLD,
|
||||
FAILED_UPLOAD_MAX_RETRIES,
|
||||
"request_data_upload",
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
|
||||
.and_then(|x| x)
|
||||
.context("request_data_upload")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -248,7 +493,7 @@ mod tests {
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use super::{collect_metrics_iteration, Ids, Metrics};
|
||||
use super::*;
|
||||
use crate::{http, rate_limiter::RateLimiterConfig};
|
||||
|
||||
#[tokio::test]
|
||||
@@ -284,18 +529,19 @@ mod tests {
|
||||
let now = Utc::now();
|
||||
|
||||
// no counters have been registered
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// register a new counter
|
||||
|
||||
let counter = metrics.register(Ids {
|
||||
endpoint_id: "e1".into(),
|
||||
branch_id: "b1".into(),
|
||||
});
|
||||
|
||||
// the counter should be observed despite 0 egress
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
@@ -305,7 +551,7 @@ mod tests {
|
||||
counter.record_egress(1);
|
||||
|
||||
// egress should be observered
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert_eq!(r.len(), 1);
|
||||
assert_eq!(r[0].events.len(), 1);
|
||||
@@ -315,11 +561,19 @@ mod tests {
|
||||
drop(counter);
|
||||
|
||||
// we do not observe the counter
|
||||
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
|
||||
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
|
||||
let r = std::mem::take(&mut *reports2.lock().unwrap());
|
||||
assert!(r.is_empty());
|
||||
|
||||
// counter is unregistered
|
||||
assert!(metrics.endpoints.is_empty());
|
||||
|
||||
collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
|
||||
.await;
|
||||
assert!(!metrics.backup_endpoints.is_empty());
|
||||
collect_metrics_backup_iteration(&metrics.backup_endpoints, &None, "foo", now, now, 1000)
|
||||
.await;
|
||||
// backup counter is unregistered after the second iteration
|
||||
assert!(metrics.backup_endpoints.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user