bench_walredo: use tokio multi-threaded runtime (#6743)
fixes https://github.com/neondatabase/neon/issues/6648

Co-authored-by: Christian Schwarz <christian@neon.tech>
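For orientation before the diff: a minimal sketch (not code from this commit) of the coordination pattern the new benchmark adopts — one multi-threaded tokio runtime, `n` looping requester tasks, and a `tokio::sync::Barrier` with `n + 1` participants so the synchronous driver can release the workers and then rejoin them. The value `n = 4` and the dummy loop body are illustrative stand-ins; the real benchmark calls `manager.request_redo()` between the two barrier waits.

```rust
use std::sync::Arc;
use tokio::{sync::Barrier, task::JoinSet};

fn main() {
    let n = 4; // illustrative; the bench sweeps 1..=128
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    // n workers + 1 driver participate in each barrier round.
    let barrier = Arc::new(Barrier::new(n + 1));

    let mut workers = JoinSet::new();
    for id in 0..n {
        let barrier = barrier.clone();
        // Spawning from sync code requires entering the runtime context.
        let _entered = rt.enter();
        workers.spawn(async move {
            loop {
                barrier.wait().await; // start of one iteration
                let _ = id; // ... unit of work goes here (request_redo in the bench)
                barrier.wait().await; // end of one iteration
            }
        });
    }

    // One measured iteration: release the workers, then wait for them.
    for _ in 0..3 {
        rt.block_on(async {
            barrier.wait().await;
            barrier.wait().await;
        });
    }

    // Tear down: abort the endless worker loops and await their completion.
    rt.block_on(workers.shutdown());
}
```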
@@ -6,14 +6,28 @@
 //! There are two sets of inputs; `short` and `medium`. They were collected on postgres v14 by
 //! logging what happens when a sequential scan is requested on a small table, then picking out two
 //! suitable ones from the logs.
+//!
+//!
+//! Reference data (git blame to see commit) on an i3en.3xlarge:
+//! ```text
+//! short/short/1      time:   [39.175 µs 39.348 µs 39.536 µs]
+//! short/short/2      time:   [51.227 µs 51.487 µs 51.755 µs]
+//! short/short/4      time:   [76.048 µs 76.362 µs 76.674 µs]
+//! short/short/8      time:   [128.94 µs 129.82 µs 130.74 µs]
+//! short/short/16     time:   [227.84 µs 229.00 µs 230.28 µs]
+//! short/short/32     time:   [455.97 µs 457.81 µs 459.90 µs]
+//! short/short/64     time:   [902.46 µs 904.84 µs 907.32 µs]
+//! short/short/128    time:   [1.7416 ms 1.7487 ms 1.7561 ms]
+//! ```
 
-use std::sync::{Arc, Barrier};
+use std::sync::Arc;
 
 use bytes::{Buf, Bytes};
 use pageserver::{
     config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
 };
 use pageserver_api::shard::TenantShardId;
+use tokio::task::JoinSet;
 use utils::{id::TenantId, lsn::Lsn};
 
 use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -39,11 +53,11 @@ fn redo_scenarios(c: &mut Criterion) {
             .build()
             .unwrap();
         tracing::info!("executing first");
-        short().execute(rt.handle(), &manager).unwrap();
+        rt.block_on(short().execute(&manager)).unwrap();
         tracing::info!("first executed");
     }
 
-    let thread_counts = [1, 2, 4, 8, 16];
+    let thread_counts = [1, 2, 4, 8, 16, 32, 64, 128];
 
     let mut group = c.benchmark_group("short");
     group.sampling_mode(criterion::SamplingMode::Flat);
@@ -74,114 +88,69 @@ fn redo_scenarios(c: &mut Criterion) {
     drop(group);
 }
 
-/// Sets up `threads` number of requesters to `request_redo`, with the given input.
+/// Sets up a multi-threaded tokio runtime with default worker thread count,
+/// then spawns `nrequesters` tasks that repeatedly:
+/// - get input from `input_factory()`
+/// - call `manager.request_redo()` with their input
+///
+/// This stress-tests the scalability of a single walredo manager at high tokio-level concurrency.
+///
+/// Using tokio's default worker thread count means the results will differ on machines
+/// with different core counts. We don't care about that; performance will always
+/// be different on different hardware. To compare performance of different software versions,
+/// use the same hardware.
 fn add_multithreaded_walredo_requesters(
     b: &mut criterion::Bencher,
-    threads: u32,
+    nrequesters: usize,
     manager: &Arc<PostgresRedoManager>,
     input_factory: fn() -> Request,
 ) {
-    assert_ne!(threads, 0);
+    assert_ne!(nrequesters, 0);
 
-    if threads == 1 {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let handle = rt.handle();
-        b.iter_batched_ref(
-            || Some(input_factory()),
-            |input| execute_all(input.take(), handle, manager),
-            criterion::BatchSize::PerIteration,
-        );
-    } else {
-        let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize);
-        let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
-        let barrier = Arc::new(Barrier::new(threads as usize + 1));
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
 
-        let jhs = (0..threads)
-            .map(|_| {
-                std::thread::spawn({
-                    let manager = manager.clone();
-                    let barrier = barrier.clone();
-                    let work_rx = work_rx.clone();
-                    move || {
-                        let rt = tokio::runtime::Builder::new_current_thread()
-                            .enable_all()
-                            .build()
-                            .unwrap();
-                        let handle = rt.handle();
-                        loop {
-                            // queue up and wait if we want to go another round
-                            if work_rx.lock().unwrap().recv().is_err() {
-                                break;
-                            }
-
-                            let input = Some(input_factory());
-
-                            barrier.wait();
-
-                            execute_all(input, handle, &manager).unwrap();
-
-                            barrier.wait();
-                        }
-                    }
-                })
-            })
-            .collect::<Vec<_>>();
-
-        let _jhs = JoinOnDrop(jhs);
-
-        b.iter_batched(
-            || {
-                for _ in 0..threads {
-                    work_tx.send(()).unwrap()
-                }
-            },
-            |()| {
-                // start the work
-                barrier.wait();
-
-                // wait for work to complete
-                barrier.wait();
-            },
-            criterion::BatchSize::PerIteration,
-        );
-
-        drop(work_tx);
-    }
-}
+    let barrier = Arc::new(tokio::sync::Barrier::new(nrequesters + 1));
 
-struct JoinOnDrop(Vec<std::thread::JoinHandle<()>>);
-
-impl Drop for JoinOnDrop {
-    // it's not really needless because we want to join all, then check for panics
-    #[allow(clippy::needless_collect)]
-    fn drop(&mut self) {
-        // first join all
-        let results = self.0.drain(..).map(|jh| jh.join()).collect::<Vec<_>>();
-        // then check the results; panicking here is not great, but it does get the message across
-        // to the user, and sets an exit value.
-        results.into_iter().try_for_each(|res| res).unwrap();
-    }
-}
+    let mut requesters = JoinSet::new();
+    for _ in 0..nrequesters {
+        let _entered = rt.enter();
+        let manager = manager.clone();
+        let barrier = barrier.clone();
+        requesters.spawn(async move {
+            loop {
+                let input = input_factory();
+                barrier.wait().await;
+                let page = input.execute(&manager).await.unwrap();
+                assert_eq!(page.remaining(), 8192);
+                barrier.wait().await;
+            }
+        });
+    }
 
-fn execute_all<I>(
-    input: I,
-    handle: &tokio::runtime::Handle,
-    manager: &PostgresRedoManager,
-) -> anyhow::Result<()>
-where
-    I: IntoIterator<Item = Request>,
-{
-    // just fire all requests as fast as possible
-    input.into_iter().try_for_each(|req| {
-        let page = req.execute(handle, manager)?;
-        assert_eq!(page.remaining(), 8192);
-        anyhow::Ok(())
-    })
-}
+    let do_one_iteration = || {
+        rt.block_on(async {
+            barrier.wait().await;
+            // wait for work to complete
+            barrier.wait().await;
+        })
+    };
+
+    b.iter_batched(
+        || {
+            // warmup
+            do_one_iteration();
+        },
+        |()| {
+            // work loop
+            do_one_iteration();
+        },
+        criterion::BatchSize::PerIteration,
+    );
+
+    rt.block_on(requesters.shutdown());
+}
 
 criterion_group!(benches, redo_scenarios);
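The `let _entered = rt.enter();` lines in the hunk above rely on tokio's runtime-context mechanism: `Runtime::enter` returns a guard that makes the runtime "current" for the enclosing synchronous scope, which is what lets `JoinSet::spawn` be called outside of any async block. A small standalone sketch of just that mechanism (the names and the doubling workload are illustrative, not from the commit):

```rust
use tokio::task::JoinSet;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    let mut tasks = JoinSet::new();
    {
        // JoinSet::spawn uses the *current* runtime context; entering the
        // runtime makes it available to this otherwise-synchronous scope.
        let _guard = rt.enter();
        for i in 0..4 {
            tasks.spawn(async move { i * 2 });
        }
    }

    // Drive the JoinSet to completion from synchronous code.
    let mut results = Vec::new();
    rt.block_on(async {
        while let Some(res) = tasks.join_next().await {
            results.push(res.unwrap());
        }
    });
    results.sort();
    assert_eq!(results, vec![0, 2, 4, 6]);
}
```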
@@ -493,11 +462,7 @@ struct Request {
 }
 
 impl Request {
-    fn execute(
-        self,
-        rt: &tokio::runtime::Handle,
-        manager: &PostgresRedoManager,
-    ) -> anyhow::Result<Bytes> {
+    async fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
         let Request {
             key,
             lsn,
@@ -506,6 +471,8 @@ impl Request {
             pg_version,
         } = self;
 
-        rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version))
+        manager
+            .request_redo(key, lsn, base_img, records, pg_version)
+            .await
     }
 }
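These last two hunks convert `Request::execute` from a blocking wrapper that had to thread a runtime `Handle` through every call site into a plain `async fn`; blocking now happens once, at the benchmark's top level. A hedged standalone illustration of that calling-convention change (`execute_like` is a made-up stand-in, not the commit's code):

```rust
// An async fn composes with other futures; sync callers block_on it once
// at the outermost layer instead of passing a Handle down the stack.
async fn execute_like(x: u32) -> anyhow::Result<u32> {
    Ok(x + 1)
}

fn main() -> anyhow::Result<()> {
    let rt = tokio::runtime::Runtime::new()?;
    let v = rt.block_on(execute_like(41))?;
    assert_eq!(v, 42);
    Ok(())
}
```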