diff --git a/Cargo.lock b/Cargo.lock
index 3c7fcc0f67..0d3b0d947f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3224,6 +3224,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "tokio-util",
  "tracing",
  "utils",
  "workspace_hack",
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
index 257cc798e8..70fafee629 100644
--- a/pageserver/pagebench/Cargo.toml
+++ b/pageserver/pagebench/Cargo.toml
@@ -19,6 +19,7 @@ serde.workspace = true
 serde_json.workspace = true
 tracing.workspace = true
 tokio.workspace = true
+tokio-util.workspace = true
 
 pageserver = { path = ".." }
 pageserver_client.workspace = true
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index 7ed9ae53ce..38d5bf4e61 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -7,6 +7,7 @@
 use pageserver_api::key::is_rel_block_key;
 use pageserver_api::keyspace::KeySpaceAccum;
 use pageserver_api::models::PagestreamGetPageRequest;
+use tokio_util::sync::CancellationToken;
 use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
 
@@ -210,11 +211,11 @@ async fn main_impl(
     let num_client_tasks = timelines.len();
     let num_live_stats_dump = 1;
     let num_work_sender_tasks = 1;
+    let num_main_impl = 1;
 
     let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
-        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
+        num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
     ));
-    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
 
     tokio::spawn({
         let stats = Arc::clone(&live_stats);
@@ -234,126 +235,143 @@ async fn main_impl(
         }
     });
 
-    let mut work_senders = HashMap::new();
+    let cancel = CancellationToken::new();
+
+    let mut work_senders: HashMap<TenantTimelineId, _> = HashMap::new();
     let mut tasks = Vec::new();
     for tl in &timelines {
         let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
-        work_senders.insert(tl, sender);
+        work_senders.insert(*tl, sender);
 
         tasks.push(tokio::spawn(client(
             args,
             *tl,
             Arc::clone(&start_work_barrier),
             receiver,
-            Arc::clone(&all_work_done_barrier),
             Arc::clone(&live_stats),
+            cancel.clone(),
         )));
     }
 
-    let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
-        None => Box::pin(async move {
-            let weights = rand::distributions::weighted::WeightedIndex::new(
-                all_ranges.iter().map(|v| v.len()),
-            )
-            .unwrap();
-
-            start_work_barrier.wait().await;
-
-            loop {
-                let (timeline, req) = {
-                    let mut rng = rand::thread_rng();
-                    let r = &all_ranges[weights.sample(&mut rng)];
-                    let key: i128 = rng.gen_range(r.start..r.end);
-                    let key = repository::Key::from_i128(key);
-                    let (rel_tag, block_no) =
-                        key_to_rel_block(key).expect("we filter non-rel-block keys out above");
-                    (
-                        r.timeline,
-                        PagestreamGetPageRequest {
-                            latest: rng.gen_bool(args.req_latest_probability),
-                            lsn: r.timeline_lsn,
-                            rel: rel_tag,
-                            blkno: block_no,
-                        },
-                    )
-                };
-                let sender = work_senders.get(&timeline).unwrap();
-                // TODO: what if this blocks?
-                sender.send(req).await.ok().unwrap();
-            }
-        }),
-        Some(rps_limit) => Box::pin(async move {
-            let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
-
-            let make_timeline_task: &dyn Fn(
-                TenantTimelineId,
-            )
-                -> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
-                let sender = work_senders.get(&timeline).unwrap();
-                let ranges: Vec<KeyRange> = all_ranges
-                    .iter()
-                    .filter(|r| r.timeline == timeline)
-                    .cloned()
-                    .collect();
+    let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
+        let start_work_barrier = start_work_barrier.clone();
+        let cancel = cancel.clone();
+        match args.per_target_rate_limit {
+            None => Box::pin(async move {
                 let weights = rand::distributions::weighted::WeightedIndex::new(
-                    ranges.iter().map(|v| v.len()),
+                    all_ranges.iter().map(|v| v.len()),
                 )
                 .unwrap();
 
-                Box::pin(async move {
-                    let mut ticker = tokio::time::interval(period);
-                    ticker.set_missed_tick_behavior(
-                        /* TODO review this choice */
-                        tokio::time::MissedTickBehavior::Burst,
-                    );
-                    loop {
-                        ticker.tick().await;
-                        let req = {
-                            let mut rng = rand::thread_rng();
-                            let r = &ranges[weights.sample(&mut rng)];
-                            let key: i128 = rng.gen_range(r.start..r.end);
-                            let key = repository::Key::from_i128(key);
-                            assert!(is_rel_block_key(&key));
-                            let (rel_tag, block_no) = key_to_rel_block(key)
-                                .expect("we filter non-rel-block keys out above");
+                start_work_barrier.wait().await;
+
+                while !cancel.is_cancelled() {
+                    let (timeline, req) = {
+                        let mut rng = rand::thread_rng();
+                        let r = &all_ranges[weights.sample(&mut rng)];
+                        let key: i128 = rng.gen_range(r.start..r.end);
+                        let key = repository::Key::from_i128(key);
+                        let (rel_tag, block_no) =
+                            key_to_rel_block(key).expect("we filter non-rel-block keys out above");
+                        (
+                            r.timeline,
                             PagestreamGetPageRequest {
                                 latest: rng.gen_bool(args.req_latest_probability),
                                 lsn: r.timeline_lsn,
                                 rel: rel_tag,
                                 blkno: block_no,
-                            }
-                        };
-                        sender.send(req).await.ok().unwrap();
+                            },
+                        )
+                    };
+                    let sender = work_senders.get(&timeline).unwrap();
+                    // TODO: what if this blocks?
+                    if sender.send(req).await.is_err() {
+                        assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
                     }
-                })
-            };
+                }
+            }),
+            Some(rps_limit) => Box::pin(async move {
+                let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
+                let make_timeline_task: &dyn Fn(
+                    TenantTimelineId,
+                )
+                    -> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
+                    let sender = work_senders.get(&timeline).unwrap();
+                    let ranges: Vec<KeyRange> = all_ranges
+                        .iter()
+                        .filter(|r| r.timeline == timeline)
+                        .cloned()
+                        .collect();
+                    let weights = rand::distributions::weighted::WeightedIndex::new(
+                        ranges.iter().map(|v| v.len()),
                    )
+                    .unwrap();
 
-            let tasks: Vec<_> = work_senders
-                .keys()
-                .map(|tl| make_timeline_task(**tl))
-                .collect();
+                    let cancel = cancel.clone();
+                    Box::pin(async move {
+                        let mut ticker = tokio::time::interval(period);
+                        ticker.set_missed_tick_behavior(
+                            /* TODO review this choice */
+                            tokio::time::MissedTickBehavior::Burst,
+                        );
+                        while !cancel.is_cancelled() {
+                            ticker.tick().await;
+                            let req = {
+                                let mut rng = rand::thread_rng();
+                                let r = &ranges[weights.sample(&mut rng)];
+                                let key: i128 = rng.gen_range(r.start..r.end);
+                                let key = repository::Key::from_i128(key);
+                                assert!(is_rel_block_key(&key));
+                                let (rel_tag, block_no) = key_to_rel_block(key)
+                                    .expect("we filter non-rel-block keys out above");
+                                PagestreamGetPageRequest {
+                                    latest: rng.gen_bool(args.req_latest_probability),
+                                    lsn: r.timeline_lsn,
+                                    rel: rel_tag,
+                                    blkno: block_no,
+                                }
+                            };
+                            if sender.send(req).await.is_err() {
+                                assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
+                            }
+                        }
+                    })
+                };
 
-            start_work_barrier.wait().await;
+                let tasks: Vec<_> = work_senders
+                    .keys()
+                    .map(|tl| make_timeline_task(*tl))
+                    .collect();
 
-            join_all(tasks).await;
-        }),
+                start_work_barrier.wait().await;
+
+                join_all(tasks).await;
+            }),
+        }
     };
 
