diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index ea1d086bc5..c5eee1a5b5 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -16,7 +16,7 @@ use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS}; #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn proxy_pass( client: impl AsyncRead + AsyncWrite + Unpin, - compute: impl AsyncRead + AsyncWrite + Unpin, + mut compute: impl AsyncRead + AsyncWrite + Unpin, aux: MetricsAuxInfo, private_link_id: Option, ) -> Result<(), ErrorSource> { @@ -29,24 +29,16 @@ pub(crate) async fn proxy_pass( let metrics = &Metrics::get().proxy.io_bytes; let m_sent = metrics.with_labels(Direction::Tx); + let m_recv = metrics.with_labels(Direction::Rx); let mut client = MeasuredStream::new( client, - |_| {}, - |cnt| { - // Number of bytes we sent to the client (outbound). - metrics.get_metric(m_sent).inc_by(cnt as u64); - usage_tx.record_egress(cnt as u64); + |bytes_read| { + metrics.get_metric(m_recv).inc_by(bytes_read as u64); + usage_tx.record_ingress(bytes_read as u64); }, - ); - - let m_recv = metrics.with_labels(Direction::Rx); - let mut compute = MeasuredStream::new( - compute, - |_| {}, - |cnt| { - // Number of bytes the client sent to the compute node (inbound). - metrics.get_metric(m_recv).inc_by(cnt as u64); - usage_tx.record_ingress(cnt as u64); + |bytes_flushed| { + metrics.get_metric(m_sent).inc_by(bytes_flushed as u64); + usage_tx.record_egress(bytes_flushed as u64); }, );