diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 550ab10700..3c71628870 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -156,6 +156,10 @@ pub struct ChannelWriter { buffer: BytesMut, pub tx: mpsc::Sender>, written: usize, + /// Time spent waiting for the channel to make progress. It is not the same as time to upload a + /// buffer because we cannot know anything about that, but this should allow us to understand + /// the actual time taken without the time spent `std::thread::park`ed. + wait_time: std::time::Duration, } impl ChannelWriter { @@ -168,6 +172,7 @@ impl ChannelWriter { buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2), tx, written: 0, + wait_time: std::time::Duration::ZERO, } } @@ -180,6 +185,8 @@ impl ChannelWriter { tracing::trace!(n, "flushing"); let ready = self.buffer.split().freeze(); + let wait_started_at = std::time::Instant::now(); + // not ideal to call from blocking code to block_on, but we are sure that this // operation does not spawn_blocking other tasks let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async { @@ -192,6 +199,9 @@ impl ChannelWriter { // sending it to the client. Ok(()) }); + + self.wait_time += wait_started_at.elapsed(); + if res.is_err() { return Err(std::io::ErrorKind::BrokenPipe.into()); } @@ -202,6 +212,10 @@ impl ChannelWriter { pub fn flushed_bytes(&self) -> usize { self.written } + + pub fn wait_time(&self) -> std::time::Duration { + self.wait_time + } } impl std::io::Write for ChannelWriter { @@ -252,22 +266,52 @@ async fn prometheus_metrics_handler(_req: Request) -> Result { tracing::info!( bytes = writer.flushed_bytes(), - elapsed_ms = started_at.elapsed().as_millis(), + total_ms = total.as_millis(), + spawning_ms = spawned_in.as_millis(), + collection_ms = collected_in.as_millis(), + encoding_ms = encoded_in.as_millis(), "responded /metrics" ); } Err(e) => { - tracing::warn!("failed to write out /metrics response: {e:#}"); + // there is a chance that this error is not the BrokenPipe we generate in the writer + // for "closed connection", but it is highly unlikely. + tracing::warn!( + after_bytes = writer.flushed_bytes(), + total_ms = total.as_millis(), + spawning_ms = spawned_in.as_millis(), + collection_ms = collected_in.as_millis(), + encoding_ms = encoded_in.as_millis(), + "failed to write out /metrics response: {e:?}" + ); // semantics of this error are quite... unclear. we want to error the stream out to // abort the response to somehow notify the client that we failed. //