getpage bench: mode to rate limit per timeline

This commit is contained in:
Christian Schwarz
2023-12-13 10:18:42 +00:00
parent b6cb717a76
commit b3c811731a

View File

@@ -1,4 +1,5 @@
use anyhow::Context;
use futures::future::join_all;
use pageserver::client::page_service::RelTagBlockNo;
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::repository;
@@ -13,7 +14,9 @@ use utils::logging;
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -31,6 +34,8 @@ pub(crate) struct Args {
num_clients: NonZeroUsize,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
per_target_rate_limit: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -165,6 +170,7 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
rt.block_on(main_task).unwrap()
}
#[derive(Clone)]
struct KeyRange {
timeline: TenantTimelineId,
timeline_lsn: Lsn,
@@ -267,9 +273,6 @@ async fn main_impl(
while let Some(res) = js.join_next().await {
all_ranges.extend(res.unwrap().unwrap());
}
let weights =
rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len()))
.unwrap();
let live_stats = Arc::new(LiveStats::default());
@@ -315,22 +318,77 @@ async fn main_impl(
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
None => Box::pin(async move {
let weights = rand::distributions::weighted::WeightedIndex::new(
all_ranges.iter().map(|v| v.len()),
)
.unwrap();
start_work_barrier.wait().await;
loop {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
}),
Some(rps_limit) => Box::pin(async move {
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
let make_timeline_task: &dyn Fn(TenantTimelineId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
&|timeline| {
let sender = work_senders.get(&timeline).unwrap();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == timeline)
.cloned()
.collect();
let weights = rand::distributions::weighted::WeightedIndex::new(
ranges.iter().map(|v| v.len()),
)
.unwrap();
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
loop {
ticker.tick().await;
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
})
};
let tasks: Vec<_> = work_senders
.keys()
.map(|tl| make_timeline_task(**tl))
.collect();
start_work_barrier.wait().await;
join_all(tasks).await;
}),
};
if let Some(runtime) = args.runtime {