pageserver: throttling: per-tenant metrics + more metrics to help understand throttle queue depth (#9077)

This commit is contained in:
Christian Schwarz
2024-09-20 18:48:26 +02:00
committed by GitHub
parent 6014f15157
commit ec5dce04eb
6 changed files with 250 additions and 77 deletions

View File

@@ -2645,6 +2645,8 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
}
tenant_throttling::remove_tenant_metrics(tenant_shard_id);
// we leave the BROKEN_TENANTS_SET entry if any
}
@@ -3108,41 +3110,180 @@ pub mod tokio_epoll_uring {
pub(crate) mod tenant_throttling {
use metrics::{register_int_counter_vec, IntCounter};
use once_cell::sync::Lazy;
use utils::shard::TenantShardId;
use crate::tenant::{self, throttle::Metric};
pub(crate) struct TimelineGet {
wait_time: IntCounter,
count: IntCounter,
struct GlobalAndPerTenantIntCounter {
global: IntCounter,
per_tenant: IntCounter,
}
pub(crate) static TIMELINE_GET: Lazy<TimelineGet> = Lazy::new(|| {
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum_global",
"Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.",
impl GlobalAndPerTenantIntCounter {
#[inline(always)]
pub(crate) fn inc(&self) {
self.inc_by(1)
}
#[inline(always)]
pub(crate) fn inc_by(&self, n: u64) {
self.global.inc_by(n);
self.per_tenant.inc_by(n);
}
}
pub(crate) struct TimelineGet {
count_accounted_start: GlobalAndPerTenantIntCounter,
count_accounted_finish: GlobalAndPerTenantIntCounter,
wait_time: GlobalAndPerTenantIntCounter,
count_throttled: GlobalAndPerTenantIntCounter,
}
static COUNT_ACCOUNTED_START: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_start_global",
"Count of tenant throttling starts, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_global",
"Count of tenant throttlings, by kind of throttle.",
&["kind"]
)
.unwrap()
});
let kind = "timeline_get";
TimelineGet {
wait_time: WAIT_USECS.with_label_values(&[kind]),
count: WAIT_COUNT.with_label_values(&[kind]),
}
.unwrap()
});
static COUNT_ACCOUNTED_START_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_start",
"Count of tenant throttling starts, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
static COUNT_ACCOUNTED_FINISH: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_finish_global",
"Count of tenant throttling finishes, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static COUNT_ACCOUNTED_FINISH_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_accounted_finish",
"Count of tenant throttling finishes, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum_global",
"Sum of microseconds that spent waiting throttle by kind of throttle.",
&["kind"]
)
.unwrap()
});
static WAIT_USECS_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum",
"Sum of microseconds that spent waiting throttle by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
impl Metric for &'static TimelineGet {
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_global",
"Count of tenant throttlings, by kind of throttle.",
&["kind"]
)
.unwrap()
});
static WAIT_COUNT_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count",
"Count of tenant throttlings, by kind of throttle.",
&["kind", "tenant_id", "shard_id"]
)
.unwrap()
});
const KIND: &str = "timeline_get";
impl TimelineGet {
pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self {
TimelineGet {
count_accounted_start: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT.with_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]),
}
},
count_accounted_finish: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT.with_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]),
}
},
wait_time: {
GlobalAndPerTenantIntCounter {
global: WAIT_USECS.with_label_values(&[KIND]),
per_tenant: WAIT_USECS_PER_TENANT.with_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]),
}
},
count_throttled: {
GlobalAndPerTenantIntCounter {
global: WAIT_COUNT.with_label_values(&[KIND]),
per_tenant: WAIT_COUNT_PER_TENANT.with_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]),
}
},
}
}
}
pub(crate) fn preinitialize_global_metrics() {
Lazy::force(&COUNT_ACCOUNTED_START);
Lazy::force(&COUNT_ACCOUNTED_FINISH);
Lazy::force(&WAIT_USECS);
Lazy::force(&WAIT_COUNT);
}
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
for m in &[
&COUNT_ACCOUNTED_START_PER_TENANT,
&COUNT_ACCOUNTED_FINISH_PER_TENANT,
&WAIT_USECS_PER_TENANT,
&WAIT_COUNT_PER_TENANT,
] {
let _ = m.remove_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]);
}
}
impl Metric for TimelineGet {
#[inline(always)]
fn accounting_start(&self) {
self.count_accounted_start.inc();
}
#[inline(always)]
fn accounting_finish(&self) {
self.count_accounted_finish.inc();
}
#[inline(always)]
fn observe_throttling(
&self,
@@ -3150,7 +3291,7 @@ pub(crate) mod tenant_throttling {
) {
let val = u64::try_from(wait_time.as_micros()).unwrap();
self.wait_time.inc_by(val);
self.count.inc();
self.count_throttled.inc();
}
}
}
@@ -3309,7 +3450,8 @@ pub fn preinitialize_metrics() {
// Custom
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&tenant_throttling::TIMELINE_GET);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
tenant_throttling::preinitialize_global_metrics();
}

View File

