Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-22 21:59:59 +00:00
improve ondemand-download latency observability (#11421)
## Problem

We don't have metrics to exactly quantify the end user impact of on-demand downloads. Perf tracing is underway (#11140) to supply us with high-resolution *samples*. But it will also be useful to have some aggregate per-timeline and per-instance metrics that definitively contain all observations.

## Summary of changes

This PR consists of independent commits that should be reviewed independently. However, for convenience, we're going to merge them together.

- refactor(metrics): measure_remote_op can use async traits
- impr(pageserver metrics): task_kind dimension for remote_timeline_client latency histo
  - implements https://github.com/neondatabase/cloud/issues/26800
  - refs https://github.com/neondatabase/cloud/issues/26193#issuecomment-2769705793
  - use the opportunity to rename the metric and add a _global suffix; checked grafana export, it's only used in two personal dashboards, one of them mine, the other by Heikki
- log on-demand download latency for expensive-to-query but precise ground truth
- metric for wall clock time spent waiting for on-demand downloads

## Refs

- refs https://github.com/neondatabase/cloud/issues/26800
- a bunch of minor investigations / incidents into latency outliers
Committed by GitHub
parent 4f94751b75
commit aad410c8f1

Changed files include libs/utils/src/elapsed_accum.rs (new normal file, 26 lines).
@@ -0,0 +1,26 @@
+use std::time::{Duration, Instant};
+
+#[derive(Default)]
+pub struct ElapsedAccum {
+    accum: Duration,
+}
+
+impl ElapsedAccum {
+    pub fn get(&self) -> Duration {
+        self.accum
+    }
+    pub fn guard(&mut self) -> impl Drop + '_ {
+        let start = Instant::now();
+        scopeguard::guard(start, |last_wait_at| {
+            self.accum += Instant::now() - last_wait_at;
+        })
+    }
+
+    pub async fn measure<Fut, O>(&mut self, fut: Fut) -> O
+    where
+        Fut: Future<Output = O>,
+    {
+        let _guard = self.guard();
+        fut.await
+    }
+}
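For orientation only, here is a minimal usage sketch of the accumulator added above. It is not part of the commit; it assumes the `utils` crate from this repository plus a tokio runtime (with the `time` and `macros` features), and the helper function name is made up.

```rust
use std::time::Duration;
use utils::elapsed_accum::ElapsedAccum;

// Hypothetical helper: attribute an awaited sleep (and a blocking section)
// to "time spent waiting" on the caller-provided accumulator.
async fn do_work(accum: &mut ElapsedAccum) {
    accum
        .measure(tokio::time::sleep(Duration::from_millis(10)))
        .await;

    {
        // The guard form covers everything until the guard is dropped.
        let _guard = accum.guard();
        std::thread::sleep(Duration::from_millis(1));
    }
}

#[tokio::main]
async fn main() {
    let mut accum = ElapsedAccum::default();
    do_work(&mut accum).await;
    // Total wall-clock time spent inside the measured sections.
    println!("waited for {:?}", accum.get());
}
```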
@@ -93,6 +93,8 @@ pub mod try_rcu;
 
 pub mod guard_arc_swap;
 
+pub mod elapsed_accum;
+
 #[cfg(target_os = "linux")]
 pub mod linux_socket_ioctl;
 
@@ -111,9 +111,17 @@ impl<T> OnceCell<T> {
         }
     }
 
+    /// Like [`Self::get_or_init_detached_measured`], but without out parameter for time spent waiting.
+    pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
+        self.get_or_init_detached_measured(None).await
+    }
+
     /// Returns a guard to an existing initialized value, or returns an unique initialization
     /// permit which can be used to initialize this `OnceCell` using `OnceCell::set`.
-    pub async fn get_or_init_detached(&self) -> Result<Guard<'_, T>, InitPermit> {
+    pub async fn get_or_init_detached_measured(
+        &self,
+        mut wait_time: Option<&mut crate::elapsed_accum::ElapsedAccum>,
+    ) -> Result<Guard<'_, T>, InitPermit> {
         // It looks like OnceCell::get_or_init could be implemented using this method instead of
         // duplication. However, that makes the future be !Send due to possibly holding on to the
         // MutexGuard over an await point.
@@ -125,12 +133,16 @@
             }
             guard.init_semaphore.clone()
         };
 
         {
             let permit = {
                 // increment the count for the duration of queued
                 let _guard = CountWaitingInitializers::start(self);
-                sem.acquire().await
+                let fut = sem.acquire();
+                if let Some(wait_time) = wait_time.as_mut() {
+                    wait_time.measure(fut).await
+                } else {
+                    fut.await
+                }
             };
 
             let Ok(permit) = permit else {
@@ -89,7 +89,7 @@
 //! [`RequestContext`] argument. Functions in the middle of the call chain
 //! only need to pass it on.
 
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use once_cell::sync::Lazy;
 use tracing::warn;
@@ -566,6 +566,34 @@ impl RequestContext {
         }
     }
 
+    pub(crate) fn ondemand_download_wait_observe(&self, duration: Duration) {
+        if duration == Duration::ZERO {
+            return;
+        }
+
+        match &self.scope {
+            Scope::Timeline { arc_arc } => arc_arc
+                .wait_ondemand_download_time
+                .observe(self.task_kind, duration),
+            _ => {
+                use once_cell::sync::Lazy;
+                use std::sync::Mutex;
+                use std::time::Duration;
+                use utils::rate_limit::RateLimit;
+                static LIMIT: Lazy<Mutex<RateLimit>> =
+                    Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
+                let mut guard = LIMIT.lock().unwrap();
+                guard.call2(|rate_limit_stats| {
+                    warn!(
+                        %rate_limit_stats,
+                        backtrace=%std::backtrace::Backtrace::force_capture(),
+                        "ondemand downloads should always happen within timeline scope",
+                    );
+                });
+            }
+        }
+    }
+
     pub(crate) fn perf_follows_from(&self, from: &RequestContext) {
         if let (Some(span), Some(from_span)) = (&self.perf_span, &from.perf_span) {
             span.inner().follows_from(from_span.inner());
@@ -1,10 +1,8 @@
 use std::collections::HashMap;
 use std::num::NonZeroUsize;
 use std::os::fd::RawFd;
-use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
 use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 
 use enum_map::{Enum as _, EnumMap};
@@ -23,7 +21,6 @@ use pageserver_api::config::{
 };
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
-use pin_project_lite::pin_project;
 use postgres_backend::{QueryError, is_expected_io_error};
 use pq_proto::framed::ConnectionError;
 use strum::{EnumCount, IntoEnumIterator as _, VariantNames};
@@ -500,6 +497,100 @@ pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::n
     .expect("failed to define a metric")
 });
 
+pub(crate) mod wait_ondemand_download_time {
+    use super::*;
+    const WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS: &[f64] = &[
+        0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, // 10 ms - 100ms
+        0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, // 100ms to 1s
+        1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, // 1s to 10s
+        10.0, 20.0, 30.0, 40.0, 50.0, 60.0, // 10s to 1m
+    ];
+
+    /// The task kinds for which we want to track wait times for on-demand downloads.
+    /// Other task kinds' wait times are accumulated in label value `unknown`.
+    pub(crate) const WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS: [TaskKind; 2] = [
+        TaskKind::PageRequestHandler,
+        TaskKind::WalReceiverConnectionHandler,
+    ];
+
+    pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL: Lazy<Vec<Histogram>> = Lazy::new(|| {
+        let histo = register_histogram_vec!(
+            "pageserver_wait_ondemand_download_seconds_global",
+            "Observations are individual tasks' wait times for on-demand downloads. \
+            If N tasks coalesce on an on-demand download, and it takes 10s, than we observe N * 10s.",
+            &["task_kind"],
+            WAIT_ONDEMAND_DOWNLOAD_TIME_BUCKETS.into(),
+        )
+        .expect("failed to define a metric");
+        WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
+            .iter()
+            .map(|task_kind| histo.with_label_values(&[task_kind.into()]))
+            .collect::<Vec<_>>()
+    });
+
+    pub(crate) static WAIT_ONDEMAND_DOWNLOAD_TIME_SUM: Lazy<CounterVec> = Lazy::new(|| {
+        register_counter_vec!(
+            // use a name that _could_ be evolved into a per-timeline histogram later
+            "pageserver_wait_ondemand_download_seconds_sum",
+            "Like `pageserver_wait_ondemand_download_seconds_global` but per timeline",
+            &["tenant_id", "shard_id", "timeline_id", "task_kind"],
+        )
+        .unwrap()
+    });
+
+    pub struct WaitOndemandDownloadTimeSum {
+        counters: [Counter; WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS.len()],
+    }
+
+    impl WaitOndemandDownloadTimeSum {
+        pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self {
+            let counters = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
+                .iter()
+                .map(|task_kind| {
+                    WAIT_ONDEMAND_DOWNLOAD_TIME_SUM
+                        .get_metric_with_label_values(&[
+                            tenant_id,
+                            shard_id,
+                            timeline_id,
+                            task_kind.into(),
+                        ])
+                        .unwrap()
+                })
+                .collect::<Vec<_>>();
+            Self {
+                counters: counters.try_into().unwrap(),
+            }
+        }
+        pub(crate) fn observe(&self, task_kind: TaskKind, duration: Duration) {
+            let maybe = WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS
+                .iter()
+                .enumerate()
+                .find(|(_, kind)| **kind == task_kind);
+            let Some((idx, _)) = maybe else {
+                return;
+            };
+            WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL[idx].observe(duration.as_secs_f64());
+            let counter = &self.counters[idx];
+            counter.inc_by(duration.as_secs_f64());
+        }
+    }
+
+    pub(crate) fn shutdown_timeline(tenant_id: &str, shard_id: &str, timeline_id: &str) {
+        for task_kind in WAIT_ONDEMAND_DOWNLOAD_METRIC_TASK_KINDS {
+            let _ = WAIT_ONDEMAND_DOWNLOAD_TIME_SUM.remove_label_values(&[
+                tenant_id,
+                shard_id,
+                timeline_id,
+                task_kind.into(),
+            ]);
+        }
+    }
+
+    pub(crate) fn preinitialize_global_metrics() {
+        Lazy::force(&WAIT_ONDEMAND_DOWNLOAD_TIME_GLOBAL);
+    }
+}
+
 static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
     register_int_gauge_vec!(
         "pageserver_last_record_lsn",
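The module above records every observation twice: into a global histogram with full bucket resolution, and into a per-timeline counter that only tracks the running sum, which keeps label cardinality bounded. A standalone sketch of that pattern, using the `prometheus` crate directly rather than the pageserver's metrics wrappers (metric names, buckets, and label values here are illustrative, not the pageserver's):

```rust
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};

fn main() {
    // Global histogram: full bucket resolution, low cardinality (one label).
    let global: HistogramVec = register_histogram_vec!(
        "demo_wait_seconds_global",
        "wait time, global histogram",
        &["task_kind"],
        vec![0.01, 0.1, 1.0, 10.0]
    )
    .unwrap();

    // Per-timeline series: only a running sum, so cardinality stays manageable.
    let per_timeline_sum: CounterVec = register_counter_vec!(
        "demo_wait_seconds_sum",
        "wait time, per-timeline running sum",
        &["timeline_id", "task_kind"]
    )
    .unwrap();

    // Record one 2.5s wait observed by a page request handler on timeline "t1".
    let secs = 2.5;
    global.with_label_values(&["PageRequestHandler"]).observe(secs);
    per_timeline_sum
        .with_label_values(&["t1", "PageRequestHandler"])
        .inc_by(secs);
}
```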
@@ -2315,13 +2406,18 @@ impl RemoteOpFileKind {
     }
 }
 
-pub(crate) static REMOTE_OPERATION_TIME: Lazy<HistogramVec> = Lazy::new(|| {
+pub(crate) static REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
     register_histogram_vec!(
-        "pageserver_remote_operation_seconds",
-        "Time spent on remote storage operations. \
-        Grouped by tenant, timeline, operation_kind and status. \
+        "pageserver_remote_timeline_client_seconds_global",
+        "Time spent on remote timeline client operations. \
+        Grouped by task_kind, file_kind, operation_kind and status. \
+        The task_kind is \
+        - for layer downloads, populated from RequestContext (primary objective of having the label) \
+        - for index downloads, set to 'unknown' \
+        - for any upload operation, set to 'RemoteUploadTask' \
+        This keeps dimensionality at bay. \
         Does not account for time spent waiting in remote timeline client's queues.",
-        &["file_kind", "op_kind", "status"]
+        &["task_kind", "file_kind", "op_kind", "status"]
     )
     .expect("failed to define a metric")
 });
@@ -2883,6 +2979,7 @@ pub(crate) struct TimelineMetrics {
     pub storage_io_size: StorageIoSizeMetrics,
     pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
     pub wait_lsn_start_finish_counterpair: IntCounterPair,
+    pub wait_ondemand_download_time: wait_ondemand_download_time::WaitOndemandDownloadTimeSum,
     shutdown: std::sync::atomic::AtomicBool,
 }
 
@@ -3028,6 +3125,13 @@ impl TimelineMetrics {
             .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
             .unwrap();
 
+        let wait_ondemand_download_time =
+            wait_ondemand_download_time::WaitOndemandDownloadTimeSum::new(
+                &tenant_id,
+                &shard_id,
+                &timeline_id,
+            );
+
         TimelineMetrics {
             tenant_id,
             shard_id,
@@ -3061,6 +3165,7 @@
             wal_records_received,
             wait_lsn_in_progress_micros,
             wait_lsn_start_finish_counterpair,
+            wait_ondemand_download_time,
             shutdown: std::sync::atomic::AtomicBool::default(),
         }
     }
@@ -3253,6 +3358,8 @@
                 .remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
         }
 
+        wait_ondemand_download_time::shutdown_timeline(tenant_id, shard_id, timeline_id);
+
         let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
             SmgrQueryType::GetPageAtLsn.into(),
             tenant_id,
@@ -3374,13 +3481,18 @@ impl RemoteTimelineClientMetrics {
 
     pub fn remote_operation_time(
         &self,
+        task_kind: Option<TaskKind>,
         file_kind: &RemoteOpFileKind,
         op_kind: &RemoteOpKind,
         status: &'static str,
     ) -> Histogram {
-        let key = (file_kind.as_str(), op_kind.as_str(), status);
-        REMOTE_OPERATION_TIME
-            .get_metric_with_label_values(&[key.0, key.1, key.2])
+        REMOTE_TIMELINE_CLIENT_COMPLETION_LATENCY
+            .get_metric_with_label_values(&[
+                task_kind.as_ref().map(|tk| tk.into()).unwrap_or("unknown"),
+                file_kind.as_str(),
+                op_kind.as_str(),
+                status,
+            ])
             .unwrap()
     }
 
@@ -3625,54 +3737,26 @@ impl Drop for RemoteTimelineClientMetrics {
 
 /// Wrapper future that measures the time spent by a remote storage operation,
 /// and records the time and success/failure as a prometheus metric.
-pub(crate) trait MeasureRemoteOp: Sized {
-    fn measure_remote_op(
+pub(crate) trait MeasureRemoteOp<O, E>: Sized + Future<Output = Result<O, E>> {
+    async fn measure_remote_op(
         self,
+        task_kind: Option<TaskKind>, // not all caller contexts have a RequestContext / TaskKind handy
         file_kind: RemoteOpFileKind,
         op: RemoteOpKind,
         metrics: Arc<RemoteTimelineClientMetrics>,
-    ) -> MeasuredRemoteOp<Self> {
+    ) -> Result<O, E> {
         let start = Instant::now();
-        MeasuredRemoteOp {
-            inner: self,
-            file_kind,
-            op,
-            start,
-            metrics,
-        }
+        let res = self.await;
+        let duration = start.elapsed();
+        let status = if res.is_ok() { &"success" } else { &"failure" };
+        metrics
+            .remote_operation_time(task_kind, &file_kind, &op, status)
+            .observe(duration.as_secs_f64());
+        res
     }
 }
 
-impl<T: Sized> MeasureRemoteOp for T {}
-
-pin_project! {
-    pub(crate) struct MeasuredRemoteOp<F>
-    {
-        #[pin]
-        inner: F,
-        file_kind: RemoteOpFileKind,
-        op: RemoteOpKind,
-        start: Instant,
-        metrics: Arc<RemoteTimelineClientMetrics>,
-    }
-}
-
-impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
-    type Output = Result<O, E>;
-
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let this = self.project();
-        let poll_result = this.inner.poll(cx);
-        if let Poll::Ready(ref res) = poll_result {
-            let duration = this.start.elapsed();
-            let status = if res.is_ok() { &"success" } else { &"failure" };
-            this.metrics
-                .remote_operation_time(this.file_kind, this.op, status)
-                .observe(duration.as_secs_f64());
-        }
-        poll_result
-    }
-}
+impl<Fut, O, E> MeasureRemoteOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}
 
 pub mod tokio_epoll_uring {
     use std::collections::HashMap;
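The hunk above replaces the hand-rolled `pin_project` wrapper future with an extension trait whose default method is an `async fn`. A standalone sketch of that shape (not the repo's types; the names are made up, and it assumes Rust 1.75+ for `async fn` in traits plus tokio for the runtime):

```rust
use std::future::Future;
use std::time::Instant;

// Extension trait over any future that yields a Result: the provided async
// method awaits the future and reports elapsed time plus success/failure.
trait MeasureOp<O, E>: Sized + Future<Output = Result<O, E>> {
    async fn measure_op(self, label: &str) -> Result<O, E> {
        let start = Instant::now();
        let res = self.await;
        let status = if res.is_ok() { "success" } else { "failure" };
        println!("{label}: {status} in {:?}", start.elapsed());
        res
    }
}

// Blanket impl, mirroring the shape of the impl in the diff above.
impl<Fut, O, E> MeasureOp<O, E> for Fut where Fut: Sized + Future<Output = Result<O, E>> {}

#[tokio::main]
async fn main() {
    let res: Result<u32, &str> = async { Ok(42) }.measure_op("demo").await;
    assert_eq!(res, Ok(42));
}
```

Compared to the removed wrapper future, this keeps the timing logic in straight-line async code instead of a hand-written `poll` implementation.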
@@ -4220,4 +4304,5 @@ pub fn preinitialize_metrics(
     Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
 
     tenant_throttling::preinitialize_global_metrics();
+    wait_ondemand_download_time::preinitialize_global_metrics();
 }
@@ -642,6 +642,7 @@ impl RemoteTimelineClient {
             cancel,
         )
         .measure_remote_op(
+            Option::<TaskKind>::None,
             RemoteOpFileKind::Index,
             RemoteOpKind::Download,
             Arc::clone(&self.metrics),
@@ -739,6 +740,7 @@ impl RemoteTimelineClient {
             ctx,
         )
         .measure_remote_op(
+            Some(ctx.task_kind()),
             RemoteOpFileKind::Layer,
             RemoteOpKind::Download,
             Arc::clone(&self.metrics),
@@ -2175,6 +2177,7 @@ impl RemoteTimelineClient {
             &self.cancel,
         )
         .measure_remote_op(
+            Some(TaskKind::RemoteUploadTask),
             RemoteOpFileKind::Layer,
             RemoteOpKind::Upload,
             Arc::clone(&self.metrics),
@@ -2191,6 +2194,7 @@ impl RemoteTimelineClient {
             &self.cancel,
         )
         .measure_remote_op(
+            Some(TaskKind::RemoteUploadTask),
             RemoteOpFileKind::Index,
             RemoteOpKind::Upload,
             Arc::clone(&self.metrics),
@@ -975,6 +975,10 @@ impl LayerInner {
         allow_download: bool,
         ctx: &RequestContext,
     ) -> Result<Arc<DownloadedLayer>, DownloadError> {
+        let mut wait_for_download_recorder =
+            scopeguard::guard(utils::elapsed_accum::ElapsedAccum::default(), |accum| {
+                ctx.ondemand_download_wait_observe(accum.get());
+            });
         let (weak, permit) = {
             // get_or_init_detached can:
             // - be fast (mutex lock) OR uncontested semaphore permit acquire
@@ -983,7 +987,7 @@ impl LayerInner {
 
             let locked = self
                 .inner
-                .get_or_init_detached()
+                .get_or_init_detached_measured(Some(&mut wait_for_download_recorder))
                 .await
                 .map(|mut guard| guard.get_and_upgrade().ok_or(guard));
 
@@ -1013,6 +1017,7 @@ impl LayerInner {
                 Err(permit) => (None, permit),
             }
         };
+        let _guard = wait_for_download_recorder.guard();
 
         if let Some(weak) = weak {
             // only drop the weak after dropping the heavier_once_cell guard
@@ -1202,6 +1207,7 @@ impl LayerInner {
         permit: heavier_once_cell::InitPermit,
         ctx: &RequestContext,
     ) -> Result<Arc<DownloadedLayer>, remote_storage::DownloadError> {
+        let start = std::time::Instant::now();
         let result = timeline
             .remote_client
             .download_layer_file(
@@ -1213,7 +1219,8 @@ impl LayerInner {
                 ctx,
             )
             .await;
 
+        let latency = start.elapsed();
+        let latency_millis = u64::try_from(latency.as_millis()).unwrap();
         match result {
             Ok(size) => {
                 assert_eq!(size, self.desc.file_size);
@@ -1229,9 +1236,8 @@
                     Err(e) => {
                         panic!("post-condition failed: needs_download errored: {e:?}");
                     }
                 }
 
-                tracing::info!(size=%self.desc.file_size, "on-demand download successful");
                 };
+                tracing::info!(size=%self.desc.file_size, %latency_millis, "on-demand download successful");
                 timeline
                     .metrics
                     .resident_physical_size_add(self.desc.file_size);
@@ -1260,7 +1266,7 @@
                     return Err(e);
                 }
 
-                tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
+                tracing::error!(consecutive_failures, %latency_millis, "layer file download failed: {e:#}");
 
                 let backoff = utils::backoff::exponential_backoff_duration_seconds(
                     consecutive_failures.min(u32::MAX as usize) as u32,
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from collections import defaultdict
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Literal
 
 from prometheus_client.parser import text_string_to_metric_families
 
@@ -46,14 +46,26 @@ class MetricsGetter:
     def get_metrics(self) -> Metrics:
         raise NotImplementedError()
 
-    def get_metric_value(self, name: str, filter: dict[str, str] | None = None) -> float | None:
+    def get_metric_value(
+        self,
+        name: str,
+        filter: dict[str, str] | None = None,
+        aggregate: Literal["sum"] | None = None,
+    ) -> float | None:
         metrics = self.get_metrics()
         results = metrics.query_all(name, filter=filter)
         if not results:
            log.info(f'could not find metric "{name}"')
            return None
-        assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
-        return results[0].value
+        if aggregate is None:
+            assert len(results) == 1, (
+                f"metric {name} with given filters is not unique, got: {results}"
+            )
+            return results[0].value
+        elif aggregate == "sum":
+            return sum(sample.value for sample in results)
+        else:
+            raise RuntimeError(f"unknown aggregate function {aggregate}")
 
     def get_metrics_values(
         self, names: list[str], filter: dict[str, str] | None = None, absence_ok: bool = False
@@ -132,7 +144,7 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
     *[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
     *histogram("pageserver_smgr_query_seconds_global"),
     *histogram("pageserver_wait_lsn_seconds"),
-    *histogram("pageserver_remote_operation_seconds"),
+    *histogram("pageserver_remote_timeline_client_seconds_global"),
     *histogram("pageserver_io_operations_seconds"),
     "pageserver_smgr_query_started_global_count_total",
     "pageserver_tenant_states_count",
@@ -143,6 +155,7 @@
     counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
     counter("pageserver_tenant_throttling_count_global"),
     *histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
+    *histogram("pageserver_wait_ondemand_download_seconds_global"),
 )
 
 PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
@@ -180,6 +193,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     counter("pageserver_wait_lsn_in_progress_micros"),
     counter("pageserver_wait_lsn_started_count"),
     counter("pageserver_wait_lsn_finished_count"),
+    counter("pageserver_wait_ondemand_download_seconds_sum"),
     *histogram("pageserver_page_service_batch_size"),
     *histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
     *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
@@ -126,7 +126,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder):
     ps_metrics = env.pageserver.http_client().get_metrics()
     total = 0.0
     for sample in ps_metrics.query_all(
-        name="pageserver_remote_operation_seconds_count",
+        name="pageserver_remote_timeline_client_seconds_global_count",
         filter={
             "file_kind": str(file_kind),
             "op_kind": str(op_kind),
@@ -38,12 +38,13 @@ def get_num_downloaded_layers(client: PageserverHttpClient):
     This assumes that the pageserver only has a single tenant.
     """
     value = client.get_metric_value(
-        "pageserver_remote_operation_seconds_count",
+        "pageserver_remote_timeline_client_seconds_global_count",
         {
             "file_kind": "layer",
             "op_kind": "download",
             "status": "success",
         },
+        "sum",
     )
     if value is None:
         return 0
@@ -107,7 +107,7 @@ def test_metric_collection(
     ps_metrics = env.pageserver.http_client().get_metrics()
     total = 0.0
     for sample in ps_metrics.query_all(
-        name="pageserver_remote_operation_seconds_count",
+        name="pageserver_remote_timeline_client_seconds_global_count",
         filter={
             "file_kind": str(file_kind),
             "op_kind": str(op_kind),