diff --git a/Cargo.lock b/Cargo.lock index 4f96da30f9..22013b4fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3210,6 +3210,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "tracing", "utils", "workspace_hack", diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 257cc798e8..70fafee629 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -19,6 +19,7 @@ serde.workspace = true serde_json.workspace = true tracing.workspace = true tokio.workspace = true +tokio-util.workspace = true pageserver = { path = ".." } pageserver_client.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 7ed9ae53ce..e8e8e1d72d 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -7,6 +7,7 @@ use pageserver_api::key::is_rel_block_key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::models::PagestreamGetPageRequest; +use tokio_util::sync::CancellationToken; use utils::id::TenantTimelineId; use utils::lsn::Lsn; @@ -210,9 +211,10 @@ async fn main_impl( let num_client_tasks = timelines.len(); let num_live_stats_dump = 1; let num_work_sender_tasks = 1; + let num_main_impl = 1; let start_work_barrier = Arc::new(tokio::sync::Barrier::new( - num_client_tasks + num_live_stats_dump + num_work_sender_tasks, + num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl, )); let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks)); @@ -236,6 +238,7 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); + let cancel_clients = CancellationToken::new(); for tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); @@ -246,103 +249,113 @@ async fn main_impl( receiver, Arc::clone(&all_work_done_barrier), Arc::clone(&live_stats), + cancel_clients.child_token(), ))); } - let work_sender: Pin>> = match args.per_target_rate_limit { - None => Box::pin(async move { - let weights = rand::distributions::weighted::WeightedIndex::new( - all_ranges.iter().map(|v| v.len()), - ) - .unwrap(); - - start_work_barrier.wait().await; - - loop { - let (timeline, req) = { - let mut rng = rand::thread_rng(); - let r = &all_ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = repository::Key::from_i128(key); - let (rel_tag, block_no) = - key_to_rel_block(key).expect("we filter non-rel-block keys out above"); - ( - r.timeline, - PagestreamGetPageRequest { - latest: rng.gen_bool(args.req_latest_probability), - lsn: r.timeline_lsn, - rel: rel_tag, - blkno: block_no, - }, - ) - }; - let sender = work_senders.get(&timeline).unwrap(); - // TODO: what if this blocks? - sender.send(req).await.ok().unwrap(); - } - }), - Some(rps_limit) => Box::pin(async move { - let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); - - let make_timeline_task: &dyn Fn( - TenantTimelineId, - ) - -> Pin>> = &|timeline| { - let sender = work_senders.get(&timeline).unwrap(); - let ranges: Vec = all_ranges - .iter() - .filter(|r| r.timeline == timeline) - .cloned() - .collect(); + let work_sender: Pin>> = { + let start_work_barrier = start_work_barrier.clone(); + match args.per_target_rate_limit { + None => Box::pin(async move { let weights = rand::distributions::weighted::WeightedIndex::new( - ranges.iter().map(|v| v.len()), + all_ranges.iter().map(|v| v.len()), ) .unwrap(); - Box::pin(async move { - let mut ticker = tokio::time::interval(period); - ticker.set_missed_tick_behavior( - /* TODO review this choice */ - tokio::time::MissedTickBehavior::Burst, - ); - loop { - ticker.tick().await; - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = repository::Key::from_i128(key); - assert!(is_rel_block_key(&key)); - let (rel_tag, block_no) = key_to_rel_block(key) - .expect("we filter non-rel-block keys out above"); + start_work_barrier.wait().await; + + loop { + let (timeline, req) = { + let mut rng = rand::thread_rng(); + let r = &all_ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we filter non-rel-block keys out above"); + ( + r.timeline, PagestreamGetPageRequest { latest: rng.gen_bool(args.req_latest_probability), lsn: r.timeline_lsn, rel: rel_tag, blkno: block_no, - } - }; - sender.send(req).await.ok().unwrap(); - } - }) - }; + }, + ) + }; + let sender = work_senders.get(&timeline).unwrap(); + // TODO: what if this blocks? + sender.send(req).await.ok().unwrap(); + } + }), + Some(rps_limit) => Box::pin(async move { + let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); - let tasks: Vec<_> = work_senders - .keys() - .map(|tl| make_timeline_task(**tl)) - .collect(); + let make_timeline_task: &dyn Fn( + TenantTimelineId, + ) + -> Pin>> = &|timeline| { + let sender = work_senders.get(&timeline).unwrap(); + let ranges: Vec = all_ranges + .iter() + .filter(|r| r.timeline == timeline) + .cloned() + .collect(); + let weights = rand::distributions::weighted::WeightedIndex::new( + ranges.iter().map(|v| v.len()), + ) + .unwrap(); - start_work_barrier.wait().await; + Box::pin(async move { + let mut ticker = tokio::time::interval(period); + ticker.set_missed_tick_behavior( + /* TODO review this choice */ + tokio::time::MissedTickBehavior::Burst, + ); + loop { + ticker.tick().await; + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + assert!(is_rel_block_key(&key)); + let (rel_tag, block_no) = key_to_rel_block(key) + .expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + latest: rng.gen_bool(args.req_latest_probability), + lsn: r.timeline_lsn, + rel: rel_tag, + blkno: block_no, + } + }; + sender.send(req).await.ok().unwrap(); + } + }) + }; - join_all(tasks).await; - }), + let tasks: Vec<_> = work_senders + .keys() + .map(|tl| make_timeline_task(**tl)) + .collect(); + + start_work_barrier.wait().await; + + join_all(tasks).await; + }), + } }; + info!("waiting for everything to become ready"); + start_work_barrier.wait().await; + info!("workload is starting"); if let Some(runtime) = args.runtime { match tokio::time::timeout(runtime.into(), work_sender).await { Ok(()) => unreachable!("work sender never terminates"), Err(_timeout) => { - // this implicitly drops the work_senders, making all the clients exit + info!("runtime over, cancelling clients"); + // this implicitly drops the work_senders, making all the clients exit eventually + // however, work_senders can have a queue, so, also add a cancellation token + cancel_clients.cancel(); } } } else { @@ -354,6 +367,8 @@ async fn main_impl( t.await.unwrap(); } + info!("all clients stopped"); + let output = Output { total: { let mut agg_stats = request_stats::Stats::new(); @@ -379,6 +394,7 @@ async fn client( mut work: tokio::sync::mpsc::Receiver, all_work_done_barrier: Arc, live_stats: Arc, + cancel: CancellationToken, ) { start_work_barrier.wait().await; @@ -390,12 +406,16 @@ async fn client( .await .unwrap(); - while let Some(req) = work.recv().await { + while let Some(req) = + tokio::select! { work = work.recv() => { work } , _ = cancel.cancelled() => { return; } } + { let start = Instant::now(); - client - .getpage(req) - .await - .with_context(|| format!("getpage for {timeline}")) + + let res = tokio::select! { + res = client.getpage(req) => { res }, + _ = cancel.cancelled() => { return; } + }; + res.with_context(|| format!("getpage for {timeline}")) .unwrap(); let elapsed = start.elapsed(); live_stats.inc();