per-TenantShard read throttling (#6706)

This commit is contained in:
Christian Schwarz
2024-02-16 21:26:59 +01:00
committed by GitHub
parent 5d039c6e9b
commit ca07fa5f8b
18 changed files with 510 additions and 211 deletions

15
Cargo.lock generated
View File

@@ -1813,6 +1813,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e875f1719c16de097dee81ed675e2d9bb63096823ed3f0ca827b7dea3028bbbb"
dependencies = [
"enumset_derive",
"serde",
]
[[package]]
@@ -2757,6 +2758,17 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "leaky-bucket"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853"
dependencies = [
"parking_lot 0.12.1",
"tokio",
"tracing",
]
[[package]]
name = "libc"
version = "0.2.150"
@@ -3448,6 +3460,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-compression",
"async-stream",
"async-trait",
@@ -3475,6 +3488,7 @@ dependencies = [
"humantime-serde",
"hyper",
"itertools",
"leaky-bucket",
"md5",
"metrics",
"nix 0.27.1",
@@ -6347,6 +6361,7 @@ dependencies = [
"hex-literal",
"hyper",
"jsonwebtoken",
"leaky-bucket",
"metrics",
"nix 0.27.1",
"once_cell",

View File

@@ -97,6 +97,7 @@ ipnet = "2.9.0"
itertools = "0.10"
jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
memoffset = "0.8"

View File

@@ -400,6 +400,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
timeline_get_throttle: settings
.remove("timeline_get_throttle")
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -505,6 +510,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
timeline_get_throttle: settings
.remove("timeline_get_throttle")
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
}
};

View File

@@ -283,6 +283,7 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -309,6 +310,35 @@ pub struct EvictionPolicyLayerAccessThreshold {
pub threshold: Duration,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ThrottleConfig {
pub task_kinds: Vec<String>, // TaskKind
pub initial: usize,
#[serde(with = "humantime_serde")]
pub refill_interval: Duration,
pub refill_amount: NonZeroUsize,
pub max: usize,
pub fair: bool,
}
impl ThrottleConfig {
pub fn disabled() -> Self {
Self {
task_kinds: vec![], // effectively disables the throttle
// other values don't matter with emtpy `task_kinds`.
initial: 0,
refill_interval: Duration::from_millis(1),
refill_amount: NonZeroUsize::new(1).unwrap(),
max: 1,
fair: true,
}
}
/// The requests per second allowed by the given config.
pub fn steady_rps(&self) -> f64 {
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64()) / 1e3
}
}
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.

View File

@@ -25,6 +25,7 @@ hyper = { workspace = true, features = ["full"] }
fail.workspace = true
futures = { workspace = true}
jsonwebtoken.workspace = true
leaky-bucket.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
@@ -35,6 +36,7 @@ humantime.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses
@@ -82,7 +84,7 @@ workspace_hack.workspace = true
reqwest.workspace = true
rpds.workspace = true
enum-map.workspace = true
enumset.workspace = true
enumset = { workspace = true, features = ["serde"]}
strum.workspace = true
strum_macros.workspace = true

View File

@@ -1,6 +1,5 @@
use anyhow::Context;
use camino::Utf8PathBuf;
use futures::future::join_all;
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::PagestreamGetPageRequest;
@@ -10,11 +9,10 @@ use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};
use tracing::info;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
@@ -38,8 +36,12 @@ pub(crate) struct Args {
num_clients: NonZeroUsize,
#[clap(long)]
runtime: Option<humantime::Duration>,
/// Each client sends requests at the given rate.
///
/// If a request takes too long and we should be issuing a new request already,
/// we skip that request and account it as `MISSED`.
#[clap(long)]
per_target_rate_limit: Option<usize>,
per_client_rate: Option<usize>,
/// Probability for sending `latest=true` in the request (uniform distribution).
#[clap(long, default_value = "1")]
req_latest_probability: f64,
@@ -61,12 +63,16 @@ pub(crate) struct Args {
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
missed: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
fn request_done(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
fn missed(&self, n: u64) {
self.missed.fetch_add(n, Ordering::Relaxed);
}
}
#[derive(Clone, serde::Serialize, serde::Deserialize)]
@@ -220,13 +226,12 @@ async fn main_impl(
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = args.num_clients.get() * timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
let num_main_impl = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
));
tokio::spawn({
@@ -238,10 +243,12 @@ async fn main_impl(
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let missed = stats.missed.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
"RPS: {:.0} MISSED: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64(),
missed as f64 / elapsed.as_secs_f64()
);
}
}
@@ -249,127 +256,105 @@ async fn main_impl(
let cancel = CancellationToken::new();
let mut work_senders: HashMap<WorkerId, _> = HashMap::new();
let mut tasks = Vec::new();
let rps_period = args
.per_client_rate
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
let live_stats = live_stats.clone();
let start_work_barrier = start_work_barrier.clone();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == worker_id.timeline)
.cloned()
.collect();
let weights =
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
.unwrap();
let cancel = cancel.clone();
Box::pin(async move {
let client =
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
let periods_passed_until_now =
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
.unwrap();
if periods_passed_until_now > ticks_processed {
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
}
ticks_processed = periods_passed_until_now;
}
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;
STATS.with(|stats| {
stats
.borrow()
.lock()
.unwrap()
.observe(end.duration_since(start))
.unwrap();
});
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
})
};
info!("spawning workers");
let mut workers = JoinSet::new();
for timeline in timelines.iter().cloned() {
for num_client in 0..args.num_clients.get() {
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
let worker_id = WorkerId {
timeline,
num_client,
};
work_senders.insert(worker_id, sender);
tasks.push(tokio::spawn(client(
args,
worker_id,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&live_stats),
cancel.clone(),
)));
workers.spawn(make_worker(worker_id));
}
}
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
let start_work_barrier = start_work_barrier.clone();
let cancel = cancel.clone();
match args.per_target_rate_limit {
None => Box::pin(async move {
let weights = rand::distributions::weighted::WeightedIndex::new(
all_ranges.iter().map(|v| v.len()),
)
.unwrap();
start_work_barrier.wait().await;
while !cancel.is_cancelled() {
let (timeline, req) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(
WorkerId {
timeline: r.timeline,
num_client: rng.gen_range(0..args.num_clients.get()),
},
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
},
)
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
if sender.send(req).await.is_err() {
assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
}
}
}),
Some(rps_limit) => Box::pin(async move {
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
let make_task: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
&|worker_id| {
let sender = work_senders.get(&worker_id).unwrap();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == worker_id.timeline)
.cloned()
.collect();
let weights = rand::distributions::weighted::WeightedIndex::new(
ranges.iter().map(|v| v.len()),
)
.unwrap();
let cancel = cancel.clone();
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
while !cancel.is_cancelled() {
ticker.tick().await;
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
}
};
if sender.send(req).await.is_err() {
assert!(
cancel.is_cancelled(),
"client has gone away unexpectedly"
);
}
}
})
};
let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect();
start_work_barrier.wait().await;
join_all(tasks).await;
}),
let workers = async move {
while let Some(res) = workers.join_next().await {
res.unwrap();
}
};
let work_sender_task = tokio::spawn(work_sender);
info!("waiting for everything to become ready");
start_work_barrier.wait().await;
info!("work started");
@@ -377,20 +362,13 @@ async fn main_impl(
tokio::time::sleep(runtime.into()).await;
info!("runtime over, signalling cancellation");
cancel.cancel();
work_sender_task.await.unwrap();
workers.await;
info!("work sender exited");
} else {
work_sender_task.await.unwrap();
workers.await;
unreachable!("work sender never terminates");
}
info!("joining clients");
for t in tasks {
t.await.unwrap();
}
info!("all clients stopped");
let output = Output {
total: {
let mut agg_stats = request_stats::Stats::new();
@@ -407,49 +385,3 @@ async fn main_impl(
anyhow::Ok(())
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
id: WorkerId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
live_stats: Arc<LiveStats>,
cancel: CancellationToken,
) {
let WorkerId {
timeline,
num_client: _,
} = id;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
let do_requests = async {
start_work_barrier.wait().await;
while let Some(req) = work.recv().await {
let start = Instant::now();
client
.getpage(req)
.await
.with_context(|| format!("getpage for {timeline}"))
.unwrap();
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
};
tokio::select! {
res = do_requests => { res },
_ = cancel.cancelled() => {
// fallthrough to shutdown
}
}
client.shutdown().await;
}

View File

@@ -2496,6 +2496,56 @@ pub mod tokio_epoll_uring {
}
}
pub(crate) mod tenant_throttling {
use metrics::{register_int_counter_vec, IntCounter};
use once_cell::sync::Lazy;
use crate::tenant::{self, throttle::Metric};
pub(crate) struct TimelineGet {
wait_time: IntCounter,
count: IntCounter,
}
pub(crate) static TIMELINE_GET: Lazy<TimelineGet> = Lazy::new(|| {
static WAIT_USECS: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_wait_usecs_sum_global",
"Sum of microseconds that tenants spent waiting for a tenant throttle of a given kind.",
&["kind"]
)
.unwrap()
});
static WAIT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_throttling_count_global",
"Count of tenant throttlings, by kind of throttle.",
&["kind"]
)
.unwrap()
});
let kind = "timeline_get";
TimelineGet {
wait_time: WAIT_USECS.with_label_values(&[kind]),
count: WAIT_COUNT.with_label_values(&[kind]),
}
});
impl Metric for &'static TimelineGet {
#[inline(always)]
fn observe_throttling(
&self,
tenant::throttle::Observation { wait_time }: &tenant::throttle::Observation,
) {
let val = u64::try_from(wait_time.as_micros()).unwrap();
self.wait_time.inc_by(val);
self.count.inc();
}
}
}
pub fn preinitialize_metrics() {
// Python tests need these and on some we do alerting.
//
@@ -2557,4 +2607,5 @@ pub fn preinitialize_metrics() {
// Custom
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&tenant_throttling::TIMELINE_GET);
}

