From a8a9bee6025966a3634ebccbf3c912390542a5bb Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 16 Jan 2023 18:24:45 +0200 Subject: [PATCH] walredo: simple tests and bench updates (#3045) Separated from #2875. The microbenchmark has been validated to show similar difference as to larger scale OLTP benchmark. --- pageserver/benches/bench_walredo.rs | 128 +++++++++++++++--------- pageserver/fixtures/short_v14_redo.page | Bin 0 -> 8192 bytes pageserver/src/walredo.rs | 107 ++++++++++++++++++++ 3 files changed, 189 insertions(+), 46 deletions(-) create mode 100644 pageserver/fixtures/short_v14_redo.page diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 61011c9f36..49216c68f1 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -30,33 +30,44 @@ fn redo_scenarios(c: &mut Criterion) { let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); let tenant_id = TenantId::generate(); - // std::fs::create_dir_all(conf.tenant_path(&tenant_id)).unwrap(); - let mut manager = PostgresRedoManager::new(conf, tenant_id); - manager.launch_process(14).unwrap(); + + let manager = PostgresRedoManager::new(conf, tenant_id); let manager = Arc::new(manager); + tracing::info!("executing first"); + short().execute(&manager).unwrap(); + tracing::info!("first executed"); + let thread_counts = [1, 2, 4, 8, 16]; - for thread_count in thread_counts { - c.bench_with_input( - BenchmarkId::new("short-50record", thread_count), - &thread_count, - |b, thread_count| { - add_multithreaded_walredo_requesters(b, *thread_count, &manager, short, 50); - }, - ); - } + let mut group = c.benchmark_group("short"); + group.sampling_mode(criterion::SamplingMode::Flat); for thread_count in thread_counts { - c.bench_with_input( - BenchmarkId::new("medium-10record", thread_count), + group.bench_with_input( + BenchmarkId::new("short", thread_count), &thread_count, |b, thread_count| { - add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium, 10); + add_multithreaded_walredo_requesters(b, *thread_count, &manager, short); }, ); } + drop(group); + + let mut group = c.benchmark_group("medium"); + group.sampling_mode(criterion::SamplingMode::Flat); + + for thread_count in thread_counts { + group.bench_with_input( + BenchmarkId::new("medium", thread_count), + &thread_count, + |b, thread_count| { + add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium); + }, + ); + } + drop(group); } /// Sets up `threads` number of requesters to `request_redo`, with the given input. @@ -65,46 +76,66 @@ fn add_multithreaded_walredo_requesters( threads: u32, manager: &Arc, input_factory: fn() -> Request, - request_repeats: usize, ) { - b.iter_batched_ref( - || { - // barrier for all of the threads, and the benchmarked thread - let barrier = Arc::new(Barrier::new(threads as usize + 1)); + assert_ne!(threads, 0); - let jhs = (0..threads) - .map(|_| { - std::thread::spawn({ - let manager = manager.clone(); - let barrier = barrier.clone(); - move || { - let input = std::iter::repeat(input_factory()) - .take(request_repeats) - .collect::>(); + if threads == 1 { + b.iter_batched_ref( + || Some(input_factory()), + |input| execute_all(input.take(), manager), + criterion::BatchSize::PerIteration, + ); + } else { + let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize); - barrier.wait(); + let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx)); - execute_all(input, &manager).unwrap(); + let barrier = Arc::new(Barrier::new(threads as usize + 1)); - barrier.wait(); + let jhs = (0..threads) + .map(|_| { + std::thread::spawn({ + let manager = manager.clone(); + let barrier = barrier.clone(); + let work_rx = work_rx.clone(); + move || loop { + // queue up and wait if we want to go another round + if work_rx.lock().unwrap().recv().is_err() { + break; } - }) + + let input = Some(input_factory()); + + barrier.wait(); + + execute_all(input, &manager).unwrap(); + + barrier.wait(); + } }) - .collect::>(); + }) + .collect::>(); - (barrier, JoinOnDrop(jhs)) - }, - |input| { - let barrier = &input.0; + let _jhs = JoinOnDrop(jhs); - // start the work - barrier.wait(); + b.iter_batched( + || { + for _ in 0..threads { + work_tx.send(()).unwrap() + } + }, + |()| { + // start the work + barrier.wait(); - // wait for work to complete - barrier.wait(); - }, - criterion::BatchSize::PerIteration, - ); + // wait for work to complete + barrier.wait(); + }, + criterion::BatchSize::PerIteration, + ); + + drop(work_tx); + } } struct JoinOnDrop(Vec>); @@ -121,7 +152,10 @@ impl Drop for JoinOnDrop { } } -fn execute_all(input: Vec, manager: &PostgresRedoManager) -> Result<(), WalRedoError> { +fn execute_all(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError> +where + I: IntoIterator, +{ // just fire all requests as fast as possible input.into_iter().try_for_each(|req| { let page = req.execute(manager)?; @@ -143,6 +177,7 @@ macro_rules! lsn { }}; } +/// Short payload, 1132 bytes. // pg_records are copypasted from log, where they are put with Debug impl of Bytes, which uses \0 // for null bytes. #[allow(clippy::octal_escapes)] @@ -172,6 +207,7 @@ fn short() -> Request { } } +/// Medium sized payload, serializes as 26393 bytes. // see [`short`] #[allow(clippy::octal_escapes)] fn medium() -> Request { diff --git a/pageserver/fixtures/short_v14_redo.page b/pageserver/fixtures/short_v14_redo.page new file mode 100644 index 0000000000000000000000000000000000000000..9e9c266cadf29cde6de54d3907c0d41dde41e52e GIT binary patch literal 8192 zcmeI1%}T>S6oqeMh5CaCqPP*$wUD}Z;Xn&6x^XGEC{c?Rw1#vxy0MQTg4$>BHQb6@ zpG4w4GfAOc`2$(VxsZEjZthIZ`L+=e8JVj%vn@->;WvbOaI4_}kO7>ThTh+aKEg ssh|2;hQH6)pDF#}Uc{QquPxL~9fZGa*V8oP4|-j|AJ%wq51zu`7X|(@NB{r; literal 0 HcmV?d00001 diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 7cf489562b..a552c05d63 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -1010,3 +1010,110 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); } + +#[cfg(test)] +mod tests { + use super::{PostgresRedoManager, WalRedoManager}; + use crate::repository::Key; + use crate::{config::PageServerConf, walrecord::NeonWalRecord}; + use bytes::Bytes; + use std::str::FromStr; + use utils::{id::TenantId, lsn::Lsn}; + + #[test] + fn short_v14_redo() { + let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap(); + + let h = RedoHarness::new().unwrap(); + + let page = h + .manager + .request_redo( + Key { + field1: 0, + field2: 1663, + field3: 13010, + field4: 1259, + field5: 0, + field6: 0, + }, + Lsn::from_str("0/16E2408").unwrap(), + None, + short_records(), + 14, + ) + .unwrap(); + + assert_eq!(&expected, &*page); + } + + #[test] + fn short_v14_fails_for_wrong_key_but_returns_zero_page() { + let h = RedoHarness::new().unwrap(); + + let page = h + .manager + .request_redo( + Key { + field1: 0, + field2: 1663, + // key should be 13010 + field3: 13130, + field4: 1259, + field5: 0, + field6: 0, + }, + Lsn::from_str("0/16E2408").unwrap(), + None, + short_records(), + 14, + ) + .unwrap(); + + // TODO: there will be some stderr printout, which is forwarded to tracing that could + // perhaps be captured as long as it's in the same thread. + assert_eq!(page, crate::ZERO_PAGE); + } + + #[allow(clippy::octal_escapes)] + fn short_records() -> Vec<(Lsn, NeonWalRecord)> { + vec![ + ( + Lsn::from_str("0/16A9388").unwrap(), + NeonWalRecord::Postgres { + will_init: true, + rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01") + } + ), + ( + Lsn::from_str("0/16D4080").unwrap(), + NeonWalRecord::Postgres { + will_init: false, + rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0") + } + ) + ] + } + + struct RedoHarness { + // underscored because unused, except for removal at drop + _repo_dir: tempfile::TempDir, + manager: PostgresRedoManager, + } + + impl RedoHarness { + fn new() -> anyhow::Result { + let repo_dir = tempfile::tempdir()?; + let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); + let conf = Box::leak(Box::new(conf)); + let tenant_id = TenantId::generate(); + + let manager = PostgresRedoManager::new(conf, tenant_id); + + Ok(RedoHarness { + _repo_dir: repo_dir, + manager, + }) + } + } +}