From 36e11009494609e6c48846013957a0ad2248561d Mon Sep 17 00:00:00 2001
From: Calin Anca <49310764+calinanca99@users.noreply.github.com>
Date: Fri, 16 Feb 2024 16:31:54 +0100
Subject: [PATCH] bench_walredo: use tokio multi-threaded runtime (#6743)

fixes https://github.com/neondatabase/neon/issues/6648

Co-authored-by: Christian Schwarz
---
 pageserver/benches/bench_walredo.rs | 177 +++++++++++-----------------
 1 file changed, 72 insertions(+), 105 deletions(-)

diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs
index 4837626086..47c8bd75c6 100644
--- a/pageserver/benches/bench_walredo.rs
+++ b/pageserver/benches/bench_walredo.rs
@@ -6,14 +6,28 @@
 //! There are two sets of inputs; `short` and `medium`. They were collected on postgres v14 by
 //! logging what happens when a sequential scan is requested on a small table, then picking out two
 //! suitable from logs.
+//!
+//!
+//! Reference data (git blame to see commit) on an i3en.3xlarge
+//! ```text
+//! short/short/1       time:   [39.175 µs 39.348 µs 39.536 µs]
+//! short/short/2       time:   [51.227 µs 51.487 µs 51.755 µs]
+//! short/short/4       time:   [76.048 µs 76.362 µs 76.674 µs]
+//! short/short/8       time:   [128.94 µs 129.82 µs 130.74 µs]
+//! short/short/16      time:   [227.84 µs 229.00 µs 230.28 µs]
+//! short/short/32      time:   [455.97 µs 457.81 µs 459.90 µs]
+//! short/short/64      time:   [902.46 µs 904.84 µs 907.32 µs]
+//! short/short/128     time:   [1.7416 ms 1.7487 ms 1.7561 ms]
+//! ```
 
-use std::sync::{Arc, Barrier};
+use std::sync::Arc;
 
 use bytes::{Buf, Bytes};
 use pageserver::{
     config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
 };
 use pageserver_api::shard::TenantShardId;
+use tokio::task::JoinSet;
 use utils::{id::TenantId, lsn::Lsn};
 
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -39,11 +53,11 @@ fn redo_scenarios(c: &mut Criterion) {
             .build()
             .unwrap();
         tracing::info!("executing first");
-        short().execute(rt.handle(), &manager).unwrap();
+        rt.block_on(short().execute(&manager)).unwrap();
        tracing::info!("first executed");
     }
 
-    let thread_counts = [1, 2, 4, 8, 16];
+    let thread_counts = [1, 2, 4, 8, 16, 32, 64, 128];
 
     let mut group = c.benchmark_group("short");
     group.sampling_mode(criterion::SamplingMode::Flat);
@@ -74,114 +88,69 @@
     drop(group);
 }
 
-/// Sets up `threads` number of requesters to `request_redo`, with the given input.
+/// Sets up a multi-threaded tokio runtime with the default worker thread count,
+/// then spawns `nrequesters` tasks that repeatedly:
+/// - get input from `input_factory()`
+/// - call `manager.request_redo()` with their input
+///
+/// This stress-tests the scalability of a single walredo manager at high tokio-level concurrency.
+///
+/// Using tokio's default worker thread count means the results will differ on machines
+/// with different core counts. We don't care about that; performance will always differ
+/// across hardware. To compare the performance of different software versions,
+/// use the same hardware.
 fn add_multithreaded_walredo_requesters(
     b: &mut criterion::Bencher,
-    threads: u32,
+    nrequesters: usize,
     manager: &Arc<PostgresRedoManager>,
     input_factory: fn() -> Request,
 ) {
-    assert_ne!(threads, 0);
+    assert_ne!(nrequesters, 0);
 
-    if threads == 1 {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let handle = rt.handle();
-        b.iter_batched_ref(
-            || Some(input_factory()),
-            |input| execute_all(input.take(), handle, manager),
-            criterion::BatchSize::PerIteration,
-        );
-    } else {
-        let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize);
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
 
-        let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
+    let barrier = Arc::new(tokio::sync::Barrier::new(nrequesters + 1));
 
-        let barrier = Arc::new(Barrier::new(threads as usize + 1));
-
-        let jhs = (0..threads)
-            .map(|_| {
-                std::thread::spawn({
-                    let manager = manager.clone();
-                    let barrier = barrier.clone();
-                    let work_rx = work_rx.clone();
-                    move || {
-                        let rt = tokio::runtime::Builder::new_current_thread()
-                            .enable_all()
-                            .build()
-                            .unwrap();
-                        let handle = rt.handle();
-                        loop {
-                            // queue up and wait if we want to go another round
-                            if work_rx.lock().unwrap().recv().is_err() {
-                                break;
-                            }
-
-                            let input = Some(input_factory());
-
-                            barrier.wait();
-
-                            execute_all(input, handle, &manager).unwrap();
-
-                            barrier.wait();
-                        }
-                    }
-                })
-            })
-            .collect::<Vec<_>>();
-
-        let _jhs = JoinOnDrop(jhs);
-
-        b.iter_batched(
-            || {
-                for _ in 0..threads {
-                    work_tx.send(()).unwrap()
-                }
-            },
-            |()| {
-                // start the work
-                barrier.wait();
-
-                // wait for work to complete
-                barrier.wait();
-            },
-            criterion::BatchSize::PerIteration,
-        );
-
-        drop(work_tx);
+    let mut requesters = JoinSet::new();
+    for _ in 0..nrequesters {
+        let _entered = rt.enter();
+        let manager = manager.clone();
+        let barrier = barrier.clone();
+        requesters.spawn(async move {
+            loop {
+                let input = input_factory();
+                barrier.wait().await;
+                let page = input.execute(&manager).await.unwrap();
+                assert_eq!(page.remaining(), 8192);
+                barrier.wait().await;
+            }
+        });
     }
-}
 
-struct JoinOnDrop(Vec<std::thread::JoinHandle<()>>);
+    let do_one_iteration = || {
+        rt.block_on(async {
+            barrier.wait().await;
+            // wait for work to complete
+            barrier.wait().await;
+        })
+    };
 
-impl Drop for JoinOnDrop {
-    // it's not really needless because we want join all then check for panicks
-    #[allow(clippy::needless_collect)]
-    fn drop(&mut self) {
-        // first join all
-        let results = self.0.drain(..).map(|jh| jh.join()).collect::<Vec<_>>();
-        // then check the results; panicking here is not great, but it does get the message across
-        // to the user, and sets an exit value.
-        results.into_iter().try_for_each(|res| res).unwrap();
-    }
-}
+    b.iter_batched(
+        || {
+            // warmup
+            do_one_iteration();
+        },
+        |()| {
+            // work loop
+            do_one_iteration();
+        },
+        criterion::BatchSize::PerIteration,
+    );
 
-fn execute_all<I>(
-    input: I,
-    handle: &tokio::runtime::Handle,
-    manager: &PostgresRedoManager,
-) -> anyhow::Result<()>
-where
-    I: IntoIterator<Item = Request>,
-{
-    // just fire all requests as fast as possible
-    input.into_iter().try_for_each(|req| {
-        let page = req.execute(handle, manager)?;
-        assert_eq!(page.remaining(), 8192);
-        anyhow::Ok(())
-    })
+    rt.block_on(requesters.shutdown());
 }
 
 criterion_group!(benches, redo_scenarios);
@@ -493,11 +462,7 @@ struct Request {
 }
 
 impl Request {
-    fn execute(
-        self,
-        rt: &tokio::runtime::Handle,
-        manager: &PostgresRedoManager,
-    ) -> anyhow::Result<Bytes> {
+    async fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
         let Request {
             key,
             lsn,
@@ -506,6 +471,8 @@ impl Request {
             pg_version,
         } = self;
 
-        rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version))
+        manager
+            .request_redo(key, lsn, base_img, records, pg_version)
+            .await
    }
 }
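
Note: the core of the new benchmark is the barrier handshake between the driving (criterion) thread and the requester tasks. Below is a minimal, self-contained sketch of that pattern, not part of the patch: `fake_redo` is a hypothetical stand-in for `PostgresRedoManager::request_redo`, and the example assumes tokio 1.x with the `full` feature set. Only `tokio::sync::Barrier`, `tokio::task::JoinSet`, `Runtime::enter`, and `block_on` are taken from the change itself.

```rust
use std::sync::Arc;

use tokio::task::JoinSet;

/// Hypothetical stand-in for `PostgresRedoManager::request_redo`; purely illustrative.
async fn fake_redo(task: usize) -> usize {
    tokio::task::yield_now().await;
    task
}

fn main() {
    let nrequesters = 4;

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    // The N requester tasks plus the driving thread all meet at this barrier.
    let barrier = Arc::new(tokio::sync::Barrier::new(nrequesters + 1));

    let mut requesters: JoinSet<()> = JoinSet::new();
    for i in 0..nrequesters {
        // Enter the runtime so JoinSet::spawn has a scheduler to spawn onto.
        let _entered = rt.enter();
        let barrier = barrier.clone();
        requesters.spawn(async move {
            loop {
                // (In the benchmark, the input is produced here, before the start
                // gate, so input generation is not part of the timed interval.)
                barrier.wait().await; // start gate: all tasks begin together
                let _ = fake_redo(i).await;
                barrier.wait().await; // completion gate: driver resumes once all are done
            }
        });
    }

    // One iteration from the driver's point of view: release the tasks,
    // then wait until every task has finished one unit of work.
    let do_one_iteration = || {
        rt.block_on(async {
            barrier.wait().await;
            barrier.wait().await;
        })
    };

    for _ in 0..3 {
        do_one_iteration();
    }

    // The requester loops never exit on their own; abort them and wait for shutdown.
    rt.block_on(requesters.shutdown());
}
```

As in `add_multithreaded_walredo_requesters`, the driver never touches the requesters' inputs or outputs: the first barrier crossing releases all tasks at once, and the second completes only after every task has finished, so each measured iteration reflects one redo request per task under `nrequesters`-way concurrency rather than task startup or input generation.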