feat(per-tenant throttling): exclude throttled time from page_service metrics + regression test (#6953)

part of https://github.com/neondatabase/neon/issues/5899

Problem
-------

Before this PR, the time spent waiting on the throttle was charged
towards the higher-level page_service metrics, i.e.,
`pageserver_smgr_query_seconds`.
The metrics are the foundation of internal SLIs / SLOs.
A throttled tenant would cause the SLI to degrade / SLO alerts to fire.

Changes
-------


- don't charge time spent in throttle towards the page_service metrics
- record time spent in throttle in RequestContext and subtract it from
the elapsed time
- this works because the page_service path doesn't create child contexts,
so all the throttle time is recorded in the parent
- it's quite brittle and will break if we ever decide to spawn child
tasks that need child RequestContexts, which would have separate
instances of the `micros_spent_throttled` counter.
- however, let's punt that to a more general refactoring of
RequestContext
- add a test case that ensures that
- throttling happens for getpage requests; this aspect of the test
passed before this PR
- throttling delays aren't charged towards the page_service metrics;
this aspect of the test only passes with this PR
- drive-by: make the throttle log message `info!`, it's an expected
condition

Performance
-----------

I took the same measurements as in #6706; there is no meaningful change in CPU
overhead.

Future Work
-----------

This PR enables us to experiment with the throttle for select tenants
without affecting the SLI metrics / triggering SLO alerts.

Before declaring this feature done, we need more work to happen,
specifically:

- decide on whether we want to retain the flexibility of throttling any
`Timeline::get` call, filtered by TaskKind
- versus: separate throttles for each page_service endpoint, potentially
with separate config options
- the trouble here is that this decision implies changes to the
TenantConfig, so, if we start using the current config style now, then
decide to switch to a different config, it'll be a breaking change

Nice-to-haves but probably not worth the time right now:

- Equivalent tests to ensure the throttle applies to all other
page_service handlers.
This commit is contained in:
Christian Schwarz
2024-03-05 14:44:00 +01:00
committed by GitHub
parent 9dec65b75b
commit 270d3be507
7 changed files with 308 additions and 15 deletions

View File

@@ -88,13 +88,16 @@
use crate::task_mgr::TaskKind;
pub(crate) mod optional_counter;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct RequestContext {
task_kind: TaskKind,
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32,
}
/// The kind of access to the page cache.
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
micros_spent_throttled: Default::default(),
},
}
}
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
micros_spent_throttled: Default::default(),
},
}
}

View File

@@ -0,0 +1,101 @@
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
/// A `u32` counter that has to be opened before it accepts additions and is
/// read out by closing it.
///
/// The value `u32::MAX` is reserved as the "closed" marker; any other value is
/// the running total of an open counter. A default-constructed counter starts
/// out closed. All atomic accesses use `Ordering::Relaxed`.
#[derive(Debug)]
pub struct CounterU32 {
    inner: AtomicU32,
}

impl Default for CounterU32 {
    /// Creates a counter in the closed state.
    fn default() -> Self {
        Self {
            inner: AtomicU32::new(Self::CLOSED),
        }
    }
}

impl CounterU32 {
    /// Sentinel stored in `inner` while the counter is closed.
    const CLOSED: u32 = u32::MAX;

    /// Transitions the counter from closed to open with a total of zero.
    ///
    /// # Errors
    ///
    /// Fails if the counter is not closed, i.e. it has already been opened
    /// and not closed since. The stored total is left untouched in that case.
    pub fn open(&self) -> Result<(), &'static str> {
        self.inner
            .compare_exchange(Self::CLOSED, 0, Ordering::Relaxed, Ordering::Relaxed)
            .map(|_| ())
            // The exchange only fails when the current value is not the
            // closed sentinel, so the offending state is "open".
            .map_err(|_| "open() called on open state")
    }

    /// Closes the counter and returns the total accumulated since `open()`.
    ///
    /// # Errors
    ///
    /// Fails if the counter was already closed.
    pub fn close(&self) -> Result<u32, &'static str> {
        let previous = self.inner.swap(Self::CLOSED, Ordering::Relaxed);
        if previous == Self::CLOSED {
            Err("close() called on closed state")
        } else {
            Ok(previous)
        }
    }

    /// Adds `count` to an open counter.
    ///
    /// Adding zero returns `Ok(())` without inspecting the counter, so it
    /// succeeds even while the counter is closed.
    ///
    /// # Errors
    ///
    /// Fails if the counter is closed, or if the new total would wrap around
    /// or land on the reserved `u32::MAX` sentinel. The stored value is not
    /// modified on failure.
    pub fn add(&self, count: u32) -> Result<(), &'static str> {
        if count == 0 {
            return Ok(());
        }
        // `fetch_update` only tells us that the closure declined to update;
        // the closure records *why* so we can report it to the caller.
        let mut failure = None;
        self.inner
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                if cur == Self::CLOSED {
                    failure = Some("add() called on closed state");
                    return None;
                }
                match cur.checked_add(count) {
                    Some(new) if new != Self::CLOSED => Some(new),
                    _ => {
                        failure = Some("add() overflowed the counter");
                        None
                    }
                }
            })
            .map(|_| ())
            .map_err(|_| failure.expect("we set it whenever the function returns None"))
    }
}
/// Accumulates [`Duration`]s as whole microseconds in a [`CounterU32`].
#[derive(Default, Debug)]
pub struct MicroSecondsCounterU32 {
    inner: CounterU32,
}

