From 7fe5a689b4dd501a084181ccad03e0bbc3c0f6f2 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 14 Mar 2025 13:54:57 +0000 Subject: [PATCH] feat(proxy): export ingress metrics (#11244) ## Problem We exposed the direction tag in #10925 but didn't actually include the ingress tag in the export to allow for an adaption period. ## Summary of changes We now export the ingress direction --- proxy/src/proxy/passthrough.rs | 4 +- proxy/src/serverless/conn_pool_lib.rs | 9 +- proxy/src/serverless/http_conn_pool.rs | 9 +- proxy/src/serverless/sql_over_http.rs | 17 ++- proxy/src/usage_metrics.rs | 148 ++++++++++++++++++------- 5 files changed, 126 insertions(+), 61 deletions(-) diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index 23b9897155..c100b8d716 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -10,7 +10,7 @@ use crate::config::ComputeConfig; use crate::control_plane::messages::MetricsAuxInfo; use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard}; use crate::stream::Stream; -use crate::usage_metrics::{Ids, MetricCounterRecorder, TrafficDirection, USAGE_METRICS}; +use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS}; /// Forward bytes in both directions (client <-> compute). #[tracing::instrument(skip_all)] @@ -24,7 +24,6 @@ pub(crate) async fn proxy_pass( let usage_tx = USAGE_METRICS.register(Ids { endpoint_id: aux.endpoint_id, branch_id: aux.branch_id, - direction: TrafficDirection::Egress, private_link_id, }); @@ -47,6 +46,7 @@ pub(crate) async fn proxy_pass( |cnt| { // Number of bytes the client sent to the compute node (inbound). metrics.get_metric(m_recv).inc_by(cnt as u64); + usage_tx.record_ingress(cnt as u64); }, ); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 933204994b..77b548cc43 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -22,7 +22,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; use crate::types::{DbName, EndpointCacheKey, RoleName}; -use crate::usage_metrics::{Ids, MetricCounter, TrafficDirection, USAGE_METRICS}; +use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; #[derive(Debug, Clone)] pub(crate) struct ConnInfo { @@ -639,11 +639,7 @@ impl Client { (&mut inner.inner, Discard { conn_info, pool }) } - pub(crate) fn metrics( - &self, - direction: TrafficDirection, - ctx: &RequestContext, - ) -> Arc { + pub(crate) fn metrics(&self, ctx: &RequestContext) -> Arc { let aux = &self .inner .as_ref() @@ -659,7 +655,6 @@ impl Client { USAGE_METRICS.register(Ids { endpoint_id: aux.endpoint_id, branch_id: aux.branch_id, - direction, private_link_id, }) } diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index bca2d4c165..1c6574e57e 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -19,7 +19,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; use crate::types::EndpointCacheKey; -use crate::usage_metrics::{Ids, MetricCounter, TrafficDirection, USAGE_METRICS}; +use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; pub(crate) type Send = http2::SendRequest; pub(crate) type Connect = http2::Connection, 
hyper::body::Incoming, TokioExecutor>; @@ -265,11 +265,7 @@ impl Client { Self { inner } } - pub(crate) fn metrics( - &self, - direction: TrafficDirection, - ctx: &RequestContext, - ) -> Arc { + pub(crate) fn metrics(&self, ctx: &RequestContext) -> Arc { let aux = &self.inner.aux; let private_link_id = match ctx.extra() { @@ -281,7 +277,6 @@ impl Client { USAGE_METRICS.register(Ids { endpoint_id: aux.endpoint_id, branch_id: aux.branch_id, - direction, private_link_id, }) } diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index a79a478126..10e378a18d 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -42,7 +42,7 @@ use crate::metrics::{HttpDirection, Metrics}; use crate::proxy::{NeonOptions, run_until_cancelled}; use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; -use crate::usage_metrics::{MetricCounter, MetricCounterRecorder, TrafficDirection}; +use crate::usage_metrics::{MetricCounter, MetricCounterRecorder}; #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -663,6 +663,7 @@ async fn handle_db_inner( let parsed_headers = HttpHeaders::try_parse(headers)?; + let mut request_len = 0; let fetch_and_process_request = Box::pin( async { let body = read_body_with_limit( @@ -671,6 +672,8 @@ async fn handle_db_inner( ) .await?; + request_len = body.len(); + Metrics::get() .proxy .http_conn_content_length_bytes @@ -765,7 +768,7 @@ async fn handle_db_inner( } }; - let metrics = client.metrics(TrafficDirection::Egress, ctx); + let metrics = client.metrics(ctx); let len = json_output.len(); let response = response @@ -781,6 +784,8 @@ async fn handle_db_inner( // count the egress bytes - we miss the TLS and header overhead but oh well... 
// moving this later in the stack is going to be a lot of effort and ehhhh metrics.record_egress(len as u64); + metrics.record_ingress(request_len as u64); + Metrics::get() .proxy .http_conn_content_length_bytes @@ -838,7 +843,7 @@ async fn handle_auth_broker_inner( .expect("all headers and params received via hyper should be valid for request"); // todo: map body to count egress - let _metrics = client.metrics(TrafficDirection::Egress, ctx); + let _metrics = client.metrics(ctx); Ok(client .inner @@ -1168,10 +1173,10 @@ enum Discard<'a> { } impl Client { - fn metrics(&self, direction: TrafficDirection, ctx: &RequestContext) -> Arc { + fn metrics(&self, ctx: &RequestContext) -> Arc { match self { - Client::Remote(client) => client.metrics(direction, ctx), - Client::Local(local_client) => local_client.metrics(direction, ctx), + Client::Remote(client) => client.metrics(ctx), + Client::Local(local_client) => local_client.metrics(ctx), } } diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 004d268fa1..2b27dc5c76 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -44,11 +44,17 @@ const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60); pub(crate) struct Ids { pub(crate) endpoint_id: EndpointIdInt, pub(crate) branch_id: BranchIdInt, - pub(crate) direction: TrafficDirection, #[serde(with = "none_as_empty_string")] pub(crate) private_link_id: Option, } +#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)] +struct Extra { + #[serde(flatten)] + ids: Ids, + direction: TrafficDirection, +} + mod none_as_empty_string { use serde::Deserialize; use smol_str::SmolStr; @@ -76,18 +82,23 @@ pub(crate) enum TrafficDirection { pub(crate) trait MetricCounterRecorder { /// Record that some bytes were sent from the proxy to the client fn record_egress(&self, bytes: u64); + + /// Record that some bytes were sent from the client to the proxy + fn record_ingress(&self, bytes: u64); + /// Record that some connections were opened fn record_connection(&self, count: usize); } trait MetricCounterReporter { - fn get_metrics(&mut self) -> (u64, usize); - fn move_metrics(&self) -> (u64, usize); + fn get_metrics(&mut self) -> MetricsData; + fn move_metrics(&self) -> MetricsData; } #[derive(Debug)] pub(crate) struct MetricCounter { transmitted: AtomicU64, + received: AtomicU64, opened_connections: AtomicUsize, } @@ -97,6 +108,11 @@ impl MetricCounterRecorder for MetricCounter { self.transmitted.fetch_add(bytes, Ordering::Relaxed); } + /// Record that some bytes were sent from the proxy to the client + fn record_ingress(&self, bytes: u64) { + self.received.fetch_add(bytes, Ordering::Relaxed); + } + /// Record that some connections were opened fn record_connection(&self, count: usize) { self.opened_connections.fetch_add(count, Ordering::Relaxed); @@ -104,29 +120,43 @@ impl MetricCounterRecorder for MetricCounter { } impl MetricCounterReporter for MetricCounter { - fn get_metrics(&mut self) -> (u64, usize) { - ( - *self.transmitted.get_mut(), - *self.opened_connections.get_mut(), - ) + fn get_metrics(&mut self) -> MetricsData { + MetricsData { + received: *self.received.get_mut(), + transmitted: *self.transmitted.get_mut(), + connections: *self.opened_connections.get_mut(), + } } - fn move_metrics(&self) -> (u64, usize) { - ( - self.transmitted.swap(0, Ordering::Relaxed), - self.opened_connections.swap(0, Ordering::Relaxed), - ) + + fn move_metrics(&self) -> MetricsData { + MetricsData { + received: self.received.swap(0, Ordering::Relaxed), + 
transmitted: self.transmitted.swap(0, Ordering::Relaxed), + connections: self.opened_connections.swap(0, Ordering::Relaxed), + } } } +struct MetricsData { + transmitted: u64, + received: u64, + connections: usize, +} + +struct BytesSent { + transmitted: u64, + received: u64, +} + trait Clearable { /// extract the value that should be reported - fn should_report(self: &Arc) -> Option; + fn should_report(self: &Arc) -> Option; /// Determine whether the counter should be cleared from the global map. fn should_clear(self: &mut Arc) -> bool; } impl Clearable for C { - fn should_report(self: &Arc) -> Option { + fn should_report(self: &Arc) -> Option { // heuristic to see if the branch is still open // if a clone happens while we are observing, the heuristic will be incorrect. // @@ -139,14 +169,21 @@ impl Clearable for C { // (to avoid sending the same metrics twice) // see the relevant discussion on why to do so even if the status is not success: // https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956 - let (value, opened) = self.move_metrics(); + let MetricsData { + transmitted, + received, + connections, + } = self.move_metrics(); // Our only requirement is that we report in every interval if there was an open connection // if there were no opened connections since, then we don't need to report - if value == 0 && !is_open && opened == 0 { + if transmitted == 0 && received == 0 && !is_open && connections == 0 { None } else { - Some(value) + Some(BytesSent { + transmitted, + received, + }) } } fn should_clear(self: &mut Arc) -> bool { @@ -154,9 +191,13 @@ impl Clearable for C { let Some(counter) = Arc::get_mut(self) else { return false; }; - let (opened, value) = counter.get_metrics(); + let MetricsData { + transmitted, + received, + connections, + } = counter.get_metrics(); // clear if there's no data to report - value == 0 && opened == 0 + transmitted == 0 && received == 0 && connections == 0 } } @@ -178,6 +219,7 @@ impl Metrics { .entry(ids) .or_insert_with(|| { Arc::new(MetricCounter { + received: AtomicU64::new(0), transmitted: AtomicU64::new(0), opened_connections: AtomicUsize::new(0), }) @@ -242,10 +284,10 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result( endpoints: &ClashMap, FastHasher>, -) -> Vec<(Ids, u64)> { +) -> Vec<(Ids, BytesSent)> { let mut metrics_to_clear = Vec::new(); - let metrics_to_send: Vec<(Ids, u64)> = endpoints + let metrics_to_send: Vec<(Ids, BytesSent)> = endpoints .iter() .filter_map(|counter| { let key = counter.key().clone(); @@ -271,26 +313,46 @@ fn collect_and_clear_metrics( } fn create_event_chunks<'a>( - metrics_to_send: &'a [(Ids, u64)], + metrics_to_send: &'a [(Ids, BytesSent)], hostname: &'a str, prev: DateTime, now: DateTime, chunk_size: usize, -) -> impl Iterator>> + 'a { +) -> impl Iterator>> + 'a { metrics_to_send .chunks(chunk_size) .map(move |chunk| EventChunk { events: chunk .iter() - .map(|(ids, value)| Event { - kind: EventType::Incremental { - start_time: prev, - stop_time: now, - }, - metric: PROXY_IO_BYTES_PER_CLIENT, - idempotency_key: idempotency_key(hostname), - value: *value, - extra: ids.clone(), + .flat_map(|(ids, bytes)| { + [ + Event { + kind: EventType::Incremental { + start_time: prev, + stop_time: now, + }, + metric: PROXY_IO_BYTES_PER_CLIENT, + idempotency_key: idempotency_key(hostname), + value: bytes.transmitted, + extra: Extra { + ids: ids.clone(), + direction: TrafficDirection::Egress, + }, + }, + Event { + kind: EventType::Incremental { + start_time: prev, + stop_time: now, + }, + 
metric: PROXY_IO_BYTES_PER_CLIENT, + idempotency_key: idempotency_key(hostname), + value: bytes.received, + extra: Extra { + ids: ids.clone(), + direction: TrafficDirection::Ingress, + }, + }, + ] }) .collect(), }) @@ -350,7 +412,7 @@ fn create_remote_path_prefix(now: DateTime) -> String { async fn upload_main_events_chunked( client: &http::ClientWithMiddleware, metric_collection_endpoint: &reqwest::Url, - chunk: &EventChunk<'_, Event>, + chunk: &EventChunk<'_, Event>, subchunk_size: usize, ) { // Split into smaller chunks to avoid exceeding the max request size @@ -384,7 +446,7 @@ async fn upload_main_events_chunked( async fn upload_backup_events( storage: Option<&GenericRemoteStorage>, - chunk: &EventChunk<'_, Event>, + chunk: &EventChunk<'_, Event>, path_prefix: &str, cancel: &CancellationToken, ) -> anyhow::Result<()> { @@ -461,7 +523,7 @@ mod tests { #[tokio::test] async fn metrics() { - type Report = EventChunk<'static, Event>; + type Report = EventChunk<'static, Event>; let reports: Arc>> = Arc::default(); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -533,7 +595,6 @@ mod tests { let counter = metrics.register(Ids { endpoint_id: (&EndpointId::from("e1")).into(), branch_id: (&BranchId::from("b1")).into(), - direction: TrafficDirection::Egress, private_link_id: None, }); @@ -551,13 +612,19 @@ mod tests { .await; let r = std::mem::take(&mut *reports.lock().unwrap()); assert_eq!(r.len(), 1); - assert_eq!(r[0].events.len(), 1); + assert_eq!(r[0].events.len(), 2); assert_eq!(r[0].events[0].value, 0); + assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress); + assert_eq!(r[0].events[1].value, 0); + assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress); pushed_chunks.extend(r); // record egress counter.record_egress(1); + // record ingress + counter.record_ingress(2); + // egress should be observered collect_metrics_iteration( &metrics.endpoints, @@ -572,8 +639,11 @@ mod tests { .await; let r = std::mem::take(&mut *reports.lock().unwrap()); assert_eq!(r.len(), 1); - assert_eq!(r[0].events.len(), 1); + assert_eq!(r[0].events.len(), 2); assert_eq!(r[0].events[0].value, 1); + assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress); + assert_eq!(r[0].events[1].value, 2); + assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress); pushed_chunks.extend(r); // release counter
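
The wire-format detail behind this patch is that `direction` moves out of `Ids` and into an `Extra` wrapper whose `ids` field is `#[serde(flatten)]`-ed, so each reported event still carries the endpoint/branch fields and the direction tag as sibling keys. Below is a minimal sketch of that flatten behaviour under simplified assumptions: plain `String` ids instead of the interned id types, and an assumed `lowercase` rename on the enum, so the exact casing and derives may differ from the real proxy code.

```rust
use serde::Serialize;

#[derive(Serialize, Clone)]
struct Ids {
    endpoint_id: String,
    branch_id: String,
}

#[derive(Serialize, Clone, Copy)]
#[serde(rename_all = "lowercase")] // assumption: real code may rename differently
enum TrafficDirection {
    Ingress,
    Egress,
}

#[derive(Serialize)]
struct Extra {
    // Flattening keeps the Ids fields at the top level of the event payload,
    // with `direction` appearing alongside them rather than nested.
    #[serde(flatten)]
    ids: Ids,
    direction: TrafficDirection,
}

fn main() {
    let extra = Extra {
        ids: Ids {
            endpoint_id: "e1".into(),
            branch_id: "b1".into(),
        },
        direction: TrafficDirection::Ingress,
    };
    // Prints: {"endpoint_id":"e1","branch_id":"b1","direction":"ingress"}
    println!("{}", serde_json::to_string(&extra).unwrap());
}
```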
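
The per-connection accounting stays lock-free: both directions are accumulated with relaxed atomics and drained with `swap(0, ..)` on each reporting interval, so only the delta since the previous report is shipped, and every drained counter now yields one egress and one ingress value (which is why the test expects two events per chunk). A rough, self-contained model of that drain pattern; this is an illustrative stand-in, not the proxy's actual `MetricCounter`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

struct Counter {
    transmitted: AtomicU64, // proxy -> client (egress)
    received: AtomicU64,    // client -> proxy (ingress)
}

impl Counter {
    fn record_egress(&self, bytes: u64) {
        self.transmitted.fetch_add(bytes, Ordering::Relaxed);
    }

    fn record_ingress(&self, bytes: u64) {
        self.received.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Drain both directions; the caller turns the pair into one egress
    /// event and one ingress event for the endpoint.
    fn drain(&self) -> (u64, u64) {
        (
            self.transmitted.swap(0, Ordering::Relaxed),
            self.received.swap(0, Ordering::Relaxed),
        )
    }
}

fn main() {
    let c = Counter {
        transmitted: AtomicU64::new(0),
        received: AtomicU64::new(0),
    };
    c.record_egress(1);
    c.record_ingress(2);
    assert_eq!(c.drain(), (1, 2));
    // A second drain in the next interval reports nothing new.
    assert_eq!(c.drain(), (0, 0));
}
```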
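
On the SQL-over-HTTP path the ingress figure is simply the request body length captured once the body has been read, reported next to the response's egress bytes (TLS and header overhead are still excluded, as the existing comment notes). A hypothetical, stripped-down handler showing just that accounting order; `record_egress`/`record_ingress` and `handle` here are stand-ins for illustration, not the proxy's real functions:

```rust
fn record_egress(bytes: u64) {
    println!("egress: {bytes} bytes");
}

fn record_ingress(bytes: u64) {
    println!("ingress: {bytes} bytes");
}

fn handle(request_body: &[u8]) -> Vec<u8> {
    // Capture the request size up front, mirroring how the handler stores
    // `request_len` after reading the body with its size limit.
    let request_len = request_body.len() as u64;

    // ... authenticate, run the query, serialize the JSON response ...
    let response_body = br#"{"rows":[]}"#.to_vec();

    // Egress: bytes sent back to the client in the response body.
    record_egress(response_body.len() as u64);
    // Ingress: bytes the client sent in the request body.
    record_ingress(request_len);

    response_body
}

fn main() {
    handle(br#"{"query":"select 1"}"#);
}
```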