View File

@@ -188,6 +188,7 @@ task_local! {
serde::Serialize,
serde::Deserialize,
strum_macros::IntoStaticStr,
strum_macros::EnumString,
)]
pub enum TaskKind {
// Pageserver startup, i.e., `main`

View File

@@ -167,6 +167,8 @@ pub(crate) mod timeline;
pub mod size;
pub(crate) mod throttle;
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
@@ -305,6 +307,11 @@ pub struct Tenant {
// Users of the Tenant such as the page service must take this Gate to avoid
// trying to use a Tenant which is shutting down.
pub(crate) gate: Gate,
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
}
impl std::fmt::Debug for Tenant {
@@ -990,6 +997,7 @@ impl Tenant {
TimelineResources {
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
ctx,
)
@@ -2075,7 +2083,7 @@ impl Tenant {
};
// We have a pageserver TenantConf, we need the API-facing TenantConfig.
let tenant_config: models::TenantConfig = conf.tenant_conf.into();
let tenant_config: models::TenantConfig = conf.tenant_conf.clone().into();
models::LocationConfig {
mode: location_config_mode,
@@ -2209,93 +2217,93 @@ where
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
self.tenant_conf.read().unwrap().tenant_conf.clone()
}
pub fn effective_config(&self) -> TenantConf {
self.tenant_specific_overrides()
.merge(self.conf.default_tenant_conf)
.merge(self.conf.default_tenant_conf.clone())
}
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
pub fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.compaction_period
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
}
pub fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.gc_horizon
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
}
pub fn get_gc_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.gc_period
.unwrap_or(self.conf.default_tenant_conf.gc_period)
}
pub fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn get_min_resident_size_override(&self) -> Option<u64> {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.min_resident_size_override
.or(self.conf.default_tenant_conf.min_resident_size_override)
}
pub fn get_heatmap_period(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
let heatmap_period = tenant_conf
.heatmap_period
.unwrap_or(self.conf.default_tenant_conf.heatmap_period);
@@ -2308,6 +2316,7 @@ impl Tenant {
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
self.tenant_conf_updated();
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
@@ -2319,6 +2328,7 @@ impl Tenant {
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
*self.tenant_conf.write().unwrap() = new_conf;
self.tenant_conf_updated();
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
@@ -2328,6 +2338,24 @@ impl Tenant {
}
}
fn get_timeline_get_throttle_config(
psconf: &'static PageServerConf,
overrides: &TenantConfOpt,
) -> throttle::Config {
overrides
.timeline_get_throttle
.clone()
.unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone())
}
pub(crate) fn tenant_conf_updated(&self) {
let conf = {
let guard = self.tenant_conf.read().unwrap();
Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf)
};
self.timeline_get_throttle.reconfigure(conf)
}
/// Helper function to create a new Timeline struct.
///
/// The returned Timeline is in Loading state. The caller is responsible for
@@ -2454,7 +2482,6 @@ impl Tenant {
// using now here is good enough approximation to catch tenants with really long
// activation times.
constructed_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
gc_cs: tokio::sync::Mutex::new(()),
@@ -2469,6 +2496,11 @@ impl Tenant {
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::default(),
timeline_get_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
&crate::metrics::tenant_throttling::TIMELINE_GET,
)),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
}
}
@@ -3224,6 +3256,7 @@ impl Tenant {
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
}
}
@@ -3495,7 +3528,7 @@ impl Tenant {
}
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
self.tenant_conf.read().unwrap().tenant_conf.clone()
}
}
@@ -3654,6 +3687,7 @@ pub(crate) mod harness {
gc_feedback: Some(tenant_conf.gc_feedback),
heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
}
}
}
@@ -3757,7 +3791,7 @@ pub(crate) mod harness {
TenantState::Loading,
self.conf,
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf),
TenantConfOpt::from(self.tenant_conf.clone()),
self.generation,
&ShardParameters::default(),
))

