mirror of https://github.com/neondatabase/neon.git (synced 2026-01-14 08:52:56 +00:00)
fix
@@ -3559,11 +3559,16 @@ pub mod tokio_epoll_uring {
 }
 
 pub(crate) mod tenant_throttling {
+    use std::num::NonZeroUsize;
+
     use metrics::{register_int_counter_vec, IntCounter};
     use once_cell::sync::Lazy;
     use utils::shard::TenantShardId;
 
-    use crate::tenant::{self};
+    use crate::{
+        assert_u64_eq_usize::UsizeIsU64,
+        tenant::{self},
+    };
 
     struct GlobalAndPerTenantIntCounter {
         global: IntCounter,
@@ -3571,10 +3576,6 @@ pub(crate) mod tenant_throttling {
     }
 
     impl GlobalAndPerTenantIntCounter {
-        #[inline(always)]
-        pub(crate) fn inc(&self) {
-            self.inc_by(1)
-        }
         #[inline(always)]
         pub(crate) fn inc_by(&self, n: u64) {
             self.global.inc_by(n);
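Not part of the diff, but for context: with inc() gone, all accounting runs through inc_by, which feeds both the instance-wide counter and the per-tenant one. A minimal sketch of that shape; the per_tenant field name is an assumption, since the hunk above cuts off right after global:

    use metrics::IntCounter;

    struct GlobalAndPerTenantIntCounter {
        global: IntCounter,
        per_tenant: IntCounter, // assumed field name; not shown in the hunk
    }

    impl GlobalAndPerTenantIntCounter {
        #[inline(always)]
        pub(crate) fn inc_by(&self, n: u64) {
            // one call feeds both the instance-wide and the per-tenant time series
            self.global.inc_by(n);
            self.per_tenant.inc_by(n);
        }
    }
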
@@ -3592,7 +3593,7 @@ pub(crate) mod tenant_throttling {
     static COUNT_ACCOUNTED_START: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_count_accounted_start_global",
-            "Count of tenant throttling starts, by kind of throttle.",
+            "Like pageserver_tenant_throttling_count_accounted_start, but aggregated to the instance.",
             &["kind"]
         )
         .unwrap()
@@ -3600,7 +3601,8 @@ pub(crate) mod tenant_throttling {
     static COUNT_ACCOUNTED_START_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_count_accounted_start",
-            "Count of tenant throttling starts, by kind of throttle.",
+            "Counter incremented for each key that enters the throttling stage.
+A batched request will increment this counter by the number of requests in the batch.",
             &["kind", "tenant_id", "shard_id"]
         )
         .unwrap()
@@ -3608,7 +3610,7 @@ pub(crate) mod tenant_throttling {
     static COUNT_ACCOUNTED_FINISH: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_count_accounted_finish_global",
-            "Count of tenant throttling finishes, by kind of throttle.",
+            "Like pageserver_tenant_throttling_count_accounted_finish, but aggregated to the instance.",
             &["kind"]
         )
         .unwrap()
@@ -3616,7 +3618,8 @@ pub(crate) mod tenant_throttling {
     static COUNT_ACCOUNTED_FINISH_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_count_accounted_finish",
-            "Count of tenant throttling finishes, by kind of throttle.",
+            "Counter incremented for each key that exits the throttling stage.
+A batched request will increment this counter by the number of requests in the batch.",
             &["kind", "tenant_id", "shard_id"]
         )
         .unwrap()
@@ -3624,7 +3627,7 @@ pub(crate) mod tenant_throttling {
     static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_wait_usecs_sum_global",
-            "Sum of microseconds that spent waiting throttle by kind of throttle.",
+            "Like pageserver_tenant_throttling_wait_usecs_sum, but aggregated to the instance.",
             &["kind"]
         )
         .unwrap()
@@ -3632,7 +3635,8 @@ pub(crate) mod tenant_throttling {
     static WAIT_USECS_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
         register_int_counter_vec!(
             "pageserver_tenant_throttling_wait_usecs_sum",
-            "Sum of microseconds that spent waiting throttle by kind of throttle.",
+            "Wall clock time spent waiting on the throttle.
+A batched request counts as one ",
             &["kind", "tenant_id", "shard_id"]
         )
         .unwrap()
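Not part of the diff: all six statics above share the same registration shape of name, help text, and label set. A stand-alone sketch of that shape with a placeholder metric name:

    use metrics::register_int_counter_vec;
    use once_cell::sync::Lazy;

    static EXAMPLE_PER_TENANT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
        register_int_counter_vec!(
            // placeholder name, not a real pageserver metric
            "example_counter_total",
            "Counter incremented for each key that enters some stage.",
            &["kind", "tenant_id", "shard_id"]
        )
        .unwrap()
    });
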
@@ -3724,21 +3728,26 @@ pub(crate) mod tenant_throttling {
 
     impl<const KIND: usize> tenant::throttle::Metric for Metrics<KIND> {
         #[inline(always)]
-        fn accounting_start(&self) {
-            self.count_accounted_start.inc();
+        fn accounting_start(&self, key_count: NonZeroUsize) {
+            self.count_accounted_start
+                .inc_by(key_count.get().into_u64());
         }
         #[inline(always)]
-        fn accounting_finish(&self) {
-            self.count_accounted_finish.inc();
+        fn accounting_finish(&self, key_count: NonZeroUsize) {
+            self.count_accounted_finish
+                .inc_by(key_count.get().into_u64());
         }
         #[inline(always)]
         fn observe_throttling(
             &self,
-            tenant::throttle::Observation { wait_time }: &tenant::throttle::Observation,
+            tenant::throttle::Observation {
+                key_count,
+                wait_time,
+            }: &tenant::throttle::Observation,
         ) {
             let val = u64::try_from(wait_time.as_micros()).unwrap();
             self.wait_time.inc_by(val);
-            self.count_throttled.inc();
+            self.count_throttled.inc_by(key_count.get().into_u64());
         }
     }
 }
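Not part of the diff: the net effect of the reworked Metrics<KIND> impl is that a batch is accounted per key for the start/finish/throttled counters, while its wait time is added once per batch. A self-contained sketch of that arithmetic, with illustrative names only:

    use std::num::NonZeroUsize;
    use std::time::Duration;

    /// Counter deltas produced by one throttled batch:
    /// (starts, finishes, throttled, wait_usecs).
    fn account(key_count: NonZeroUsize, wait_time: Duration) -> (u64, u64, u64, u64) {
        let n = key_count.get() as u64; // stands in for key_count.get().into_u64()
        let wait_us = u64::try_from(wait_time.as_micros()).unwrap();
        (n, n, n, wait_us)
    }

    fn main() {
        // a batch of 32 getpage keys that waited 1.5 ms
        let deltas = account(NonZeroUsize::new(32).unwrap(), Duration::from_micros(1500));
        assert_eq!(deltas, (32, 32, 32, 1500));
    }
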
@@ -588,13 +588,14 @@ impl BatchedFeMessage {
                     shard,
                     // 1 token is probably under-estimating because these
                     // request handlers typically do several Timeline::get calls.
-                    1,
+                    NonZeroUsize::new(1).unwrap(),
                     itertools::Either::Left(std::iter::once(timer)),
                 )
             }
             BatchedFeMessage::GetPage { shard, pages, .. } => (
                 shard,
-                pages.len(),
+                NonZeroUsize::new(pages.len())
+                    .expect("a batch has always at least one request in it"),
                 itertools::Either::Right(pages.iter_mut().map(|(_, _, timer)| timer)),
             ),
             BatchedFeMessage::RespondError { .. } => return Ok(()),
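Not part of the diff: the .expect(...) above leans on the invariant that a GetPage batch always carries at least one page, so converting the batch length to NonZeroUsize cannot fail there. A tiny stand-alone sketch of that conversion:

    use std::num::NonZeroUsize;

    fn batch_key_count(pages_len: usize) -> NonZeroUsize {
        // pages_len comes from a non-empty batch in the real code path
        NonZeroUsize::new(pages_len).expect("a batch has always at least one request in it")
    }

    fn main() {
        assert_eq!(batch_key_count(4).get(), 4);
    }
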
@@ -1,4 +1,5 @@
 use std::{
+    num::NonZeroUsize,
     sync::{
         atomic::{AtomicU64, Ordering},
         Arc,
@@ -9,6 +10,8 @@ use std::{
 use arc_swap::ArcSwap;
 use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
 
+use crate::assert_u64_eq_usize::UsizeIsU64;
+
 /// Throttle for `async` functions.
 ///
 /// Runtime reconfigurable.
@@ -37,11 +40,12 @@ pub struct Inner {
 pub type Config = pageserver_api::models::ThrottleConfig;
 
 pub struct Observation {
+    pub key_count: NonZeroUsize,
     pub wait_time: Duration,
 }
 pub trait Metric {
-    fn accounting_start(&self);
-    fn accounting_finish(&self);
+    fn accounting_start(&self, key_count: NonZeroUsize);
+    fn accounting_finish(&self, key_count: NonZeroUsize);
     fn observe_throttling(&self, observation: &Observation);
 }
 
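Not part of the diff: a no-op implementor of the updated trait, with Observation and Metric restated locally so the sketch compiles on its own; in-tree, the real implementor is Metrics<KIND> in the metrics module shown earlier:

    use std::num::NonZeroUsize;
    use std::time::Duration;

    pub struct Observation {
        pub key_count: NonZeroUsize,
        pub wait_time: Duration,
    }

    pub trait Metric {
        fn accounting_start(&self, key_count: NonZeroUsize);
        fn accounting_finish(&self, key_count: NonZeroUsize);
        fn observe_throttling(&self, observation: &Observation);
    }

    /// Discards all observations; useful where no metrics sink is wired up.
    struct NoopMetric;

    impl Metric for NoopMetric {
        fn accounting_start(&self, _key_count: NonZeroUsize) {}
        fn accounting_finish(&self, _key_count: NonZeroUsize) {}
        fn observe_throttling(&self, _observation: &Observation) {}
    }

    fn main() {
        let m = NoopMetric;
        m.accounting_start(NonZeroUsize::new(8).unwrap());
        m.accounting_finish(NonZeroUsize::new(8).unwrap());
    }
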
@@ -127,7 +131,7 @@ where
         self.inner.load().rate_limiter.steady_rps()
     }
 
-    pub async fn throttle(&self, key_count: usize) -> ThrottleResult {
+    pub async fn throttle(&self, key_count: NonZeroUsize) -> ThrottleResult {
         let inner = self.inner.load_full(); // clones the `Inner` Arc
 
         let start = std::time::Instant::now();
@@ -136,11 +140,13 @@ where
             return ThrottleResult::NotThrottled { start };
         }
 
-        self.metric.accounting_start();
-        self.count_accounted_start.fetch_add(1, Ordering::Relaxed);
-        let did_throttle = inner.rate_limiter.acquire(key_count).await;
-        self.count_accounted_finish.fetch_add(1, Ordering::Relaxed);
-        self.metric.accounting_finish();
+        self.metric.accounting_start(key_count);
+        self.count_accounted_start
+            .fetch_add(key_count.get().into_u64(), Ordering::Relaxed);
+        let did_throttle = inner.rate_limiter.acquire(key_count.get()).await;
+        self.count_accounted_finish
+            .fetch_add(key_count.get().into_u64(), Ordering::Relaxed);
+        self.metric.accounting_finish(key_count);
 
         if did_throttle {
             self.count_throttled.fetch_add(1, Ordering::Relaxed);
@@ -148,7 +154,10 @@ where
             let wait_time = now - start;
             self.sum_throttled_usecs
                 .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
-            let observation = Observation { wait_time };
+            let observation = Observation {
+                key_count,
+                wait_time,
+            };
             self.metric.observe_throttling(&observation);
             ThrottleResult::Throttled { start, end: now }
         } else {