From e196d974cc585341ee38f8fd6b54c257a3ad78a4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 6 Feb 2024 10:34:16 +0100 Subject: [PATCH] pagebench: actually implement `--num_clients` (#6640) Will need this to validate per-tenant throttling in https://github.com/neondatabase/neon/issues/5899 --- .../pagebench/src/cmd/getpage_latest_lsn.rs | 139 ++++++++++-------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 400b5476b7..aa809d8d26 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -79,6 +79,12 @@ impl KeyRange { } } +#[derive(PartialEq, Eq, Hash, Copy, Clone)] +struct WorkerId { + timeline: TenantTimelineId, + num_client: usize, // from 0..args.num_clients +} + #[derive(serde::Serialize)] struct Output { total: request_stats::Output, @@ -206,7 +212,7 @@ async fn main_impl( let live_stats = Arc::new(LiveStats::default()); - let num_client_tasks = timelines.len(); + let num_client_tasks = args.num_clients.get() * timelines.len(); let num_live_stats_dump = 1; let num_work_sender_tasks = 1; let num_main_impl = 1; @@ -235,19 +241,25 @@ async fn main_impl( let cancel = CancellationToken::new(); - let mut work_senders: HashMap = HashMap::new(); + let mut work_senders: HashMap = HashMap::new(); let mut tasks = Vec::new(); - for tl in &timelines { - let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are - work_senders.insert(*tl, sender); - tasks.push(tokio::spawn(client( - args, - *tl, - Arc::clone(&start_work_barrier), - receiver, - Arc::clone(&live_stats), - cancel.clone(), - ))); + for timeline in timelines.iter().cloned() { + for num_client in 0..args.num_clients.get() { + let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are + let worker_id = WorkerId { + timeline, + num_client, + }; + work_senders.insert(worker_id, sender); + tasks.push(tokio::spawn(client( + args, + worker_id, + Arc::clone(&start_work_barrier), + receiver, + Arc::clone(&live_stats), + cancel.clone(), + ))); + } } let work_sender: Pin>> = { @@ -271,7 +283,10 @@ async fn main_impl( let (rel_tag, block_no) = key_to_rel_block(key).expect("we filter non-rel-block keys out above"); ( - r.timeline, + WorkerId { + timeline: r.timeline, + num_client: rng.gen_range(0..args.num_clients.get()), + }, PagestreamGetPageRequest { latest: rng.gen_bool(args.req_latest_probability), lsn: r.timeline_lsn, @@ -289,56 +304,54 @@ async fn main_impl( }), Some(rps_limit) => Box::pin(async move { let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); - let make_timeline_task: &dyn Fn( - TenantTimelineId, - ) - -> Pin>> = &|timeline| { - let sender = work_senders.get(&timeline).unwrap(); - let ranges: Vec = all_ranges - .iter() - .filter(|r| r.timeline == timeline) - .cloned() - .collect(); - let weights = rand::distributions::weighted::WeightedIndex::new( - ranges.iter().map(|v| v.len()), - ) - .unwrap(); + let make_task: &dyn Fn(WorkerId) -> Pin>> = + &|worker_id| { + let sender = work_senders.get(&worker_id).unwrap(); + let ranges: Vec = all_ranges + .iter() + .filter(|r| r.timeline == worker_id.timeline) + .cloned() + .collect(); + let weights = rand::distributions::weighted::WeightedIndex::new( + ranges.iter().map(|v| v.len()), + ) + .unwrap(); - let cancel = cancel.clone(); - Box::pin(async move { - let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior( - /* TODO review this choice */ - tokio::time::MissedTickBehavior::Burst, - ); - while !cancel.is_cancelled() { - ticker.tick().await; - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(is_rel_block_key(&key)); - let (rel_tag, block_no) = key_to_rel_block(key) - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), - lsn: r.timeline_lsn, - rel: rel_tag, - blkno: block_no, + let cancel = cancel.clone(); + Box::pin(async move { + let mut ticker = tokio::time::interval(period); + ticker.set_missed_tick_behavior( + /* TODO review this choice */ + tokio::time::MissedTickBehavior::Burst, + ); + while !cancel.is_cancelled() { + ticker.tick().await; + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(is_rel_block_key(&key)); + let (rel_tag, block_no) = key_to_rel_block(key) + .expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + latest: rng.gen_bool(args.req_latest_probability), + lsn: r.timeline_lsn, + rel: rel_tag, + blkno: block_no, + } + }; + if sender.send(req).await.is_err() { + assert!( + cancel.is_cancelled(), + "client has gone away unexpectedly" + ); } - }; - if sender.send(req).await.is_err() { - assert!(cancel.is_cancelled(), "client has gone away unexpectedly"); } - } - }) - }; + }) + }; - let tasks: Vec<_> = work_senders - .keys() - .map(|tl| make_timeline_task(*tl)) - .collect(); + let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect(); start_work_barrier.wait().await; @@ -390,12 +403,16 @@ async fn main_impl( #[instrument(skip_all)] async fn client( args: &'static Args, - timeline: TenantTimelineId, + id: WorkerId, start_work_barrier: Arc, mut work: tokio::sync::mpsc::Receiver, live_stats: Arc, cancel: CancellationToken, ) { + let WorkerId { + timeline, + num_client: _, + } = id; let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) .await .unwrap();