View File

@@ -9,8 +9,8 @@
//! may lead to a data loss.
//!
use anyhow::bail;
use pageserver_api::models;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
@@ -285,7 +285,7 @@ impl Default for LocationConf {
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this
// This puts a backstop on how much WAL needs to be re-digested if the
@@ -348,11 +348,13 @@ pub struct TenantConf {
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
pub lazy_slru_download: bool,
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
}
/// Same as TenantConf, but this struct preserves the information about
/// which parameters are set and which are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
@@ -437,6 +439,9 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub lazy_slru_download: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_get_throttle: Option<pageserver_api::models::ThrottleConfig>,
}
impl TenantConfOpt {
@@ -485,6 +490,10 @@ impl TenantConfOpt {
lazy_slru_download: self
.lazy_slru_download
.unwrap_or(global_conf.lazy_slru_download),
timeline_get_throttle: self
.timeline_get_throttle
.clone()
.unwrap_or(global_conf.timeline_get_throttle),
}
}
}
@@ -524,6 +533,7 @@ impl Default for TenantConf {
gc_feedback: false,
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
}
}
}
@@ -596,6 +606,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
gc_feedback: value.gc_feedback,
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
}
}
}

View File

@@ -484,7 +484,7 @@ pub async fn init_tenant_mgr(
TenantSlot::Secondary(SecondaryTenant::new(
tenant_shard_id,
location_conf.shard,
location_conf.tenant_conf,
location_conf.tenant_conf.clone(),
&SecondaryLocationConfig { warm: false },
)),
);
@@ -805,7 +805,7 @@ pub(crate) async fn set_new_tenant_config(
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(
new_tenant_conf,
new_tenant_conf.clone(),
tenant.generation,
&ShardParameters::default(),
);
@@ -1466,7 +1466,7 @@ impl TenantManager {
attach_mode: AttachmentMode::Single,
}),
shard: child_shard_identity,
tenant_conf: parent_tenant_conf,
tenant_conf: parent_tenant_conf.clone(),
};
self.upsert_location(

View File

@@ -133,7 +133,7 @@ impl SecondaryTenant {
}
pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
*(self.tenant_conf.lock().unwrap()) = *config;
*(self.tenant_conf.lock().unwrap()) = config.clone();
}
/// For API access: generate a LocationConfig equivalent to the one that would be used to
@@ -144,7 +144,7 @@ impl SecondaryTenant {
let conf = models::LocationConfigSecondary { warm: conf.warm };
let tenant_conf = *self.tenant_conf.lock().unwrap();
let tenant_conf = self.tenant_conf.lock().unwrap().clone();
models::LocationConfig {
mode: models::LocationConfigMode::Secondary,
generation: None,

View File

@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
@@ -139,6 +140,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// How many errors we have seen consequtively
let mut error_run_count = 0;
let mut last_throttle_flag_reset_at = Instant::now();
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
@@ -203,6 +206,27 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
walredo_mgr.maybe_quiesce(period * 10);
}
// TODO: move this (and walredo quiesce) to a separate task that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted, count_throttled, sum_throttled_usecs } = tenant.timeline_get_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
warn!(
n_seconds=%format_args!("{:.3}",
delta.as_secs_f64()),
count_accounted,
count_throttled,
sum_throttled_usecs,
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds")
});
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await

