feat(proxy): export ingress metrics (#11244)

## Problem

We exposed the direction tag in #10925 but didn't actually include the
ingress tag in the export to allow for an adaption period.

## Summary of changes

We now export the ingress direction
This commit is contained in:
Conrad Ludgate
2025-03-14 13:54:57 +00:00
committed by GitHub
parent b0922967e0
commit 7fe5a689b4
5 changed files with 126 additions and 61 deletions

View File

@@ -10,7 +10,7 @@ use crate::config::ComputeConfig;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, TrafficDirection, USAGE_METRICS};
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]
@@ -24,7 +24,6 @@ pub(crate) async fn proxy_pass(
let usage_tx = USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
direction: TrafficDirection::Egress,
private_link_id,
});
@@ -47,6 +46,7 @@ pub(crate) async fn proxy_pass(
|cnt| {
// Number of bytes the client sent to the compute node (inbound).
metrics.get_metric(m_recv).inc_by(cnt as u64);
usage_tx.record_ingress(cnt as u64);
},
);

View File

@@ -22,7 +22,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::{DbName, EndpointCacheKey, RoleName};
use crate::usage_metrics::{Ids, MetricCounter, TrafficDirection, USAGE_METRICS};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {
@@ -639,11 +639,7 @@ impl<C: ClientInnerExt> Client<C> {
(&mut inner.inner, Discard { conn_info, pool })
}
pub(crate) fn metrics(
&self,
direction: TrafficDirection,
ctx: &RequestContext,
) -> Arc<MetricCounter> {
pub(crate) fn metrics(&self, ctx: &RequestContext) -> Arc<MetricCounter> {
let aux = &self
.inner
.as_ref()
@@ -659,7 +655,6 @@ impl<C: ClientInnerExt> Client<C> {
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
direction,
private_link_id,
})
}

View File

@@ -19,7 +19,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, TrafficDirection, USAGE_METRICS};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;
@@ -265,11 +265,7 @@ impl<C: ClientInnerExt + Clone> Client<C> {
Self { inner }
}
pub(crate) fn metrics(
&self,
direction: TrafficDirection,
ctx: &RequestContext,
) -> Arc<MetricCounter> {
pub(crate) fn metrics(&self, ctx: &RequestContext) -> Arc<MetricCounter> {
let aux = &self.inner.aux;
let private_link_id = match ctx.extra() {
@@ -281,7 +277,6 @@ impl<C: ClientInnerExt + Clone> Client<C> {
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
direction,
private_link_id,
})
}

View File

@@ -42,7 +42,7 @@ use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{NeonOptions, run_until_cancelled};
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder, TrafficDirection};
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -663,6 +663,7 @@ async fn handle_db_inner(
let parsed_headers = HttpHeaders::try_parse(headers)?;
let mut request_len = 0;
let fetch_and_process_request = Box::pin(
async {
let body = read_body_with_limit(
@@ -671,6 +672,8 @@ async fn handle_db_inner(
)
.await?;
request_len = body.len();
Metrics::get()
.proxy
.http_conn_content_length_bytes
@@ -765,7 +768,7 @@ async fn handle_db_inner(
}
};
let metrics = client.metrics(TrafficDirection::Egress, ctx);
let metrics = client.metrics(ctx);
let len = json_output.len();
let response = response
@@ -781,6 +784,8 @@ async fn handle_db_inner(
// count the egress bytes - we miss the TLS and header overhead but oh well...
// moving this later in the stack is going to be a lot of effort and ehhhh
metrics.record_egress(len as u64);
metrics.record_ingress(request_len as u64);
Metrics::get()
.proxy
.http_conn_content_length_bytes
@@ -838,7 +843,7 @@ async fn handle_auth_broker_inner(
.expect("all headers and params received via hyper should be valid for request");
// todo: map body to count egress
let _metrics = client.metrics(TrafficDirection::Egress, ctx);
let _metrics = client.metrics(ctx);
Ok(client
.inner
@@ -1168,10 +1173,10 @@ enum Discard<'a> {
}
impl Client {
fn metrics(&self, direction: TrafficDirection, ctx: &RequestContext) -> Arc<MetricCounter> {
fn metrics(&self, ctx: &RequestContext) -> Arc<MetricCounter> {
match self {
Client::Remote(client) => client.metrics(direction, ctx),
Client::Local(local_client) => local_client.metrics(direction, ctx),
Client::Remote(client) => client.metrics(ctx),
Client::Local(local_client) => local_client.metrics(ctx),
}
}

View File

@@ -44,11 +44,17 @@ const HTTP_REPORTING_RETRY_DURATION: Duration = Duration::from_secs(60);
pub(crate) struct Ids {
pub(crate) endpoint_id: EndpointIdInt,
pub(crate) branch_id: BranchIdInt,
pub(crate) direction: TrafficDirection,
#[serde(with = "none_as_empty_string")]
pub(crate) private_link_id: Option<SmolStr>,
}
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
struct Extra {
#[serde(flatten)]
ids: Ids,
direction: TrafficDirection,
}
mod none_as_empty_string {
use serde::Deserialize;
use smol_str::SmolStr;
@@ -76,18 +82,23 @@ pub(crate) enum TrafficDirection {
pub(crate) trait MetricCounterRecorder {
/// Record that some bytes were sent from the proxy to the client
fn record_egress(&self, bytes: u64);
/// Record that some bytes were sent from the client to the proxy
fn record_ingress(&self, bytes: u64);
/// Record that some connections were opened
fn record_connection(&self, count: usize);
}
trait MetricCounterReporter {
fn get_metrics(&mut self) -> (u64, usize);
fn move_metrics(&self) -> (u64, usize);
fn get_metrics(&mut self) -> MetricsData;
fn move_metrics(&self) -> MetricsData;
}
#[derive(Debug)]
pub(crate) struct MetricCounter {
transmitted: AtomicU64,
received: AtomicU64,
opened_connections: AtomicUsize,
}
@@ -97,6 +108,11 @@ impl MetricCounterRecorder for MetricCounter {
self.transmitted.fetch_add(bytes, Ordering::Relaxed);
}
/// Record that some bytes were sent from the proxy to the client
fn record_ingress(&self, bytes: u64) {
self.received.fetch_add(bytes, Ordering::Relaxed);
}
/// Record that some connections were opened
fn record_connection(&self, count: usize) {
self.opened_connections.fetch_add(count, Ordering::Relaxed);
@@ -104,29 +120,43 @@ impl MetricCounterRecorder for MetricCounter {
}
impl MetricCounterReporter for MetricCounter {
fn get_metrics(&mut self) -> (u64, usize) {
(
*self.transmitted.get_mut(),
*self.opened_connections.get_mut(),
)
fn get_metrics(&mut self) -> MetricsData {
MetricsData {
received: *self.received.get_mut(),
transmitted: *self.transmitted.get_mut(),
connections: *self.opened_connections.get_mut(),
}
}
fn move_metrics(&self) -> (u64, usize) {
(
self.transmitted.swap(0, Ordering::Relaxed),
self.opened_connections.swap(0, Ordering::Relaxed),
)
fn move_metrics(&self) -> MetricsData {
MetricsData {
received: self.received.swap(0, Ordering::Relaxed),
transmitted: self.transmitted.swap(0, Ordering::Relaxed),
connections: self.opened_connections.swap(0, Ordering::Relaxed),
}
}
}
struct MetricsData {
transmitted: u64,
received: u64,
connections: usize,
}
struct BytesSent {
transmitted: u64,
received: u64,
}
trait Clearable {
/// extract the value that should be reported
fn should_report(self: &Arc<Self>) -> Option<u64>;
fn should_report(self: &Arc<Self>) -> Option<BytesSent>;
/// Determine whether the counter should be cleared from the global map.
fn should_clear(self: &mut Arc<Self>) -> bool;
}
impl<C: MetricCounterReporter> Clearable for C {
fn should_report(self: &Arc<Self>) -> Option<u64> {
fn should_report(self: &Arc<Self>) -> Option<BytesSent> {
// heuristic to see if the branch is still open
// if a clone happens while we are observing, the heuristic will be incorrect.
//
@@ -139,14 +169,21 @@ impl<C: MetricCounterReporter> Clearable for C {
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
let (value, opened) = self.move_metrics();
let MetricsData {
transmitted,
received,
connections,
} = self.move_metrics();
// Our only requirement is that we report in every interval if there was an open connection
// if there were no opened connections since, then we don't need to report
if value == 0 && !is_open && opened == 0 {
if transmitted == 0 && received == 0 && !is_open && connections == 0 {
None
} else {
Some(value)
Some(BytesSent {
transmitted,
received,
})
}
}
fn should_clear(self: &mut Arc<Self>) -> bool {
@@ -154,9 +191,13 @@ impl<C: MetricCounterReporter> Clearable for C {
let Some(counter) = Arc::get_mut(self) else {
return false;
};
let (opened, value) = counter.get_metrics();
let MetricsData {
transmitted,
received,
connections,
} = counter.get_metrics();
// clear if there's no data to report
value == 0 && opened == 0
transmitted == 0 && received == 0 && connections == 0
}
}
@@ -178,6 +219,7 @@ impl Metrics {
.entry(ids)
.or_insert_with(|| {
Arc::new(MetricCounter {
received: AtomicU64::new(0),
transmitted: AtomicU64::new(0),
opened_connections: AtomicUsize::new(0),
})
@@ -242,10 +284,10 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
fn collect_and_clear_metrics<C: Clearable>(
endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
) -> Vec<(Ids, u64)> {
) -> Vec<(Ids, BytesSent)> {
let mut metrics_to_clear = Vec::new();
let metrics_to_send: Vec<(Ids, u64)> = endpoints
let metrics_to_send: Vec<(Ids, BytesSent)> = endpoints
.iter()
.filter_map(|counter| {
let key = counter.key().clone();
@@ -271,26 +313,46 @@ fn collect_and_clear_metrics<C: Clearable>(
}
fn create_event_chunks<'a>(
metrics_to_send: &'a [(Ids, u64)],
metrics_to_send: &'a [(Ids, BytesSent)],
hostname: &'a str,
prev: DateTime<Utc>,
now: DateTime<Utc>,
chunk_size: usize,
) -> impl Iterator<Item = EventChunk<'a, Event<Ids, &'static str>>> + 'a {
) -> impl Iterator<Item = EventChunk<'a, Event<Extra, &'static str>>> + 'a {
metrics_to_send
.chunks(chunk_size)
.map(move |chunk| EventChunk {
events: chunk
.iter()
.map(|(ids, value)| Event {
kind: EventType::Incremental {
start_time: prev,
stop_time: now,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value: *value,
extra: ids.clone(),
.flat_map(|(ids, bytes)| {
[
Event {
kind: EventType::Incremental {
start_time: prev,
stop_time: now,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value: bytes.transmitted,
extra: Extra {
ids: ids.clone(),
direction: TrafficDirection::Egress,
},
},
Event {
kind: EventType::Incremental {
start_time: prev,
stop_time: now,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value: bytes.received,
extra: Extra {
ids: ids.clone(),
direction: TrafficDirection::Ingress,
},
},
]
})
.collect(),
})
@@ -350,7 +412,7 @@ fn create_remote_path_prefix(now: DateTime<Utc>) -> String {
async fn upload_main_events_chunked(
client: &http::ClientWithMiddleware,
metric_collection_endpoint: &reqwest::Url,
chunk: &EventChunk<'_, Event<Ids, &str>>,
chunk: &EventChunk<'_, Event<Extra, &str>>,
subchunk_size: usize,
) {
// Split into smaller chunks to avoid exceeding the max request size
@@ -384,7 +446,7 @@ async fn upload_main_events_chunked(
async fn upload_backup_events(
storage: Option<&GenericRemoteStorage>,
chunk: &EventChunk<'_, Event<Ids, &'static str>>,
chunk: &EventChunk<'_, Event<Extra, &'static str>>,
path_prefix: &str,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
@@ -461,7 +523,7 @@ mod tests {
#[tokio::test]
async fn metrics() {
type Report = EventChunk<'static, Event<Ids, String>>;
type Report = EventChunk<'static, Event<Extra, String>>;
let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
@@ -533,7 +595,6 @@ mod tests {
let counter = metrics.register(Ids {
endpoint_id: (&EndpointId::from("e1")).into(),
branch_id: (&BranchId::from("b1")).into(),
direction: TrafficDirection::Egress,
private_link_id: None,
});
@@ -551,13 +612,19 @@ mod tests {
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events.len(), 2);
assert_eq!(r[0].events[0].value, 0);
assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
assert_eq!(r[0].events[1].value, 0);
assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
pushed_chunks.extend(r);
// record egress
counter.record_egress(1);
// record ingress
counter.record_ingress(2);
// egress should be observered
collect_metrics_iteration(
&metrics.endpoints,
@@ -572,8 +639,11 @@ mod tests {
.await;
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events.len(), 2);
assert_eq!(r[0].events[0].value, 1);
assert_eq!(r[0].events[0].extra.direction, TrafficDirection::Egress);
assert_eq!(r[0].events[1].value, 2);
assert_eq!(r[0].events[1].extra.direction, TrafficDirection::Ingress);
pushed_chunks.extend(r);
// release counter