impl MicroSecondsCounterU32 {
    /// Opens the underlying counter; see [`CounterU32::open`].
    pub fn open(&self) -> Result<(), &'static str> {
        self.inner.open()
    }

    /// Adds `duration`, truncated to whole microseconds, to the counter.
    ///
    /// # Errors
    ///
    /// Fails if the microsecond count does not fit into a `u32`, or if the
    /// underlying counter rejects the addition.
    pub fn add(&self, duration: Duration) -> Result<(), &'static str> {
        let micros = u32::try_from(duration.as_micros())
            .map_err(|_| "add(): duration conversion error")?;
        self.inner.add(micros)
    }

    /// Closes the counter and returns `from` minus the accumulated time.
    ///
    /// # Errors
    ///
    /// Fails if closing the underlying counter fails, or if the accumulated
    /// time is larger than `from`. The counter is closed before the
    /// subtraction is attempted.
    pub fn close_and_checked_sub_from(&self, from: Duration) -> Result<Duration, &'static str> {
        let accumulated = Duration::from_micros(u64::from(self.inner.close()?));
        from.checked_sub(accumulated).ok_or("Duration::checked_sub")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Open, record 23µs, then close against a 42µs total: the remainder
    /// must be the difference of the two.
    #[test]
    fn test_basic() {
        let total = Duration::from_micros(42);
        let recorded = Duration::from_micros(23);

        let counter = MicroSecondsCounterU32::default();
        counter.open().unwrap();
        counter.add(recorded).unwrap();

        let remainder = counter.close_and_checked_sub_from(total).unwrap();
        assert_eq!(remainder, Duration::from_micros(42 - 23));
    }
}

View File

@@ -11,6 +11,7 @@ use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, IntoEnumIterator, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;
/// Prometheus histogram buckets (in seconds) for operations in the critical
@@ -1005,15 +1006,39 @@ impl GlobalAndPerTimelineHistogram {
}
}
struct GlobalAndPerTimelineHistogramTimer<'a> {
struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
h: &'a GlobalAndPerTimelineHistogram,
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
}
impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> {
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
self.h.observe(elapsed.as_secs_f64());
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(res) => res,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
self.h.observe(ex_throttled.as_secs_f64());
}
}
@@ -1025,6 +1050,7 @@ impl<'a> Drop for GlobalAndPerTimelineHistogramTimer<'a> {
strum_macros::EnumCount,
strum_macros::EnumIter,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[strum(serialize_all = "snake_case")]
pub enum SmgrQueryType {
@@ -1130,11 +1156,35 @@ impl SmgrQueryTimePerTimeline {
});
Self { metrics }
}
pub(crate) fn start_timer(&self, op: SmgrQueryType) -> impl Drop + '_ {
pub(crate) fn start_timer<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> impl Drop + '_ {
let metric = &self.metrics[op as usize];
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[op];
rate_limit.call(|| {
warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
GlobalAndPerTimelineHistogramTimer {
h: metric,
start: std::time::Instant::now(),
ctx,
start,
op,
}
}
}
@@ -1145,6 +1195,11 @@ mod smgr_query_time_tests {
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
};
// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
@@ -1193,7 +1248,8 @@ mod smgr_query_time_tests {
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);
let timer = metrics.start_timer(*op);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
let timer = metrics.start_timer(*op, &ctx);
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();

View File

@@ -910,7 +910,7 @@ impl PageServerHandler {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelExists);
.start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
@@ -938,7 +938,7 @@ impl PageServerHandler {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetRelSize);
.start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
@@ -966,7 +966,7 @@ impl PageServerHandler {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetDbSize);
.start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
@@ -1144,7 +1144,7 @@ impl PageServerHandler {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
@@ -1172,7 +1172,7 @@ impl PageServerHandler {
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetSlruSegment);
.start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =

View File

@@ -217,7 +217,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
}
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
warn!(
info!(
n_seconds=%format_args!("{:.3}",
delta.as_secs_f64()),
count_accounted,

View File

@@ -2,14 +2,14 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
time::{Duration, Instant},
};
use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;
use tracing::{error, warn};
use crate::{context::RequestContext, task_mgr::TaskKind};
@@ -157,6 +157,19 @@ where
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
match ctx.micros_spent_throttled.add(wait_time) {
Ok(res) => res,
Err(error) => {
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
static WARN_RATE_LIMIT: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut guard = WARN_RATE_LIMIT.lock().unwrap();
guard.call(move || {
warn!(error, "error adding time spent throttled; this message is logged at a global rate limit");
});
}
}
}
}
}