@@ -302,7 +302,7 @@ pub struct Tenant {
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
Arc<throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach concurrency limiter.
///
@@ -2831,7 +2831,7 @@ impl Tenant {
gate: Gate::default(),
timeline_get_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
&crate::metrics::tenant_throttling::TIMELINE_GET,
crate::metrics::tenant_throttling::TimelineGet::new(&tenant_shard_id),
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),

View File

@@ -163,8 +163,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// How many errors we have seen consequtively
let mut error_run_count = 0;
let mut last_throttle_flag_reset_at = Instant::now();
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
@@ -191,8 +189,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
}
}
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
@@ -207,12 +203,18 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
};
// Run compaction
let IterationResult { output, elapsed } = iteration.run(tenant.compaction_iteration(&cancel, &ctx)).await;
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(has_pending_task) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if has_pending_task { Duration::ZERO } else { period };
sleep_duration = if has_pending_task {
Duration::ZERO
} else {
period
};
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
@@ -233,38 +235,20 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
}
// the duration is recorded by performance tests by enabling debug in this function
tracing::debug!(elapsed_ms=elapsed.as_millis(), "compaction iteration complete");
tracing::debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}",
delta.as_secs_f64()),
count_accounted,
count_throttled,
sum_throttled_usecs,
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
@@ -437,6 +421,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
tokio::select! {
_ = cancel.cancelled() => {
@@ -483,6 +468,29 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
kind: BackgroundLoopKind::IngestHouseKeeping,
};
iteration.run(tenant.ingest_housekeeping()).await;
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.timeline_get_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}",
delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
.await;

View File

@@ -24,8 +24,10 @@ use crate::{context::RequestContext, task_mgr::TaskKind};
pub struct Throttle<M: Metric> {
inner: ArcSwap<Inner>,
metric: M,
/// will be turned into [`Stats::count_accounted`]
count_accounted: AtomicU64,
/// will be turned into [`Stats::count_accounted_start`]
count_accounted_start: AtomicU64,
/// will be turned into [`Stats::count_accounted_finish`]
count_accounted_finish: AtomicU64,
/// will be turned into [`Stats::count_throttled`]
count_throttled: AtomicU64,
/// will be turned into [`Stats::sum_throttled_usecs`]
@@ -43,17 +45,21 @@ pub struct Observation {
pub wait_time: Duration,
}
pub trait Metric {
fn accounting_start(&self);
fn accounting_finish(&self);
fn observe_throttling(&self, observation: &Observation);
}
/// See [`Throttle::reset_stats`].
pub struct Stats {
// Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`].
pub count_accounted: u64,
// Subset of the `accounted` requests that were actually throttled.
// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
/// Number of requests that started [`Throttle::throttle`] calls.
pub count_accounted_start: u64,
/// Number of requests that finished [`Throttle::throttle`] calls.
pub count_accounted_finish: u64,
/// Subset of the `accounted` requests that were actually throttled.
/// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
pub count_throttled: u64,
// Sum of microseconds that throttled requests spent waiting for throttling.
/// Sum of microseconds that throttled requests spent waiting for throttling.
pub sum_throttled_usecs: u64,
}
@@ -65,7 +71,8 @@ where
Self {
inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
metric,
count_accounted: AtomicU64::new(0),
count_accounted_start: AtomicU64::new(0),
count_accounted_finish: AtomicU64::new(0),
count_throttled: AtomicU64::new(0),
sum_throttled_usecs: AtomicU64::new(0),
}
@@ -117,11 +124,13 @@ where
/// This method allows retrieving & resetting that flag.
/// Useful for periodic reporting.
pub fn reset_stats(&self) -> Stats {
let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
let count_accounted_start = self.count_accounted_start.swap(0, Ordering::Relaxed);
let count_accounted_finish = self.count_accounted_finish.swap(0, Ordering::Relaxed);
let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
Stats {
count_accounted,
count_accounted_start,
count_accounted_finish,
count_throttled,
sum_throttled_usecs,
}
@@ -139,9 +148,12 @@ where
};
let start = std::time::Instant::now();
self.metric.accounting_start();
self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
let did_throttle = inner.rate_limiter.acquire(key_count).await;
self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
self.metric.accounting_finish();
self.count_accounted.fetch_add(1, Ordering::Relaxed);
if did_throttle {
self.count_throttled.fetch_add(1, Ordering::Relaxed);
let now = Instant::now();

View File

@@ -196,9 +196,8 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
pub timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -406,9 +405,8 @@ pub struct Timeline {
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,

View File

@@ -102,6 +102,11 @@ def histogram(prefix_without_trailing_underscore: str) -> List[str]:
return [f"{prefix_without_trailing_underscore}_{x}" for x in ["bucket", "count", "sum"]]
def counter(name: str) -> str:
# the prometheus_client package appends _total to all counters client-side
return f"{name}_total"
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
"pageserver_remote_timeline_client_calls_started_total",
"pageserver_remote_timeline_client_calls_finished_total",
@@ -136,6 +141,10 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_tenant_states_count",
"pageserver_circuit_breaker_broken_total",
"pageserver_circuit_breaker_unbroken_total",
counter("pageserver_tenant_throttling_count_accounted_start_global"),
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
counter("pageserver_tenant_throttling_count_global"),
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
@@ -159,6 +168,10 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold
# "pageserver_broken_tenants_count" -- used only for broken