diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index bf302c75e8..cb5fbb7abd 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use futures::future::join_all; use pageserver::client::page_service::RelTagBlockNo; use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block}; use pageserver::repository; @@ -13,7 +14,9 @@ use utils::logging; use std::cell::RefCell; use std::collections::HashMap; +use std::future::Future; use std::num::NonZeroUsize; +use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -31,6 +34,8 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, + #[clap(long)] + per_target_rate_limit: Option, targets: Option>, } @@ -165,6 +170,7 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { rt.block_on(main_task).unwrap() } +#[derive(Clone)] struct KeyRange { timeline: TenantTimelineId, timeline_lsn: Lsn, @@ -267,9 +273,6 @@ async fn main_impl( while let Some(res) = js.join_next().await { all_ranges.extend(res.unwrap().unwrap()); } - let weights = - rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len())) - .unwrap(); let live_stats = Arc::new(LiveStats::default()); @@ -315,22 +318,77 @@ async fn main_impl( ))); } - let work_sender = async move { - start_work_barrier.wait().await; - loop { - let (range, key) = { - let mut rng = rand::thread_rng(); - let r = &all_ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = repository::Key::from_i128(key); - let (rel_tag, block_no) = - key_to_rel_block(key).expect("we filter non-rel-block keys out above"); - (r, RelTagBlockNo { rel_tag, block_no }) - }; - let sender = work_senders.get(&range.timeline).unwrap(); - // TODO: what if this blocks? - sender.send((key, range.timeline_lsn)).await.ok().unwrap(); - } + let work_sender: Pin>> = match args.per_target_rate_limit { + None => Box::pin(async move { + let weights = rand::distributions::weighted::WeightedIndex::new( + all_ranges.iter().map(|v| v.len()), + ) + .unwrap(); + + start_work_barrier.wait().await; + + loop { + let (range, key) = { + let mut rng = rand::thread_rng(); + let r = &all_ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we filter non-rel-block keys out above"); + (r, RelTagBlockNo { rel_tag, block_no }) + }; + let sender = work_senders.get(&range.timeline).unwrap(); + // TODO: what if this blocks? + sender.send((key, range.timeline_lsn)).await.ok().unwrap(); + } + }), + Some(rps_limit) => Box::pin(async move { + let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); + + let make_timeline_task: &dyn Fn(TenantTimelineId) -> Pin>> = + &|timeline| { + let sender = work_senders.get(&timeline).unwrap(); + let ranges: Vec = all_ranges + .iter() + .filter(|r| r.timeline == timeline) + .cloned() + .collect(); + let weights = rand::distributions::weighted::WeightedIndex::new( + ranges.iter().map(|v| v.len()), + ) + .unwrap(); + + Box::pin(async move { + let mut ticker = tokio::time::interval(period); + ticker.set_missed_tick_behavior( + /* TODO review this choice */ + tokio::time::MissedTickBehavior::Burst, + ); + loop { + ticker.tick().await; + let (range, key) = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = key_to_rel_block(key) + .expect("we filter non-rel-block keys out above"); + (r, RelTagBlockNo { rel_tag, block_no }) + }; + sender.send((key, range.timeline_lsn)).await.ok().unwrap(); + } + }) + }; + + let tasks: Vec<_> = work_senders + .keys() + .map(|tl| make_timeline_task(**tl)) + .collect(); + + start_work_barrier.wait().await; + + join_all(tasks).await; + }), }; if let Some(runtime) = args.runtime {