+    let work_sender_task = tokio::spawn(work_sender);
+
     if let Some(runtime) = args.runtime {
-        match tokio::time::timeout(runtime.into(), work_sender).await {
-            Ok(()) => unreachable!("work sender never terminates"),
-            Err(_timeout) => {
-                // this implicitly drops the work_senders, making all the clients exit
-            }
-        }
+        info!("waiting for everything to become ready");
+        start_work_barrier.wait().await;
+        info!("work started");
+        tokio::time::sleep(runtime.into()).await;
+        info!("runtime over, signalling cancellation");
+        cancel.cancel();
+        work_sender_task.await.unwrap();
+        info!("work sender exited");
     } else {
-        work_sender.await;
+        work_sender_task.await.unwrap();
        unreachable!("work sender never terminates");
     }
 
+    info!("joining clients");
     for t in tasks {
         t.await.unwrap();
     }
+    info!("all clients stopped");
+
     let output = Output {
         total: {
             let mut agg_stats = request_stats::Stats::new();
@@ -377,11 +395,9 @@ async fn client(
     timeline: TenantTimelineId,
     start_work_barrier: Arc<Barrier>,
     mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
-    all_work_done_barrier: Arc<Barrier>,
     live_stats: Arc<LiveStats>,
+    cancel: CancellationToken,
 ) {
-    start_work_barrier.wait().await;
-
     let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
         .await
         .unwrap();
@@ -390,12 +406,18 @@ async fn client(
         .await
         .unwrap();
 
-    while let Some(req) = work.recv().await {
+    start_work_barrier.wait().await;
+
+    while let Some(req) =
+        tokio::select! { work = work.recv() => { work } , _ = cancel.cancelled() => { return; } }
+    {
         let start = Instant::now();
-        client
-            .getpage(req)
-            .await
-            .with_context(|| format!("getpage for {timeline}"))
+
+        let res = tokio::select! {
+            res = client.getpage(req) => { res },
+            _ = cancel.cancelled() => { return; }
+        };
+        res.with_context(|| format!("getpage for {timeline}"))
             .unwrap();
         let elapsed = start.elapsed();
         live_stats.inc();
@@ -403,6 +425,4 @@ async fn client(
             stats.borrow().lock().unwrap().observe(elapsed).unwrap();
         });
     }
-
-    all_work_done_barrier.wait().await;
 }