page_service: throttle individual requests instead of the batched request (#10353)

## Problem

Before this PR, the pagestream throttle was applied once per batch,
weighted by batch size.
This had several problems:

1. The throttle occurrence counters were only bumped by `1` instead of
   `batch_size`.
2. The throttle wait time aggregator metric only counted one wait time
   per batch, irrespective of `batch_size`. That makes sense from some
   perspectives, but not from others.
3. If the last request in the batch runs into the throttle, the other
   requests in the batch are also throttled, i.e., over-throttling
   happens (theoretical; we didn't measure it in practice).

## Solution

It occurred to me that we can simply push the throttling upwards into
`pagestream_read_message`.

This has the added benefit that in pipeline mode, the `executor` stage
will, if it is idle, steal whatever requests have already made it into
the `spsc_fold` and execute them. Before this change, that was not the
case, because the throttling happened in the `executor` stage instead of
the `batcher` stage.
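
Concretely, each request is now throttled individually as it is read off
the connection, before batching. A condensed sketch of the new flow,
following the `record_op_start_and_throttle` helper added in this PR
(types abbreviated for brevity):

```rust
// Condensed from this PR's page_service.rs changes; `Handle` stands in
// for the full timeline handle type.
async fn record_op_start_and_throttle(
    shard: &Handle,
    op: SmgrQueryType,
    received_at: Instant,
) -> Result<SmgrOpTimer, QueryError> {
    // Start the smgr op metric recorder as early as possible so the
    // _started counters are incremented before any serious waiting.
    let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
    let now = Instant::now();
    timer.observe_throttle_start(now);
    // Each request acquires exactly 1 token; shutdown aborts the wait.
    let throttled = tokio::select! {
        res = shard.pagestream_throttle.throttle(1, now) => res,
        _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
    };
    timer.observe_throttle_done(throttled);
    Ok(timer)
}
```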
   
## Code Changes

There are four changes in this PR:

1. Lift the throttling up into the `pagestream_read_message` method.
2. Move the throttling metrics out of the `Throttle` type into
   `SmgrOpMetrics`. Unlike the other smgr metrics, throttling is
   per-tenant, hence the `Arc`.
3. Refactor the `SmgrOpTimer` implementation to account for the new
   observation states, and simplify its design.
4. Drive-by fix of the flush time metrics: they were using the same `now`
   in the `observe_guard` every time (see the sketch after this list).
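
A minimal, self-contained sketch of the bug pattern behind item 4
(hypothetical names; the real fix is in `SmgrOpFlushInProgress::measure`,
visible in the diff below):

```rust
use std::time::{Duration, Instant};

fn main() {
    let flush_started_at = Instant::now();

    // Buggy pattern (pre-PR): `now` is captured once, so every
    // observation computes the same elapsed duration.
    let now = Instant::now();
    let stale_elapsed = move || now - flush_started_at;

    // Fixed pattern (this PR): re-read the clock on every observation.
    let fresh_elapsed = move || Instant::now() - flush_started_at;

    std::thread::sleep(Duration::from_millis(10));
    assert_eq!(stale_elapsed(), stale_elapsed()); // stuck at the initial value
    assert!(fresh_elapsed() > stale_elapsed()); // grows with wall-clock time
}
```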

The `SmgrOpTimer` is now a state machine; each observation point moves
the state machine forward. If a timer object is dropped early, some
"pair"-like metrics still require an increment or observation. That's
done in the `Drop` implementation, by driving the state machine to
completion, as sketched below.
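
The resulting state progression, summarized from the diff below (each
`observe_*` method is a no-op unless the timer is in the expected state,
which is what makes the fast-forward in `Drop` safe):

```rust
// SmgrOpTimer state machine, as introduced by this PR:
//
//   Received { received_at }
//     -- observe_throttle_start(at) -->            Throttling { throttle_started_at }
//     -- observe_throttle_done(result) -->         Batching { throttle_done_at }
//     -- observe_execution_start(at) -->           Executing { execution_started_at }
//     -- observe_execution_end_flush_start(at) --> Flushing
//
// Each transition records the metrics for the stage it closes. The Drop
// impl replays the same calls in order with a single `now`, so a timer
// dropped early still balances every (started, finished) counter pair.
```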

@@ -1224,117 +1224,189 @@ pub(crate) struct SmgrOpTimerInner {
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
throttling: Arc<tenant_throttling::Pagestream>,
timings: SmgrOpTimerState,
}
/// The stages of request processing are represented by the enum variants.
/// Used as part of [`SmgrOpTimerInner::timings`].
///
/// Request processing calls into the `SmgrOpTimer::observe_*` methods at the
/// transition points.
/// These methods bump relevant counters and then update [`SmgrOpTimerInner::timings`]
/// to the next state.
///
/// Each request goes through every stage, in all configurations.
///
#[derive(Debug)]
enum SmgrOpTimerState {
Received {
// In the future, we may want to track the full time the request spent
// inside the pageserver process (time spent in kernel buffers can't be tracked).
// `received_at` would be used for that.
#[allow(dead_code)]
received_at: Instant,
},
ThrottleDoneExecutionStarting {
received_at: Instant,
throttle_started_at: Instant,
started_execution_at: Instant,
},
Throttling {
throttle_started_at: Instant,
},
Batching {
throttle_done_at: Instant,
},
Executing {
execution_started_at: Instant,
},
Flushing,
// NB: when adding observation points, remember to update the Drop impl.
}
impl SmgrOpTimer {
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_throttle_start(&mut self, at: Instant) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Received { received_at: _ } = &mut inner.timings else {
return;
};
inner.throttling.count_accounted_start.inc();
inner.timings = SmgrOpTimerState::Throttling {
throttle_started_at: at,
};
}
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_throttle_done(&mut self, throttle: ThrottleResult) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Throttling {
throttle_started_at,
} = &inner.timings
else {
return;
};
inner.throttling.count_accounted_finish.inc();
match throttle {
ThrottleResult::NotThrottled { end } => {
inner.timings = SmgrOpTimerState::Batching {
throttle_done_at: end,
};
}
ThrottleResult::Throttled { end } => {
// update metrics
inner.throttling.count_throttled.inc();
inner
.throttling
.wait_time
.inc_by((end - *throttle_started_at).as_micros().try_into().unwrap());
// state transition
inner.timings = SmgrOpTimerState::Batching {
throttle_done_at: end,
};
}
}
}
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_start(&mut self, at: Instant) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Batching { throttle_done_at } = &inner.timings else {
return;
};
// update metrics
let batch = at - *throttle_done_at;
inner.global_batch_wait_time.observe(batch.as_secs_f64());
inner
.per_timeline_batch_wait_time
.observe(batch.as_secs_f64());
// state transition
inner.timings = SmgrOpTimerState::Executing {
execution_started_at: at,
}
}
/// For all but the first caller, this is a no-op.
/// The first caller receives Some, subsequent ones None.
///
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_end_flush_start(
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
// NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
return None;
};
let SmgrOpTimerState::Executing {
execution_started_at,
} = &inner.timings
else {
return None;
};
// update metrics
let execution = at - *execution_started_at;
inner
.global_execution_latency_histo
.observe(execution.as_secs_f64());
if let Some(per_timeline_execution_latency_histo) =
&inner.per_timeline_execution_latency_histo
{
per_timeline_execution_latency_histo.observe(execution.as_secs_f64());
}
// state transition
inner.timings = SmgrOpTimerState::Flushing;
// return the flush in progress object which
// will do the remaining metrics updates
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
Some(SmgrOpFlushInProgress {
flush_started_at: at,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
})
}
}
/// The last stage of request processing is serializing and flushing the request
/// into the TCP connection. We want to make slow flushes observable
/// _while they are occurring_, so this struct provides a wrapper method [`Self::measure`]
/// to periodically bump the metric.
///
/// If in the future we decide that we're not interested in live updates, we can
/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
/// and remove this struct from the code base.
pub(crate) struct SmgrOpFlushInProgress {
flush_started_at: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
impl SmgrOpTimer {
pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) {
let inner = self.0.as_mut().expect("other public methods consume self");
match (&mut inner.timings, throttle) {
(SmgrOpTimerState::Received { received_at }, throttle) => match throttle {
ThrottleResult::NotThrottled { start } => {
inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at: *received_at,
throttle_started_at: *start,
started_execution_at: *start,
};
}
ThrottleResult::Throttled { start, end } => {
inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at: *start,
throttle_started_at: *start,
started_execution_at: *end,
};
}
},
(x, _) => panic!("called in unexpected state: {x:?}"),
}
}
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
flush_started_at: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}
/// Returns `None` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;
let now = Instant::now();
let batch;
let execution;
let throttle;
match inner.timings {
SmgrOpTimerState::Received { received_at } => {
batch = (now - received_at).as_secs_f64();
// TODO: use label for dropped requests.
// This is quite rare in practice, only during tenant/pageservers shutdown.
throttle = Duration::ZERO;
execution = Duration::ZERO.as_secs_f64();
}
SmgrOpTimerState::ThrottleDoneExecutionStarting {
received_at,
throttle_started_at,
started_execution_at,
} => {
batch = (throttle_started_at - received_at).as_secs_f64();
throttle = started_execution_at - throttle_started_at;
execution = (now - started_execution_at).as_secs_f64();
}
}
// update time spent in batching
inner.global_batch_wait_time.observe(batch);
inner.per_timeline_batch_wait_time.observe(batch);
// time spent in throttle metric is updated by throttle impl
let _ = throttle;
// update metrics for execution latency
inner.global_execution_latency_histo.observe(execution);
if let Some(per_timeline_execution_latency_histo) =
&inner.per_timeline_execution_latency_histo
{
per_timeline_execution_latency_histo.observe(execution);
}
Some((now, inner))
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
// In case of early drop, update any of the remaining metrics with
// observations so that (started,finished) counter pairs balance out
// and all counters on the latency path have the same number of
// observations.
// It's technically lying and it would be better if each metric had
// a separate label or similar for cancelled requests.
// But we don't have that right now and counter pairs balancing
// out is useful when using the metrics in panels and whatnot.
let now = Instant::now();
self.observe_throttle_start(now);
self.observe_throttle_done(ThrottleResult::NotThrottled { end: now });
self.observe_execution_start(now);
self.observe_execution_end_flush_start(now);
}
}
@@ -1345,12 +1417,12 @@ impl SmgrOpFlushInProgress {
{
let mut fut = std::pin::pin!(fut);
let now = Instant::now();
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
@@ -1393,7 +1465,6 @@ pub enum SmgrQueryType {
GetSlruSegment,
}
#[derive(Debug)]
pub(crate) struct SmgrQueryTimePerTimeline {
global_started: [IntCounter; SmgrQueryType::COUNT],
global_latency: [Histogram; SmgrQueryType::COUNT],
@@ -1405,6 +1476,7 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros: IntCounter,
global_batch_wait_time: Histogram,
per_timeline_batch_wait_time: Histogram,
throttling: Arc<tenant_throttling::Pagestream>,
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1610,7 +1682,11 @@ static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL: Lazy<Histogram> = Lazy::new(||
});
impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
pub(crate) fn new(
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
pagestream_throttle_metrics: Arc<tenant_throttling::Pagestream>,
) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_slug = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id.to_string();
@@ -1671,6 +1747,7 @@ impl SmgrQueryTimePerTimeline {
per_timeline_flush_in_progress_micros,
global_batch_wait_time,
per_timeline_batch_wait_time,
throttling: pagestream_throttle_metrics,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, received_at: Instant) -> SmgrOpTimer {
@@ -1686,88 +1763,24 @@ impl SmgrQueryTimePerTimeline {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_execution_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_execution_latency_histo: per_timeline_latency_histo,
timings: SmgrOpTimerState::Received { received_at },
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
global_batch_wait_time: self.global_batch_wait_time.clone(),
per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
throttling: self.throttling.clone(),
timings: SmgrOpTimerState::Received { received_at },
}))
}
/// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
self.global_batch_size.observe(batch_size as f64);
self.per_timeline_batch_size.observe(batch_size as f64);
}
}
#[cfg(test)]
mod smgr_query_time_tests {
use std::time::Instant;
use pageserver_api::shard::TenantShardId;
use strum::IntoEnumIterator;
use utils::id::{TenantId, TimelineId};
// Regression test, we used hard-coded string constants before using an enum.
#[test]
fn op_label_name() {
use super::SmgrQueryType::*;
let expect: [(super::SmgrQueryType, &'static str); 5] = [
(GetRelExists, "get_rel_exists"),
(GetRelSize, "get_rel_size"),
(GetPageAtLsn, "get_page_at_lsn"),
(GetDbSize, "get_db_size"),
(GetSlruSegment, "get_slru_segment"),
];
for (op, expect) in expect {
let actual: &'static str = op.into();
assert_eq!(actual, expect);
}
}
#[test]
fn basic() {
let ops: Vec<_> = super::SmgrQueryType::iter().collect();
for op in &ops {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let metrics = super::SmgrQueryTimePerTimeline::new(
&TenantShardId::unsharded(tenant_id),
&timeline_id,
);
let get_counts = || {
let global: u64 = ops
.iter()
.map(|op| metrics.global_latency[*op as usize].get_sample_count())
.sum();
(
global,
metrics.per_timeline_getpage_latency.get_sample_count(),
)
};
let (pre_global, pre_per_tenant_timeline) = get_counts();
assert_eq!(pre_per_tenant_timeline, 0);
let timer = metrics.start_smgr_op(*op, Instant::now());
drop(timer);
let (post_global, post_per_tenant_timeline) = get_counts();
if matches!(op, super::SmgrQueryType::GetPageAtLsn) {
// getpage ops are tracked per-timeline, others aren't
assert_eq!(post_per_tenant_timeline, 1);
} else {
assert_eq!(post_per_tenant_timeline, 0);
}
assert!(post_global > pre_global);
}
}
}
// keep in sync with control plane Go code so that we can validate
// compute's basebackup_ms metric with our perspective in the context of SLI/SLO.
static COMPUTE_STARTUP_BUCKETS: Lazy<[f64; 28]> = Lazy::new(|| {
@@ -3563,9 +3576,7 @@ pub(crate) mod tenant_throttling {
use once_cell::sync::Lazy;
use utils::shard::TenantShardId;
use crate::tenant::{self};
struct GlobalAndPerTenantIntCounter {
pub(crate) struct GlobalAndPerTenantIntCounter {
global: IntCounter,
per_tenant: IntCounter,
}
@@ -3583,10 +3594,10 @@ pub(crate) mod tenant_throttling {
}
pub(crate) struct Metrics<const KIND: usize> {
count_accounted_start: GlobalAndPerTenantIntCounter,
count_accounted_finish: GlobalAndPerTenantIntCounter,
wait_time: GlobalAndPerTenantIntCounter,
count_throttled: GlobalAndPerTenantIntCounter,
pub(super) count_accounted_start: GlobalAndPerTenantIntCounter,
pub(super) count_accounted_finish: GlobalAndPerTenantIntCounter,
pub(super) wait_time: GlobalAndPerTenantIntCounter,
pub(super) count_throttled: GlobalAndPerTenantIntCounter,
}
static COUNT_ACCOUNTED_START: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
@@ -3721,26 +3732,6 @@ pub(crate) mod tenant_throttling {
}
}
}
impl<const KIND: usize> tenant::throttle::Metric for Metrics<KIND> {
#[inline(always)]
fn accounting_start(&self) {
self.count_accounted_start.inc();
}
#[inline(always)]
fn accounting_finish(&self) {
self.count_accounted_finish.inc();
}
#[inline(always)]
fn observe_throttling(
&self,
tenant::throttle::Observation { wait_time }: &tenant::throttle::Observation,
) {
let val = u64::try_from(wait_time.as_micros()).unwrap();
self.wait_time.inc_by(val);
self.count_throttled.inc();
}
}
}
pub(crate) mod disk_usage_based_eviction {


@@ -592,43 +592,21 @@ enum BatchedFeMessage {
}
impl BatchedFeMessage {
async fn throttle_and_record_start_processing(
&mut self,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
let (shard, tokens, timers) = match self {
BatchedFeMessage::Exists { shard, timer, .. }
| BatchedFeMessage::Nblocks { shard, timer, .. }
| BatchedFeMessage::DbSize { shard, timer, .. }
| BatchedFeMessage::GetSlruSegment { shard, timer, .. } => {
(
shard,
// 1 token is probably under-estimating because these
// request handlers typically do several Timeline::get calls.
1,
itertools::Either::Left(std::iter::once(timer)),
)
}
BatchedFeMessage::GetPage { shard, pages, .. } => (
shard,
pages.len(),
itertools::Either::Right(pages.iter_mut().map(|p| &mut p.timer)),
),
BatchedFeMessage::RespondError { .. } => return Ok(()),
};
let throttled = tokio::select! {
throttled = shard.pagestream_throttle.throttle(tokens) => { throttled }
_ = shard.cancel.cancelled() => {
return Err(QueryError::Shutdown);
}
_ = cancel.cancelled() => {
return Err(QueryError::Shutdown);
}
};
for timer in timers {
timer.observe_throttle_done_execution_starting(&throttled);
}
Ok(())
}
fn observe_execution_start(&mut self, at: Instant) {
match self {
BatchedFeMessage::Exists { timer, .. }
| BatchedFeMessage::Nblocks { timer, .. }
| BatchedFeMessage::DbSize { timer, .. }
| BatchedFeMessage::GetSlruSegment { timer, .. } => {
timer.observe_execution_start(at);
}
BatchedFeMessage::GetPage { pages, .. } => {
for page in pages {
page.timer.observe_execution_start(at);
}
}
BatchedFeMessage::RespondError { .. } => {}
}
}
}
@@ -720,6 +698,26 @@ impl PageServerHandler {
let neon_fe_msg =
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
// TODO: turn into an async closure once available to avoid repeating received_at
async fn record_op_start_and_throttle(
shard: &timeline::handle::Handle<TenantManagerTypes>,
op: metrics::SmgrQueryType,
received_at: Instant,
) -> Result<SmgrOpTimer, QueryError> {
// It's important to start the smgr op metric recorder as early as possible
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for throttle, batching, or actual request handling.
let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
let now = Instant::now();
timer.observe_throttle_start(now);
let throttled = tokio::select! {
res = shard.pagestream_throttle.throttle(1, now) => res,
_ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
};
timer.observe_throttle_done(throttled);
Ok(timer)
}
let batched_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let span = tracing::info_span!(parent: parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn);
@@ -727,9 +725,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetRelExists, received_at);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelExists,
received_at,
)
.await?;
BatchedFeMessage::Exists {
span,
timer,
@@ -743,9 +744,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetRelSize, received_at);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetRelSize,
received_at,
)
.await?;
BatchedFeMessage::Nblocks {
span,
timer,
@@ -759,9 +763,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetDbSize, received_at);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetDbSize,
received_at,
)
.await?;
BatchedFeMessage::DbSize {
span,
timer,
@@ -775,9 +782,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.instrument(span.clone()) // sets `shard_id` field
.await?;
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetSlruSegment, received_at);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetSlruSegment,
received_at,
)
.await?;
BatchedFeMessage::GetSlruSegment {
span,
timer,
@@ -826,12 +836,12 @@ impl PageServerHandler {
}
};
// It's important to start the timer before waiting for the LSN
// so that the _started counters are incremented before we do
// any serious waiting, e.g., for LSNs.
let timer = shard
.query_metrics
.start_smgr_op(metrics::SmgrQueryType::GetPageAtLsn, received_at);
let timer = record_op_start_and_throttle(
&shard,
metrics::SmgrQueryType::GetPageAtLsn,
received_at,
)
.await?;
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
@@ -937,6 +947,13 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let started_at = Instant::now();
let batch = {
let mut batch = batch;
batch.observe_execution_start(started_at);
batch
};
// invoke handler function
let (handler_results, span): (
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
@@ -1103,8 +1120,11 @@ impl PageServerHandler {
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
let flushing_timer = timer.map(|mut timer| {
timer
.observe_execution_end_flush_start(Instant::now())
.expect("we are the first caller")
});
// what we want to do
let flush_fut = pgb_writer.flush();
@@ -1258,7 +1278,7 @@ impl PageServerHandler {
Ok(msg) => msg,
Err(e) => break e,
};
let mut msg = match msg {
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
@@ -1266,10 +1286,6 @@ impl PageServerHandler {
}
};
if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await {
break cancelled;
}
let err = self
.pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx)
.await;
@@ -1429,15 +1445,12 @@ impl PageServerHandler {
return Ok(());
}
};
let mut batch = match batch {
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
batch
.throttle_and_record_start_processing(&self.cancel)
.await?;
self.pagesteam_handle_batched_message(
pgb_writer,
batch,


@@ -365,8 +365,9 @@ pub struct Tenant {
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) pagestream_throttle:
Arc<throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
pub(crate) pagestream_throttle: Arc<throttle::Throttle>,
pub(crate) pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
/// An ongoing timeline detach concurrency limiter.
///
@@ -1687,6 +1688,7 @@ impl Tenant {
TimelineResources {
remote_client,
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
LoadTimelineCause::Attach,
@@ -3992,6 +3994,9 @@ impl Tenant {
Ok(timeline)
}
/// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object
/// to ensure proper cleanup of background tasks and metrics.
//
// Allow too_many_arguments because a constructor's argument list naturally grows with the
// number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
#[allow(clippy::too_many_arguments)]
@@ -4100,8 +4105,10 @@ impl Tenant {
gate: Gate::default(),
pagestream_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
crate::metrics::tenant_throttling::Metrics::new(&tenant_shard_id),
)),
pagestream_throttle_metrics: Arc::new(
crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id),
),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
gc_block: Default::default(),
@@ -5008,6 +5015,7 @@ impl Tenant {
TimelineResources {
remote_client: self.build_timeline_remote_client(timeline_id),
pagestream_throttle: self.pagestream_throttle.clone(),
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}


@@ -3,7 +3,7 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
time::Instant,
};
use arc_swap::ArcSwap;
@@ -16,9 +16,8 @@ use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The initial use case for this is tenant-wide throttling of getpage@lsn requests.
pub struct Throttle<M: Metric> {
pub struct Throttle {
inner: ArcSwap<Inner>,
metric: M,
/// will be turned into [`Stats::count_accounted_start`]
count_accounted_start: AtomicU64,
/// will be turned into [`Stats::count_accounted_finish`]
@@ -36,15 +35,6 @@ pub struct Inner {
pub type Config = pageserver_api::models::ThrottleConfig;
pub struct Observation {
pub wait_time: Duration,
}
pub trait Metric {
fn accounting_start(&self);
fn accounting_finish(&self);
fn observe_throttling(&self, observation: &Observation);
}
/// See [`Throttle::reset_stats`].
pub struct Stats {
/// Number of requests that started [`Throttle::throttle`] calls.
@@ -59,18 +49,14 @@ pub struct Stats {
}
pub enum ThrottleResult {
NotThrottled { start: Instant },
Throttled { start: Instant, end: Instant },
NotThrottled { end: Instant },
Throttled { end: Instant },
}
impl<M> Throttle<M>
where
M: Metric,
{
pub fn new(config: Config, metric: M) -> Self {
impl Throttle {
pub fn new(config: Config) -> Self {
Self {
inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
metric,
count_accounted_start: AtomicU64::new(0),
count_accounted_finish: AtomicU64::new(0),
count_throttled: AtomicU64::new(0),
@@ -127,32 +113,27 @@ where
self.inner.load().rate_limiter.steady_rps()
}
pub async fn throttle(&self, key_count: usize) -> ThrottleResult {
/// `start` must be [`Instant::now`] or earlier.
pub async fn throttle(&self, key_count: usize, start: Instant) -> ThrottleResult {
let inner = self.inner.load_full(); // clones the `Inner` Arc
let start = std::time::Instant::now();
if !inner.enabled {
return ThrottleResult::NotThrottled { start };
return ThrottleResult::NotThrottled { end: start };
}
self.metric.accounting_start();
self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
let did_throttle = inner.rate_limiter.acquire(key_count).await;
self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
self.metric.accounting_finish();
if did_throttle {
self.count_throttled.fetch_add(1, Ordering::Relaxed);
let now = Instant::now();
let wait_time = now - start;
let end = Instant::now();
let wait_time = end - start;
self.sum_throttled_usecs
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
ThrottleResult::Throttled { start, end: now }
ThrottleResult::Throttled { end }
} else {
ThrottleResult::NotThrottled { start }
ThrottleResult::NotThrottled { end: start }
}
}
}


@@ -208,8 +208,8 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub pagestream_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
pub pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -412,8 +412,7 @@ pub struct Timeline {
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::Tenant::pagestream_throttle`] on construction.
pub(crate) pagestream_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
pub(crate) pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
/// Size estimator for aux file v2
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
@@ -2310,6 +2309,7 @@ impl Timeline {
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
&tenant_shard_id,
&timeline_id,
resources.pagestream_throttle_metrics,
),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),


@@ -301,6 +301,7 @@ impl DeleteTimelineFlow {
TimelineResources {
remote_client,
pagestream_throttle: tenant.pagestream_throttle.clone(),
pagestream_throttle_metrics: tenant.pagestream_throttle_metrics.clone(),
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
},
// Important. We dont pass ancestor above because it can be missing.