metrics: record more details of the responding (#6979)

On eu-west-1 during benchmarks we sometimes lose samples. Add more time
measurements.
This commit is contained in:
Joonas Koivunen
2024-03-01 16:04:39 +02:00
committed by GitHub
parent f8bdce1015
commit 5ab10d051d

View File

@@ -156,6 +156,10 @@ pub struct ChannelWriter {
buffer: BytesMut,
pub tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
/// Time spent waiting for the channel to make progress. It is not the same as time to upload a
/// buffer because we cannot know anything about that, but this should allow us to understand
/// the actual time taken without the time spent `std::thread::park`ed.
wait_time: std::time::Duration,
}
impl ChannelWriter {
@@ -168,6 +172,7 @@ impl ChannelWriter {
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
wait_time: std::time::Duration::ZERO,
}
}
@@ -180,6 +185,8 @@ impl ChannelWriter {
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
let wait_started_at = std::time::Instant::now();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
@@ -192,6 +199,9 @@ impl ChannelWriter {
// sending it to the client.
Ok(())
});
self.wait_time += wait_started_at.elapsed();
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
@@ -202,6 +212,10 @@ impl ChannelWriter {
pub fn flushed_bytes(&self) -> usize {
self.written
}
pub fn wait_time(&self) -> std::time::Duration {
self.wait_time
}
}
impl std::io::Write for ChannelWriter {
@@ -252,22 +266,52 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
let span = info_span!("blocking");
tokio::task::spawn_blocking(move || {
// there are situations where we lose scraped metrics under load, try to gather some clues
// since all nodes are queried this, keep the message count low.
let spawned_at = std::time::Instant::now();
let _span = span.entered();
let metrics = metrics::gather();
let gathered_at = std::time::Instant::now();
let res = encoder
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
// this instant is not when we finally got the full response sent, sending is done by hyper
// in another task.
let encoded_at = std::time::Instant::now();
let spawned_in = spawned_at - started_at;
let collected_in = gathered_at - spawned_at;
// remove the wait time here in case the tcp connection was clogged
let encoded_in = encoded_at - gathered_at - writer.wait_time();
let total = encoded_at - started_at;
match res {
Ok(()) => {
tracing::info!(
bytes = writer.flushed_bytes(),
elapsed_ms = started_at.elapsed().as_millis(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
"responded /metrics"
);
}
Err(e) => {
tracing::warn!("failed to write out /metrics response: {e:#}");
// there is a chance that this error is not the BrokenPipe we generate in the writer
// for "closed connection", but it is highly unlikely.
tracing::warn!(
after_bytes = writer.flushed_bytes(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
"failed to write out /metrics response: {e:?}"
);
// semantics of this error are quite... unclear. we want to error the stream out to
// abort the response to somehow notify the client that we failed.
//