mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 12:32:54 +00:00
feat(pageserver): num of background job metrics (#10690)
## Problem We need a metrics to know what's going on in pageserver's background jobs. ## Summary of changes * Waiting tasks: task still waiting for the semaphore. * Running tasks: tasks doing their actual jobs. --------- Signed-off-by: Alex Chi Z <chi@neon.tech> Co-authored-by: Erik Grinaker <erik@neon.tech>
This commit is contained in:
@@ -2214,6 +2214,8 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
pub struct BackgroundLoopSemaphoreMetrics {
|
||||
counters: EnumMap<BackgroundLoopKind, IntCounterPair>,
|
||||
durations: EnumMap<BackgroundLoopKind, Counter>,
|
||||
waiting_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
|
||||
running_tasks: EnumMap<BackgroundLoopKind, IntGauge>,
|
||||
}
|
||||
|
||||
pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics> = Lazy::new(
|
||||
@@ -2234,6 +2236,20 @@ pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let waiting_tasks = register_int_gauge_vec!(
|
||||
"pageserver_background_loop_semaphore_waiting_tasks",
|
||||
"Number of background loop tasks waiting for semaphore",
|
||||
&["task"],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let running_tasks = register_int_gauge_vec!(
|
||||
"pageserver_background_loop_semaphore_running_tasks",
|
||||
"Number of background loop tasks running concurrently",
|
||||
&["task"],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
BackgroundLoopSemaphoreMetrics {
|
||||
counters: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
|
||||
@@ -2243,29 +2259,69 @@ pub(crate) static BACKGROUND_LOOP_SEMAPHORE: Lazy<BackgroundLoopSemaphoreMetrics
|
||||
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
|
||||
durations.with_label_values(&[kind.into()])
|
||||
})),
|
||||
waiting_tasks: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
|
||||
waiting_tasks.with_label_values(&[kind.into()])
|
||||
})),
|
||||
running_tasks: enum_map::EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
|
||||
running_tasks.with_label_values(&[kind.into()])
|
||||
})),
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
impl BackgroundLoopSemaphoreMetrics {
|
||||
pub(crate) fn measure_acquisition(&self, task: BackgroundLoopKind) -> impl Drop + '_ {
|
||||
struct Record<'a> {
|
||||
metrics: &'a BackgroundLoopSemaphoreMetrics,
|
||||
task: BackgroundLoopKind,
|
||||
_counter_guard: metrics::IntCounterPairGuard,
|
||||
start: Instant,
|
||||
}
|
||||
impl Drop for Record<'_> {
|
||||
fn drop(&mut self) {
|
||||
let elapsed = self.start.elapsed().as_secs_f64();
|
||||
self.metrics.durations[self.task].inc_by(elapsed);
|
||||
}
|
||||
}
|
||||
Record {
|
||||
metrics: self,
|
||||
/// Starts recording semaphore metrics. Call `acquired()` on the returned recorder when the
|
||||
/// semaphore is acquired, and drop it when the task completes or is cancelled.
|
||||
pub(crate) fn record(
|
||||
&self,
|
||||
task: BackgroundLoopKind,
|
||||
) -> BackgroundLoopSemaphoreMetricsRecorder {
|
||||
BackgroundLoopSemaphoreMetricsRecorder::start(self, task)
|
||||
}
|
||||
}
|
||||
|
||||
/// Records metrics for a background task.
|
||||
pub struct BackgroundLoopSemaphoreMetricsRecorder<'a> {
|
||||
metrics: &'a BackgroundLoopSemaphoreMetrics,
|
||||
task: BackgroundLoopKind,
|
||||
start: Instant,
|
||||
wait_counter_guard: Option<metrics::IntCounterPairGuard>,
|
||||
}
|
||||
|
||||
impl<'a> BackgroundLoopSemaphoreMetricsRecorder<'a> {
|
||||
/// Starts recording semaphore metrics, by recording wait time and incrementing
|
||||
/// `wait_start_count` and `waiting_tasks`.
|
||||
fn start(metrics: &'a BackgroundLoopSemaphoreMetrics, task: BackgroundLoopKind) -> Self {
|
||||
metrics.waiting_tasks[task].inc();
|
||||
Self {
|
||||
metrics,
|
||||
task,
|
||||
_counter_guard: self.counters[task].guard(),
|
||||
start: Instant::now(),
|
||||
wait_counter_guard: Some(metrics.counters[task].guard()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Signals that the semaphore has been acquired, and updates relevant metrics.
|
||||
pub fn acquired(&mut self) {
|
||||
self.wait_counter_guard.take().expect("already acquired");
|
||||
self.metrics.durations[self.task].inc_by(self.start.elapsed().as_secs_f64());
|
||||
self.metrics.waiting_tasks[self.task].dec();
|
||||
self.metrics.running_tasks[self.task].inc();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for BackgroundLoopSemaphoreMetricsRecorder<'_> {
|
||||
/// The task either completed or was cancelled.
|
||||
fn drop(&mut self) {
|
||||
if self.wait_counter_guard.take().is_some() {
|
||||
// Waiting.
|
||||
self.metrics.durations[self.task].inc_by(self.start.elapsed().as_secs_f64());
|
||||
self.metrics.waiting_tasks[self.task].dec();
|
||||
} else {
|
||||
// Running.
|
||||
self.metrics.running_tasks[self.task].dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::metrics::{BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::throttle::Stats;
|
||||
@@ -61,21 +61,32 @@ impl BackgroundLoopKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BackgroundLoopSemaphorePermit<'a> {
|
||||
_permit: tokio::sync::SemaphorePermit<'static>,
|
||||
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
|
||||
}
|
||||
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
loop_kind: BackgroundLoopKind,
|
||||
_ctx: &RequestContext,
|
||||
) -> tokio::sync::SemaphorePermit<'static> {
|
||||
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind);
|
||||
) -> BackgroundLoopSemaphorePermit<'static> {
|
||||
let mut recorder = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.record(loop_kind);
|
||||
|
||||
if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
|
||||
pausable_failpoint!("initial-size-calculation-permit-pause");
|
||||
}
|
||||
|
||||
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
|
||||
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
|
||||
Ok(permit) => permit,
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
let permit = CONCURRENT_BACKGROUND_TASKS
|
||||
.acquire()
|
||||
.await
|
||||
.expect("should never close");
|
||||
recorder.acquired();
|
||||
|
||||
BackgroundLoopSemaphorePermit {
|
||||
_permit: permit,
|
||||
_recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,8 +30,11 @@ use crate::{
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint,
|
||||
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
|
||||
size::CalculateSyntheticSizeError,
|
||||
storage_layer::LayerVisibilityHint,
|
||||
tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit},
|
||||
timeline::EvictionError,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -330,7 +333,7 @@ impl Timeline {
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<(), tokio::sync::SemaphorePermit<'static>> {
|
||||
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
|
||||
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
BackgroundLoopKind::Eviction,
|
||||
ctx,
|
||||
@@ -374,7 +377,7 @@ impl Timeline {
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
permit: tokio::sync::SemaphorePermit<'static>,
|
||||
permit: BackgroundLoopSemaphorePermit<'static>,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
if !self.tenant_shard_id.is_shard_zero() {
|
||||
|
||||
Reference in New Issue
Block a user