diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 85caa565fe..bb235492d1 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -67,36 +67,47 @@ fn add_multithreaded_walredo_requesters( input_factory: fn() -> Request, request_repeats: usize, ) { - b.iter_batched_ref( + let (work_tx, work_rx) = std::sync::mpsc::sync_channel(10); + + let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx)); + + let barrier = Arc::new(Barrier::new(threads as usize + 1)); + + let jhs = (0..threads) + .map(|_| { + std::thread::spawn({ + let manager = manager.clone(); + let barrier = barrier.clone(); + let work_rx = work_rx.clone(); + move || loop { + // queue up and wait if we want to go another round + if work_rx.lock().unwrap().recv().is_err() { + break; + } + + let input = std::iter::repeat(input_factory()) + .take(request_repeats) + .collect::>(); + + barrier.wait(); + + execute_all(input, &*manager).unwrap(); + + barrier.wait(); + } + }) + }) + .collect::>(); + + let _jhs = JoinOnDrop(jhs); + + b.iter_batched( || { - // barrier for all of the threads, and the benchmarked thread - let barrier = Arc::new(Barrier::new(threads as usize + 1)); - - let jhs = (0..threads) - .map(|_| { - std::thread::spawn({ - let manager = manager.clone(); - let barrier = barrier.clone(); - move || { - let input = std::iter::repeat(input_factory()) - .take(request_repeats) - .collect::>(); - - barrier.wait(); - - execute_all(input, &*manager).unwrap(); - - barrier.wait(); - } - }) - }) - .collect::>(); - - (barrier, JoinOnDrop(jhs)) + for _ in 0..threads { + work_tx.send(()).unwrap() + } }, - |input| { - let barrier = &input.0; - + |()| { // start the work barrier.wait(); @@ -105,6 +116,8 @@ fn add_multithreaded_walredo_requesters( }, criterion::BatchSize::PerIteration, ); + + drop(work_tx); } struct JoinOnDrop(Vec>);