View File

@@ -0,0 +1,162 @@
use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;
use crate::{context::RequestContext, task_mgr::TaskKind};
/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
///
/// To share a throttle among multiple entities, wrap it in an [`Arc`].
///
/// The intial use case for this is tenant-wide throttling of getpage@lsn requests.
pub struct Throttle<M: Metric> {
inner: ArcSwap<Inner>,
metric: M,
/// will be turned into [`Stats::count_accounted`]
count_accounted: AtomicU64,
/// will be turned into [`Stats::count_throttled`]
count_throttled: AtomicU64,
/// will be turned into [`Stats::sum_throttled_usecs`]
sum_throttled_usecs: AtomicU64,
}
pub struct Inner {
task_kinds: EnumSet<TaskKind>,
rate_limiter: Arc<leaky_bucket::RateLimiter>,
config: Config,
}
pub type Config = pageserver_api::models::ThrottleConfig;
pub struct Observation {
pub wait_time: Duration,
}
pub trait Metric {
fn observe_throttling(&self, observation: &Observation);
}
/// See [`Throttle::reset_stats`].
pub struct Stats {
// Number of requests that were subject to throttling, i.e., requests of the configured [`Config::task_kinds`].
pub count_accounted: u64,
// Subset of the `accounted` requests that were actually throttled.
// Note that the numbers are stored as two independent atomics, so, there might be a slight drift.
pub count_throttled: u64,
// Sum of microseconds that throttled requests spent waiting for throttling.
pub sum_throttled_usecs: u64,
}
impl<M> Throttle<M>
where
M: Metric,
{
pub fn new(config: Config, metric: M) -> Self {
Self {
inner: ArcSwap::new(Arc::new(Self::new_inner(config))),
metric,
count_accounted: AtomicU64::new(0),
count_throttled: AtomicU64::new(0),
sum_throttled_usecs: AtomicU64::new(0),
}
}
fn new_inner(config: Config) -> Inner {
let Config {
task_kinds,
initial,
refill_interval,
refill_amount,
max,
fair,
} = &config;
let task_kinds: EnumSet<TaskKind> = task_kinds
.iter()
.filter_map(|s| match TaskKind::from_str(s) {
Ok(v) => Some(v),
Err(e) => {
// TODO: avoid this failure mode
error!(
"cannot parse task kind, ignoring for rate limiting {}",
utils::error::report_compact_sources(&e)
);
None
}
})
.collect();
Inner {
task_kinds,
rate_limiter: Arc::new(
leaky_bucket::RateLimiter::builder()
.initial(*initial)
.interval(*refill_interval)
.refill(refill_amount.get())
.max(*max)
.fair(*fair)
.build(),
),
config,
}
}
pub fn reconfigure(&self, config: Config) {
self.inner.store(Arc::new(Self::new_inner(config)));
}
/// The [`Throttle`] keeps an internal flag that is true if there was ever any actual throttling.
/// This method allows retrieving & resetting that flag.
/// Useful for periodic reporting.
pub fn reset_stats(&self) -> Stats {
let count_accounted = self.count_accounted.swap(0, Ordering::Relaxed);
let count_throttled = self.count_throttled.swap(0, Ordering::Relaxed);
let sum_throttled_usecs = self.sum_throttled_usecs.swap(0, Ordering::Relaxed);
Stats {
count_accounted,
count_throttled,
sum_throttled_usecs,
}
}
/// See [`Config::steady_rps`].
pub fn steady_rps(&self) -> f64 {
self.inner.load().config.steady_rps()
}
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) {
let inner = self.inner.load_full(); // clones the `Inner` Arc
if !inner.task_kinds.contains(ctx.task_kind()) {
return;
};
let start = std::time::Instant::now();
let mut did_throttle = false;
let acquire = inner.rate_limiter.acquire(key_count);
// turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
let acquire = tokio::task::unconstrained(acquire);
let mut acquire = std::pin::pin!(acquire);
std::future::poll_fn(|cx| {
use std::future::Future;
let poll = acquire.as_mut().poll(cx);
did_throttle = did_throttle || poll.is_pending();
poll
})
.await;
self.count_accounted.fetch_add(1, Ordering::Relaxed);
if did_throttle {
self.count_throttled.fetch_add(1, Ordering::Relaxed);
let now = Instant::now();
let wait_time = now - start;
self.sum_throttled_usecs
.fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed);
let observation = Observation { wait_time };
self.metric.observe_throttling(&observation);
}
}
}

