From 25d2f4b669076a266775ea4e73c578eee2d63049 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Fri, 21 Jul 2023 18:10:55 +0300
Subject: [PATCH] metrics: chunked responses (#4768)

Metrics can get really large, on the order of hundreds of megabytes, and we
used to buffer the encoded response completely (after a few rounds of growing
the buffer). Stream the response out in chunks instead, so the full encoding
never has to be held in memory at once.
---
 Cargo.lock                      |   1 +
 libs/metrics/src/lib.rs         |   1 +
 libs/utils/Cargo.toml           |   4 +
 libs/utils/src/http/endpoint.rs | 137 +++++++++++++++++++++++++++++---
 4 files changed, 131 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3c862241a4..b9ca53064f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4867,6 +4867,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tracing",
  "tracing-error",
  "tracing-subscriber",
diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs
index f24488b19d..b4026d93e1 100644
--- a/libs/metrics/src/lib.rs
+++ b/libs/metrics/src/lib.rs
@@ -6,6 +6,7 @@ use once_cell::sync::Lazy;
 use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
 pub use prometheus::opts;
 pub use prometheus::register;
+pub use prometheus::Error;
 pub use prometheus::{core, default_registry, proto};
 pub use prometheus::{exponential_buckets, linear_buckets};
 pub use prometheus::{register_counter_vec, Counter, CounterVec};
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index e7c8323c1d..f01131540e 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -42,6 +42,10 @@ workspace_hack.workspace = true
 
 const_format.workspace = true
 
+# Used to expose tokio channels as streams; faster to compile than async_stream.
+# Kept only in this crate: no other crate should need it, streams are rarely needed.
+tokio-stream = { version = "0.1.14" }
+
 [dev-dependencies]
 byteorder.workspace = true
 bytes.workspace = true
diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs
index 33241dbdf7..f3f5e95d0b 100644
--- a/libs/utils/src/http/endpoint.rs
+++ b/libs/utils/src/http/endpoint.rs
@@ -9,7 +9,6 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
 use once_cell::sync::Lazy;
 use routerify::ext::RequestExt;
 use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
-use tokio::task::JoinError;
 use tracing::{self, debug, info, info_span, warn, Instrument};
 
 use std::future::Future;
@@ -148,26 +147,140 @@ impl Drop for RequestCancelled {
 }
 
 async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    use bytes::{Bytes, BytesMut};
+    use std::io::Write as _;
+    use tokio::sync::mpsc;
+    use tokio_stream::wrappers::ReceiverStream;
+
     SERVE_METRICS_COUNT.inc();
 
-    let mut buffer = vec![];
-    let encoder = TextEncoder::new();
+    /// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
+    struct ChannelWriter {
+        buffer: BytesMut,
+        tx: mpsc::Sender<std::io::Result<Bytes>>,
+        written: usize,
+    }
 
-    let metrics = tokio::task::spawn_blocking(move || {
-        // Currently we take a lot of mutexes while collecting metrics, so it's
-        // better to spawn a blocking task to avoid blocking the event loop.
-        metrics::gather()
-    })
-    .await
-    .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
-    encoder.encode(&metrics, &mut buffer).unwrap();
+    impl ChannelWriter {
+        fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
+            assert_ne!(buf_len, 0);
+            ChannelWriter {
+                // Split roughly half off the front of the buffer, because we flush based on
+                // capacity: the first flush comes sooner than it otherwise would, but later
+                // resizes have a better chance of picking up the "other" half. Not guaranteed.
+                buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
+                tx,
+                written: 0,
+            }
+        }
+
+        fn flush0(&mut self) -> std::io::Result<usize> {
+            let n = self.buffer.len();
+            if n == 0 {
+                return Ok(0);
+            }
+
+            tracing::trace!(n, "flushing");
+            let ready = self.buffer.split().freeze();
+
+            // Not ideal to call block_on from blocking code, but we know this operation
+            // does not spawn further blocking tasks.
+            let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
+                self.tx.send(Ok(ready)).await.map_err(|_| ())?;
+
+                // Throttle sending to allow reuse of our buffer in `write`.
+                self.tx.reserve().await.map_err(|_| ())?;
+
+                // Now the response task has picked up the buffer and hopefully started
+                // sending it to the client.
+                Ok(())
+            });
+            if res.is_err() {
+                return Err(std::io::ErrorKind::BrokenPipe.into());
+            }
+            self.written += n;
+            Ok(n)
+        }
+
+        fn flushed_bytes(&self) -> usize {
+            self.written
+        }
+    }
+
+    impl std::io::Write for ChannelWriter {
+        fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
+            let remaining = self.buffer.capacity() - self.buffer.len();
+
+            let out_of_space = remaining < buf.len();
+
+            let original_len = buf.len();
+
+            if out_of_space {
+                let can_still_fit = buf.len() - remaining;
+                self.buffer.extend_from_slice(&buf[..can_still_fit]);
+                buf = &buf[can_still_fit..];
+                self.flush0()?;
+            }
+
+            // Assume that under normal operation this will often just move the pointer back
+            // to the beginning of the allocation, because the previously split-off parts
+            // have already been sent and dropped.
+            self.buffer.extend_from_slice(buf);
+            Ok(original_len)
+        }
+
+        fn flush(&mut self) -> std::io::Result<()> {
+            self.flush0().map(|_| ())
+        }
+    }
+
+    let started_at = std::time::Instant::now();
+
+    let (tx, rx) = mpsc::channel(1);
+
+    let body = Body::wrap_stream(ReceiverStream::new(rx));
+
+    let mut writer = ChannelWriter::new(128 * 1024, tx);
+
+    let encoder = TextEncoder::new();
 
     let response = Response::builder()
         .status(200)
         .header(CONTENT_TYPE, encoder.format_type())
-        .body(Body::from(buffer))
+        .body(body)
         .unwrap();
 
+    let span = info_span!("blocking");
+    tokio::task::spawn_blocking(move || {
+        let _span = span.entered();
+        let metrics = metrics::gather();
+        let res = encoder
+            .encode(&metrics, &mut writer)
+            .and_then(|_| writer.flush().map_err(|e| e.into()));
+
+        match res {
+            Ok(()) => {
+                tracing::info!(
+                    bytes = writer.flushed_bytes(),
+                    elapsed_ms = started_at.elapsed().as_millis(),
+                    "responded /metrics"
+                );
+            }
+            Err(e) => {
+                tracing::warn!("failed to write out /metrics response: {e:#}");
+                // The semantics of this error are somewhat unclear: we want to fail the stream
+                // so the response is aborted and the client is notified that we failed.
+                //
+                // Most likely, though, the failure happened because the receiver is already gone.
+                drop(
+                    writer
+                        .tx
+                        .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
+                );
+            }
+        }
+    });
+
     Ok(response)
 }
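
Note (not part of the diff): a condensed sketch of the pattern the patch relies on, assuming hyper 0.14 with the "stream" feature plus tokio, tokio-stream and bytes; the `streaming_body` name and the literal chunks are illustrative only. A blocking producer feeds a bounded mpsc channel, and `Body::wrap_stream(ReceiverStream::new(rx))` turns the receiving end into a chunked HTTP response body; `ChannelWriter` builds an `std::io::Write` on top of the same sender.

    use bytes::Bytes;
    use hyper::Body;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    // Illustrative sketch: build a streaming response body fed from blocking code.
    fn streaming_body() -> Body {
        // Capacity 1 keeps at most one chunk in flight, so the producer is throttled
        // by how fast hyper (and ultimately the client) drains the stream.
        let (tx, rx) = mpsc::channel::<std::io::Result<Bytes>>(1);

        tokio::task::spawn_blocking(move || {
            for i in 0..3 {
                let chunk = Bytes::from(format!("chunk {i}\n"));
                // blocking_send is fine here: we are on a blocking-pool thread,
                // not on the async executor.
                if tx.blocking_send(Ok(chunk)).is_err() {
                    // The receiver (the response body) is gone; stop producing.
                    break;
                }
            }
        });

        // hyper polls the stream and writes each Bytes chunk to the client as it arrives.
        Body::wrap_stream(ReceiverStream::new(rx))
    }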