View File

@@ -0,0 +1,118 @@
import json
import uuid
from anyio import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    Run pagebench at full speed against a tenant with a getpage throttle and check that:

    1. the number of completed requests matches the configured rate limit,
    2. the pageserver logs that the shard was throttled,
    3. the growth of `pageserver_smgr_query_seconds_sum` is small compared to
       the benchmark's wall-clock duration, i.e. the time spent waiting on
       the throttle is not charged towards that metric.
    """
    env = neon_env_builder.init_start()

    env.pageserver.tenant_detach(env.initial_tenant)

    env.pageserver.allowed_errors.append(
        # https://github.com/neondatabase/neon/issues/6925
        r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
    )

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()

    rate_limit_rps = 100
    compaction_period = 5
    env.pageserver.tenant_create(
        tenant_id,
        conf={
            "compaction_period": f"{compaction_period}s",
            "timeline_get_throttle": {
                "task_kinds": ["PageRequestHandler"],
                "initial": 0,
                # refill a tenth of the per-second budget every 100ms
                "refill_interval": "100ms",
                "refill_amount": int(rate_limit_rps / 10),
                "max": int(rate_limit_rps / 10),
                "fair": True,
            },
        },
    )

    ps_http = env.pageserver.http_client()

    ps_http.timeline_create(PgVersion.V16, tenant_id, timeline_id)

    def run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs: int):
        """Run `pagebench get-page-latest-lsn` for `duration_secs` and return its total request count."""
        cmd = [
            str(env.neon_binpath / "pagebench"),
            "get-page-latest-lsn",
            "--mgmt-api-endpoint",
            ps_http.base_url,
            "--page-service-connstring",
            env.pageserver.connstr(password=None),
            "--runtime",
            f"{duration_secs}s",
            f"{tenant_id}/{timeline_id}",
        ]

        basepath = pg_bin.run_capture(cmd, with_command_header=False)
        results_path = Path(basepath + ".stdout")
        log.info(f"Benchmark results at: {results_path}")

        with open(results_path, "r") as f:
            results = json.load(f)
        log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
        return int(results["total"]["request_count"])

    log.info("warmup / make sure metrics are present")
    run_pagebench_at_max_speed_and_get_total_requests_completed(2)
    metrics_query = {
        "tenant_id": str(tenant_id),
        "timeline_id": str(timeline_id),
        "smgr_query_type": "get_page_at_lsn",
    }
    metric_name = "pageserver_smgr_query_seconds_sum"
    smgr_query_seconds_pre = ps_http.get_metric_value(metric_name, metrics_query)
    assert smgr_query_seconds_pre is not None

    # Emit a unique marker into the pageserver log so that the later log
    # search only considers lines written after this point.
    marker = uuid.uuid4().hex
    ps_http.post_tracing_event("info", marker)
    _, marker_offset = wait_until(
        10, 0.5, lambda: env.pageserver.assert_log_contains(marker, offset=None)
    )

    log.info("run pagebench")
    duration_secs = 10
    actual_ncompleted = run_pagebench_at_max_speed_and_get_total_requests_completed(duration_secs)

    log.info("validate the client is capped at the configured rps limit")
    expect_ncompleted = duration_secs * rate_limit_rps
    delta_abs = abs(expect_ncompleted - actual_ncompleted)
    threshold = 0.05 * expect_ncompleted
    assert (
        threshold / rate_limit_rps < 0.1 * duration_secs
    ), "test self-test: unrealistic expectations regarding precision in this test"
    # Compare against the same `threshold` that the self-test above validated.
    assert delta_abs < threshold, "the throttling deviates more than 5 percent from the expectation"

    log.info("validate that we logged the throttling")

    wait_until(
        10,
        compaction_period / 10,
        lambda: env.pageserver.assert_log_contains(
            f".*{tenant_id}.*shard was throttled in the last n_seconds.*",
            offset=marker_offset,
        ),
    )

    log.info("validate that the metric doesn't include throttle wait time")
    smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query)
    assert smgr_query_seconds_post is not None
    actual_smgr_query_seconds = smgr_query_seconds_post - smgr_query_seconds_pre

    # The summed query time must be at most a tenth of the benchmark duration.
    assert (
        duration_secs >= 10 * actual_smgr_query_seconds
    ), "smgr metrics should not include throttle wait time"