diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index bc577603a6..10bc8b6822 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -7,13 +7,14 @@ //! logging what happens when a sequential scan is requested on a small table, then picking out two //! suitable from logs. -use std::sync::{Arc, Barrier}; +use std::sync::Arc; use bytes::{Buf, Bytes}; use pageserver::{ config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager, }; use pageserver_api::shard::TenantShardId; +use tokio_util::sync::CancellationToken; use utils::{id::TenantId, lsn::Lsn}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; @@ -43,7 +44,7 @@ fn redo_scenarios(c: &mut Criterion) { tracing::info!("first executed"); } - let thread_counts = [1, 2, 4, 8, 16]; + let thread_counts = [1, 2, 4, 8, 16, 32, 64, 128]; let mut group = c.benchmark_group("short"); group.sampling_mode(criterion::SamplingMode::Flat); @@ -74,79 +75,74 @@ fn redo_scenarios(c: &mut Criterion) { drop(group); } -/// Sets up `threads` number of requesters to `request_redo`, with the given input. +/// Sets up a multi-threaded tokio runtime with default worker thread count, +/// then spawns `requesters` tasks that repeatedly: +/// - get input from `input_factory()` +/// - call `manager.request_redo()` with their input +/// +/// This stress-tests the scalability of a single walredo manager at high tokio-level concurrency. +/// +/// Using tokio's default worker thread count means the results will differ on machines +/// with different core counts. We don't care about that; the performance will always +/// be different on different hardware. To compare performance of different software versions, +/// use the same hardware. 
fn add_multithreaded_walredo_requesters( b: &mut criterion::Bencher, - threads: usize, + requesters: usize, manager: &Arc<PostgresRedoManager>, input_factory: fn() -> Request, ) { - assert_ne!(threads, 0); + assert_ne!(requesters, 0); let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(threads) .enable_all() .build() .unwrap(); - if threads == 1 { - b.iter_batched_ref( - || Some(input_factory()), - |input| rt.block_on(execute_all(input.take(), manager)), - criterion::BatchSize::PerIteration, - ); - } else { - let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize); + let cancel = CancellationToken::new(); - let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx)); + let barrier = Arc::new(tokio::sync::Barrier::new(requesters + 1)); - let barrier = Arc::new(Barrier::new(threads as usize + 1)); - - let jhs = (0..threads) - .map(|_| { - let manager = manager.clone(); - let barrier = barrier.clone(); - let work_rx = work_rx.clone(); - rt.spawn(async move { + let jhs = (0..requesters) + .map(|_| { + let manager = manager.clone(); + let barrier = barrier.clone(); + let cancel = cancel.clone(); + rt.spawn(async move { + let work_loop = async move { loop { - // queue up and wait if we want to go another round - if work_rx.lock().unwrap().recv().is_err() { - break; - } - - let input = Some(input_factory()); - - barrier.wait(); - - execute_all(input, &manager).await.unwrap(); - - barrier.wait(); + let input = input_factory(); + barrier.wait().await; + let page = input.execute(&manager).await.unwrap(); + assert_eq!(page.remaining(), 8192); + barrier.wait().await; } - }) - }) - .collect::<Vec<_>>(); - - b.iter_batched( - || { - for _ in 0..threads { - work_tx.send(()).unwrap() + }; + tokio::select! 
{ + _ = work_loop => {}, + _ = cancel.cancelled() => { } } - }, - |()| { - // start the work - barrier.wait(); + }) + }) + .collect::<Vec<_>>(); + b.iter_batched( + || (), + |()| { + // start the work + rt.block_on(async { + barrier.wait().await; // wait for work to complete - barrier.wait(); - }, - criterion::BatchSize::PerIteration, - ); + barrier.wait().await; + }); + }, + criterion::BatchSize::PerIteration, + ); - drop(work_tx); + cancel.cancel(); - let jhs = JoinOnDrop(jhs); - rt.block_on(jhs.join_all()); - } + let jhs = JoinOnDrop(jhs); + rt.block_on(jhs.join_all()); } struct JoinOnDrop(Vec<tokio::task::JoinHandle<()>>); @@ -158,22 +154,6 @@ impl JoinOnDrop { } } -async fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()> -where - I: IntoIterator<Item = Request>, -{ - // just fire all requests as fast as possible - let futs = input.into_iter().map(|req| async move { - let page = req.execute(manager).await?; - assert_eq!(page.remaining(), 8192); - anyhow::Ok(()) - }); - - futures::future::try_join_all(futs).await?; - - Ok(()) -} - criterion_group!(benches, redo_scenarios); criterion_main!(benches);