diff --git a/Cargo.lock b/Cargo.lock
index 3c862241a4..b9ca53064f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4867,6 +4867,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
+ "tokio-stream",
"tracing",
"tracing-error",
"tracing-subscriber",
diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs
index f24488b19d..b4026d93e1 100644
--- a/libs/metrics/src/lib.rs
+++ b/libs/metrics/src/lib.rs
@@ -6,6 +6,7 @@ use once_cell::sync::Lazy;
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
+pub use prometheus::Error;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index e7c8323c1d..f01131540e 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -42,6 +42,10 @@ workspace_hack.workspace = true
const_format.workspace = true
+# Lets us use tokio channels as streams; this is faster to compile than async_stream.
+# It is declared only here on purpose: streams are rarely needed, so no other crate should depend on it.
+tokio-stream = { version = "0.1.14" }
+
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true
diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs
index 33241dbdf7..f3f5e95d0b 100644
--- a/libs/utils/src/http/endpoint.rs
+++ b/libs/utils/src/http/endpoint.rs
@@ -9,7 +9,6 @@ use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
-use tokio::task::JoinError;
use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
@@ -148,26 +147,140 @@ impl Drop for RequestCancelled {
}
async fn prometheus_metrics_handler(_req: Request
) -> Result, ApiError> {
+ use bytes::{Bytes, BytesMut};
+ use std::io::Write as _;
+ use tokio::sync::mpsc;
+ use tokio_stream::wrappers::ReceiverStream;
+
SERVE_METRICS_COUNT.inc();
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
+ /// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
+ struct ChannelWriter {
+ buffer: BytesMut,
+ tx: mpsc::Sender>,
+ written: usize,
+ }
- let metrics = tokio::task::spawn_blocking(move || {
- // Currently we take a lot of mutexes while collecting metrics, so it's
- // better to spawn a blocking task to avoid blocking the event loop.
- metrics::gather()
- })
- .await
- .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?;
- encoder.encode(&metrics, &mut buffer).unwrap();
+ impl ChannelWriter {
+ fn new(buf_len: usize, tx: mpsc::Sender>) -> Self {
+ assert_ne!(buf_len, 0);
+ ChannelWriter {
+ // split about half off the buffer from the start, because we flush depending on
+ // capacity. first flush will come sooner than without this, but now resizes will
+ // have better chance of picking up the "other" half. not guaranteed of course.
+ buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
+ tx,
+ written: 0,
+ }
+ }
+
+ fn flush0(&mut self) -> std::io::Result {
+ let n = self.buffer.len();
+ if n == 0 {
+ return Ok(0);
+ }
+
+ tracing::trace!(n, "flushing");
+ let ready = self.buffer.split().freeze();
+
+ // not ideal to call from blocking code to block_on, but we are sure that this
+ // operation does not spawn_blocking other tasks
+ let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
+ self.tx.send(Ok(ready)).await.map_err(|_| ())?;
+
+ // throttle sending to allow reuse of our buffer in `write`.
+ self.tx.reserve().await.map_err(|_| ())?;
+
+ // now the response task has picked up the buffer and hopefully started
+ // sending it to the client.
+ Ok(())
+ });
+ if res.is_err() {
+ return Err(std::io::ErrorKind::BrokenPipe.into());
+ }
+ self.written += n;
+ Ok(n)
+ }
+
+ fn flushed_bytes(&self) -> usize {
+ self.written
+ }
+ }
+
+ impl std::io::Write for ChannelWriter {
+ fn write(&mut self, mut buf: &[u8]) -> std::io::Result {
+ let remaining = self.buffer.capacity() - self.buffer.len();
+
+ let out_of_space = remaining < buf.len();
+
+ let original_len = buf.len();
+
+ if out_of_space {
+ let can_still_fit = buf.len() - remaining;
+ self.buffer.extend_from_slice(&buf[..can_still_fit]);
+ buf = &buf[can_still_fit..];
+ self.flush0()?;
+ }
+
+ // assume that this will often under normal operation just move the pointer back to the
+ // beginning of allocation, because previous split off parts are already sent and
+ // dropped.
+ self.buffer.extend_from_slice(buf);
+ Ok(original_len)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ self.flush0().map(|_| ())
+ }
+ }
+
+ let started_at = std::time::Instant::now();
+
+ let (tx, rx) = mpsc::channel(1);
+
+ let body = Body::wrap_stream(ReceiverStream::new(rx));
+
+ let mut writer = ChannelWriter::new(128 * 1024, tx);
+
+ let encoder = TextEncoder::new();
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
- .body(Body::from(buffer))
+ .body(body)
.unwrap();
+ let span = info_span!("blocking");
+ tokio::task::spawn_blocking(move || {
+ let _span = span.entered();
+ let metrics = metrics::gather();
+ let res = encoder
+ .encode(&metrics, &mut writer)
+ .and_then(|_| writer.flush().map_err(|e| e.into()));
+
+ match res {
+ Ok(()) => {
+ tracing::info!(
+ bytes = writer.flushed_bytes(),
+ elapsed_ms = started_at.elapsed().as_millis(),
+ "responded /metrics"
+ );
+ }
+ Err(e) => {
+ tracing::warn!("failed to write out /metrics response: {e:#}");
+ // semantics of this error are quite... unclear. we want to error the stream out to
+ // abort the response to somehow notify the client that we failed.
+ //
+ // though, most likely the reason for failure is that the receiver is already gone.
+ drop(
+ writer
+ .tx
+ .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
+ );
+ }
+ }
+ });
+
Ok(response)
}