diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index e8e8e1d72d..38d5bf4e61 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -216,7 +216,6 @@ async fn main_impl( let start_work_barrier = Arc::new(tokio::sync::Barrier::new( num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl, )); - let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks)); tokio::spawn({ let stats = Arc::clone(&live_stats); @@ -236,25 +235,26 @@ async fn main_impl( } }); - let mut work_senders = HashMap::new(); + let cancel = CancellationToken::new(); + + let mut work_senders: HashMap = HashMap::new(); let mut tasks = Vec::new(); - let cancel_clients = CancellationToken::new(); for tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are - work_senders.insert(tl, sender); + work_senders.insert(*tl, sender); tasks.push(tokio::spawn(client( args, *tl, Arc::clone(&start_work_barrier), receiver, - Arc::clone(&all_work_done_barrier), Arc::clone(&live_stats), - cancel_clients.child_token(), + cancel.clone(), ))); } let work_sender: Pin>> = { let start_work_barrier = start_work_barrier.clone(); + let cancel = cancel.clone(); match args.per_target_rate_limit { None => Box::pin(async move { let weights = rand::distributions::weighted::WeightedIndex::new( @@ -264,7 +264,7 @@ async fn main_impl( start_work_barrier.wait().await; - loop { + while !cancel.is_cancelled() { let (timeline, req) = { let mut rng = rand::thread_rng(); let r = &all_ranges[weights.sample(&mut rng)]; @@ -284,12 +284,13 @@ async fn main_impl( }; let sender = work_senders.get(&timeline).unwrap(); // TODO: what if this blocks? - sender.send(req).await.ok().unwrap(); + if sender.send(req).await.is_err() { + assert!(cancel.is_cancelled(), "client has gone away unexpectedly"); + } } }), Some(rps_limit) => Box::pin(async move { let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); - let make_timeline_task: &dyn Fn( TenantTimelineId, ) @@ -305,13 +306,14 @@ async fn main_impl( ) .unwrap(); + let cancel = cancel.clone(); Box::pin(async move { let mut ticker = tokio::time::interval(period); ticker.set_missed_tick_behavior( /* TODO review this choice */ tokio::time::MissedTickBehavior::Burst, ); - loop { + while !cancel.is_cancelled() { ticker.tick().await; let req = { let mut rng = rand::thread_rng(); @@ -328,14 +330,16 @@ async fn main_impl( blkno: block_no, } }; - sender.send(req).await.ok().unwrap(); + if sender.send(req).await.is_err() { + assert!(cancel.is_cancelled(), "client has gone away unexpectedly"); + } } }) }; let tasks: Vec<_> = work_senders .keys() - .map(|tl| make_timeline_task(**tl)) + .map(|tl| make_timeline_task(*tl)) .collect(); start_work_barrier.wait().await; @@ -345,24 +349,23 @@ async fn main_impl( } }; - info!("waiting for everything to become ready"); - start_work_barrier.wait().await; - info!("workload is starting"); + let work_sender_task = tokio::spawn(work_sender); + if let Some(runtime) = args.runtime { - match tokio::time::timeout(runtime.into(), work_sender).await { - Ok(()) => unreachable!("work sender never terminates"), - Err(_timeout) => { - info!("runtime over, cancelling clients"); - // this implicitly drops the work_senders, making all the clients exit eventually - // however, work_senders can have a queue, so, also add a cancellation token - cancel_clients.cancel(); - } - } + info!("waiting for everything to become ready"); + start_work_barrier.wait().await; + info!("work started"); + tokio::time::sleep(runtime.into()).await; + info!("runtime over, signalling cancellation"); + cancel.cancel(); + work_sender_task.await.unwrap(); + info!("work sender exited"); } else { - work_sender.await; + work_sender_task.await.unwrap(); unreachable!("work sender never terminates"); } + info!("joining clients"); for t in tasks { t.await.unwrap(); } @@ -392,12 +395,9 @@ async fn client( timeline: TenantTimelineId, start_work_barrier: Arc, mut work: tokio::sync::mpsc::Receiver, - all_work_done_barrier: Arc, live_stats: Arc, cancel: CancellationToken, ) { - start_work_barrier.wait().await; - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) .await .unwrap(); @@ -406,6 +406,8 @@ async fn client( .await .unwrap(); + start_work_barrier.wait().await; + while let Some(req) = tokio::select! { work = work.recv() => { work } , _ = cancel.cancelled() => { return; } } { @@ -423,6 +425,4 @@ async fn client( stats.borrow().lock().unwrap().observe(elapsed).unwrap(); }); } - - all_work_done_barrier.wait().await; }