Compare commits

...

5 Commits

Author SHA1 Message Date
Christian Schwarz
1822ef0ee9 for #6743 (walredo benches to async): finish the change (#6761)
This PR finishes the work started by https://github.com/calinanca99 in
https://github.com/neondatabase/neon/pull/6743

- always use default worker thread count & explain rationale behind that
- fix deadlocks caused by a remaining sync primitive (see the sketch below)
- increase thread counts
2024-02-15 14:40:42 +01:00
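
The "deadlocks caused by a remaining sync primitive" bullet most likely refers to the classic pitfall of calling a blocking std::sync primitive from async tasks: if more tasks block inside std::sync::Barrier::wait() than the runtime has worker threads, no task can run to complete the barrier. The diff below replaces the std Barrier with tokio::sync::Barrier and joins tasks with futures::future::try_join_all. What follows is a minimal, self-contained sketch of that pitfall and its async fix — it is not code from the PR, and it assumes a binary crate with the tokio (rt-multi-thread, sync) and futures dependencies.

use std::sync::Arc;

fn main() {
    // Only 2 worker threads, but 4 tasks must reach the barrier before any may proceed.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        // Deadlock-prone variant (don't do this): std::sync::Barrier::wait() blocks the
        // OS thread, so with 4 waiters and only 2 workers the barrier never completes.
        // let barrier = Arc::new(std::sync::Barrier::new(4));

        // Async variant: tokio::sync::Barrier::wait().await suspends the task instead,
        // freeing the worker thread to poll the remaining waiters.
        let barrier = Arc::new(tokio::sync::Barrier::new(4));

        let tasks: Vec<_> = (0..4)
            .map(|i| {
                let barrier = barrier.clone();
                tokio::spawn(async move {
                    barrier.wait().await;
                    println!("task {i} passed the barrier");
                })
            })
            .collect();

        // Each JoinHandle resolves to Result<_, JoinError>, so try_join_all also surfaces panics.
        futures::future::try_join_all(tasks).await.unwrap();
    });
}
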
Christian Schwarz
c7d4136fb3 Merge branch 'main' into pr/calinanca99/6743 2024-02-14 16:06:55 +00:00
Christian Schwarz
0c43acce16 Merge branch 'main' into pr/calinanca99/6743 2024-02-14 16:05:44 +00:00
calinanca99
1c64de709d Use futures::future::try_join_all 2024-02-13 17:27:44 +01:00
calinanca99
863e26d29a Use multi-threaded runtime instead of single-threaded ones 2024-02-13 17:12:14 +01:00


@@ -7,13 +7,14 @@
//! logging what happens when a sequential scan is requested on a small table, then picking out two
//! suitable from logs.
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
};
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use utils::{id::TenantId, lsn::Lsn};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
@@ -39,11 +40,11 @@ fn redo_scenarios(c: &mut Criterion) {
.build()
.unwrap();
tracing::info!("executing first");
short().execute(rt.handle(), &manager).unwrap();
rt.block_on(short().execute(&manager)).unwrap();
tracing::info!("first executed");
}
let thread_counts = [1, 2, 4, 8, 16];
let thread_counts = [1, 2, 4, 8, 16, 32, 64, 128];
let mut group = c.benchmark_group("short");
group.sampling_mode(criterion::SamplingMode::Flat);
@@ -74,116 +75,85 @@ fn redo_scenarios(c: &mut Criterion) {
drop(group);
}
/// Sets up `threads` number of requesters to `request_redo`, with the given input.
/// Sets up a multi-threaded tokio runtime with default worker thread count,
/// then spawns `requesters` tasks that repeatedly:
/// - get input from `input_factory()`
/// - call `manager.request_redo()` with their input
///
/// This stress-tests the scalability of a single walredo manager at high tokio-level concurrency.
///
/// Using tokio's default worker thread count means the results will differ on machines
/// with different core counts. We don't care about that; the performance will always
/// be different on different hardware. To compare performance of different software versions,
/// use the same hardware.
fn add_multithreaded_walredo_requesters(
b: &mut criterion::Bencher,
threads: u32,
requesters: usize,
manager: &Arc<PostgresRedoManager>,
input_factory: fn() -> Request,
) {
assert_ne!(threads, 0);
assert_ne!(requesters, 0);
if threads == 1 {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
b.iter_batched_ref(
|| Some(input_factory()),
|input| execute_all(input.take(), handle, manager),
criterion::BatchSize::PerIteration,
);
} else {
let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
let cancel = CancellationToken::new();
let barrier = Arc::new(Barrier::new(threads as usize + 1));
let barrier = Arc::new(tokio::sync::Barrier::new(requesters + 1));
let jhs = (0..threads)
.map(|_| {
std::thread::spawn({
let manager = manager.clone();
let barrier = barrier.clone();
let work_rx = work_rx.clone();
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
loop {
// queue up and wait if we want to go another round
if work_rx.lock().unwrap().recv().is_err() {
break;
}
let input = Some(input_factory());
barrier.wait();
execute_all(input, handle, &manager).unwrap();
barrier.wait();
}
let jhs = (0..requesters)
.map(|_| {
let manager = manager.clone();
let barrier = barrier.clone();
let cancel = cancel.clone();
rt.spawn(async move {
let work_loop = async move {
loop {
let input = input_factory();
barrier.wait().await;
let page = input.execute(&manager).await.unwrap();
assert_eq!(page.remaining(), 8192);
barrier.wait().await;
}
})
})
.collect::<Vec<_>>();
let _jhs = JoinOnDrop(jhs);
b.iter_batched(
|| {
for _ in 0..threads {
work_tx.send(()).unwrap()
};
tokio::select! {
_ = work_loop => {},
_ = cancel.cancelled() => { }
}
},
|()| {
// start the work
barrier.wait();
})
})
.collect::<Vec<_>>();
b.iter_batched(
|| (),
|()| {
// start the work
rt.block_on(async {
barrier.wait().await;
// wait for work to complete
barrier.wait();
},
criterion::BatchSize::PerIteration,
);
barrier.wait().await;
});
},
criterion::BatchSize::PerIteration,
);
drop(work_tx);
}
cancel.cancel();
let jhs = JoinOnDrop(jhs);
rt.block_on(jhs.join_all());
}
struct JoinOnDrop(Vec<std::thread::JoinHandle<()>>);
struct JoinOnDrop(Vec<tokio::task::JoinHandle<()>>);
impl Drop for JoinOnDrop {
// it's not really needless because we want join all then check for panicks
#[allow(clippy::needless_collect)]
fn drop(&mut self) {
// first join all
let results = self.0.drain(..).map(|jh| jh.join()).collect::<Vec<_>>();
// then check the results; panicking here is not great, but it does get the message across
// to the user, and sets an exit value.
results.into_iter().try_for_each(|res| res).unwrap();
impl JoinOnDrop {
/// Checks all the futures for panics.
pub async fn join_all(self) {
futures::future::try_join_all(self.0).await.unwrap();
}
}
fn execute_all<I>(
input: I,
handle: &tokio::runtime::Handle,
manager: &PostgresRedoManager,
) -> anyhow::Result<()>
where
I: IntoIterator<Item = Request>,
{
// just fire all requests as fast as possible
input.into_iter().try_for_each(|req| {
let page = req.execute(handle, manager)?;
assert_eq!(page.remaining(), 8192);
anyhow::Ok(())
})
}
criterion_group!(benches, redo_scenarios);
criterion_main!(benches);
@@ -493,11 +463,7 @@ struct Request {
}
impl Request {
fn execute(
self,
rt: &tokio::runtime::Handle,
manager: &PostgresRedoManager,
) -> anyhow::Result<Bytes> {
async fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
let Request {
key,
lsn,
@@ -506,6 +472,8 @@ impl Request {
pg_version,
} = self;
rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version))
manager
.request_redo(key, lsn, base_img, records, pg_version)
.await
}
}
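
Outside criterion, the coordination pattern of the new add_multithreaded_walredo_requesters looks roughly like the sketch below: requester tasks loop forever, rendezvousing with the driver on a tokio::sync::Barrier before and after each unit of work, until a CancellationToken ends the loop via tokio::select!, after which the driver joins the tasks and surfaces any panics. This is an illustrative sketch under the assumption of a plain binary crate with tokio (rt-multi-thread, sync, macros), tokio-util, and futures as dependencies; it is not the benchmark code itself.

use std::sync::Arc;
use tokio_util::sync::CancellationToken;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    const REQUESTERS: usize = 4;
    // All requester tasks plus the driver must arrive before anyone proceeds.
    let barrier = Arc::new(tokio::sync::Barrier::new(REQUESTERS + 1));
    let cancel = CancellationToken::new();

    let handles: Vec<_> = (0..REQUESTERS)
        .map(|i| {
            let barrier = barrier.clone();
            let cancel = cancel.clone();
            rt.spawn(async move {
                let work_loop = async {
                    loop {
                        barrier.wait().await; // start-of-iteration rendezvous
                        // ... one unit of work would go here ...
                        println!("requester {i} finished one unit of work");
                        barrier.wait().await; // end-of-iteration rendezvous
                    }
                };
                tokio::select! {
                    _ = work_loop => {},
                    _ = cancel.cancelled() => {}
                }
            })
        })
        .collect();

    // Drive a few iterations; in the benchmark, criterion's measured closure plays this role.
    for _ in 0..3 {
        rt.block_on(async {
            barrier.wait().await; // release the requesters
            barrier.wait().await; // wait for all of them to finish
        });
    }

    // Teardown: stop the loops, then join the tasks and propagate any panics.
    cancel.cancel();
    rt.block_on(async {
        futures::future::try_join_all(handles).await.unwrap();
    });
}
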