View File

@@ -164,6 +164,9 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
}
pub struct Timeline {
@@ -355,6 +358,11 @@ pub struct Timeline {
///
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
>,
}
pub struct WalReceiverInfo {
@@ -615,6 +623,8 @@ impl Timeline {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
self.timeline_get_throttle.throttle(ctx, 1).await;
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
// already checked the key against the shard_identity when looking up the Timeline from
// page_service.
@@ -714,6 +724,10 @@ impl Timeline {
return Err(GetVectoredError::Oversized(key_count));
}
self.timeline_get_throttle
.throttle(ctx, key_count as usize)
.await;
let _timer = crate::metrics::GET_VECTORED_LATENCY
.for_task_kind(ctx.task_kind())
.map(|t| t.start_timer());
@@ -1335,49 +1349,49 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.lazy_slru_download
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
}
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.eviction_policy
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
@@ -1393,7 +1407,7 @@ impl Timeline {
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
@@ -1555,6 +1569,8 @@ impl Timeline {
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
timeline_get_throttle: resources.timeline_get_throttle,
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;

View File

@@ -419,6 +419,7 @@ impl DeleteTimelineFlow {
TimelineResources {
remote_client,
deletion_queue_client,
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
},
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.

View File

@@ -176,6 +176,14 @@ def test_fully_custom_config(positive_env: NeonEnv):
"lazy_slru_download": True,
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"timeline_get_throttle": {
"task_kinds": ["PageRequestHandler"],
"fair": True,
"initial": 0,
"refill_interval": "1s",
"refill_amount": 1000,
"max": 1000,
},
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
}