diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs
index edc09d0bf2..45936cb3fa 100644
--- a/pageserver/benches/bench_walredo.rs
+++ b/pageserver/benches/bench_walredo.rs
@@ -1,7 +1,7 @@
 //! Quantify a single walredo manager's throughput under N concurrent callers.
 //!
 //! The benchmark implementation ([`bench_impl`]) is parametrized by
-//! - `redo_work` => [`Request::short_request`] or [`Request::medium_request`]
+//! - `redo_work` => an async closure that takes a `PostgresRedoManager` and performs one redo
 //! - `n_redos` => number of times the benchmark shall execute the `redo_work`
 //! - `nclients` => number of clients (more on this shortly).
 //!
@@ -10,7 +10,7 @@
 //! Each task executes the `redo_work` `n_redos/nclients` times.
 //!
 //! We exercise the following combinations:
-//! - `redo_work = short / medium``
+//! - `redo_work = ping / short / medium`
 //! - `nclients = [1, 2, 4, 8, 16, 32, 64, 128]`
 //!
 //! We let `criterion` determine the `n_redos` using `iter_custom`.
@@ -27,33 +27,43 @@
 //!
 //! # Reference Numbers
 //!
-//! 2024-04-15 on i3en.3xlarge
+//! 2024-09-18 on im4gn.2xlarge
 //!
 //! ```text
-//! short/1         time:   [24.584 µs 24.737 µs 24.922 µs]
-//! short/2         time:   [33.479 µs 33.660 µs 33.888 µs]
-//! short/4         time:   [42.713 µs 43.046 µs 43.440 µs]
-//! short/8         time:   [71.814 µs 72.478 µs 73.240 µs]
-//! short/16        time:   [132.73 µs 134.45 µs 136.22 µs]
-//! short/32        time:   [258.31 µs 260.73 µs 263.27 µs]
-//! short/64        time:   [511.61 µs 514.44 µs 517.51 µs]
-//! short/128       time:   [992.64 µs 998.23 µs 1.0042 ms]
-//! medium/1        time:   [110.11 µs 110.50 µs 110.96 µs]
-//! medium/2        time:   [153.06 µs 153.85 µs 154.99 µs]
-//! medium/4        time:   [317.51 µs 319.92 µs 322.85 µs]
-//! medium/8        time:   [638.30 µs 644.68 µs 652.12 µs]
-//! medium/16       time:   [1.2651 ms 1.2773 ms 1.2914 ms]
-//! medium/32       time:   [2.5117 ms 2.5410 ms 2.5720 ms]
-//! medium/64       time:   [4.8088 ms 4.8555 ms 4.9047 ms]
-//! medium/128      time:   [8.8311 ms 8.9849 ms 9.1263 ms]
+//! ping/1          time:   [21.789 µs 21.918 µs 22.078 µs]
+//! ping/2          time:   [27.686 µs 27.812 µs 27.970 µs]
+//! ping/4          time:   [35.468 µs 35.671 µs 35.926 µs]
+//! ping/8          time:   [59.682 µs 59.987 µs 60.363 µs]
+//! ping/16         time:   [101.79 µs 102.37 µs 103.08 µs]
+//! ping/32         time:   [184.18 µs 185.15 µs 186.36 µs]
+//! ping/64         time:   [349.86 µs 351.45 µs 353.47 µs]
+//! ping/128        time:   [684.53 µs 687.98 µs 692.17 µs]
+//! short/1         time:   [31.833 µs 32.126 µs 32.428 µs]
+//! short/2         time:   [35.558 µs 35.756 µs 35.992 µs]
+//! short/4         time:   [44.850 µs 45.138 µs 45.484 µs]
+//! short/8         time:   [65.985 µs 66.379 µs 66.853 µs]
+//! short/16        time:   [127.06 µs 127.90 µs 128.87 µs]
+//! short/32        time:   [252.98 µs 254.70 µs 256.73 µs]
+//! short/64        time:   [497.13 µs 499.86 µs 503.26 µs]
+//! short/128       time:   [987.46 µs 993.45 µs 1.0004 ms]
+//! medium/1        time:   [137.91 µs 138.55 µs 139.35 µs]
+//! medium/2        time:   [192.00 µs 192.91 µs 194.07 µs]
+//! medium/4        time:   [389.62 µs 391.55 µs 394.01 µs]
+//! medium/8        time:   [776.80 µs 780.33 µs 784.77 µs]
+//! medium/16       time:   [1.5323 ms 1.5383 ms 1.5459 ms]
+//! medium/32       time:   [3.0120 ms 3.0226 ms 3.0350 ms]
+//! medium/64       time:   [5.7405 ms 5.7787 ms 5.8166 ms]
+//! medium/128      time:   [10.412 ms 10.574 ms 10.718 ms]
 //! ```
 
 use anyhow::Context;
 use bytes::{Buf, Bytes};
 use criterion::{BenchmarkId, Criterion};
+use once_cell::sync::Lazy;
 use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
 use pageserver_api::{key::Key, shard::TenantShardId};
 use std::{
+    future::Future,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -61,40 +71,59 @@ use tokio::{sync::Barrier, task::JoinSet};
 use utils::{id::TenantId, lsn::Lsn};
 
 fn bench(c: &mut Criterion) {
-    {
-        let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
-        for nclients in nclients {
-            let mut group = c.benchmark_group("short");
-            group.bench_with_input(
-                BenchmarkId::from_parameter(nclients),
-                &nclients,
-                |b, nclients| {
-                    let redo_work = Arc::new(Request::short_input());
-                    b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
-                },
-            );
-        }
-    }
-    {
-        let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
-        for nclients in nclients {
-            let mut group = c.benchmark_group("medium");
-            group.bench_with_input(
-                BenchmarkId::from_parameter(nclients),
-                &nclients,
-                |b, nclients| {
-                    let redo_work = Arc::new(Request::medium_input());
-                    b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
-                },
-            );
-        }
+    macro_rules! bench_group {
+        ($name:expr, $redo_work:expr) => {{
+            let name: &str = $name;
+            let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
+            for nclients in nclients {
+                let mut group = c.benchmark_group(name);
+                group.bench_with_input(
+                    BenchmarkId::from_parameter(nclients),
+                    &nclients,
+                    |b, nclients| {
+                        b.iter_custom(|iters| bench_impl($redo_work, iters, *nclients));
+                    },
+                );
+            }
+        }};
     }
+    //
+    // benchmark the protocol implementation
+    //
+    let pg_version = 14;
+    bench_group!(
+        "ping",
+        Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
+            let _: () = mgr.ping(pg_version).await.unwrap();
+        })
+    );
+    //
+    // benchmarks with actual record redo
+    //
+    let make_redo_work = |req: &'static Request| {
+        Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
+            let page = req.execute(&mgr).await.unwrap();
+            assert_eq!(page.remaining(), 8192);
+        })
+    };
+    bench_group!("short", {
+        static REQUEST: Lazy<Request> = Lazy::new(Request::short_input);
+        make_redo_work(&REQUEST)
+    });
+    bench_group!("medium", {
+        static REQUEST: Lazy<Request> = Lazy::new(Request::medium_input);
+        make_redo_work(&REQUEST)
+    });
 }
 
 criterion::criterion_group!(benches, bench);
 criterion::criterion_main!(benches);
 
 // Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
-fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
+fn bench_impl<F, Fut>(redo_work: Arc<F>, n_redos: u64, nclients: u64) -> Duration
+where
+    F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
+    Fut: Future<Output = ()> + Send + 'static,
+{
     let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
     let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
@@ -135,17 +164,20 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
     })
 }
 
-async fn client(
+async fn client<F, Fut>(
     mgr: Arc<PostgresRedoManager>,
     start: Arc<Barrier>,
-    redo_work: Arc<Request>,
+    redo_work: Arc<F>,
     n_redos: u64,
-) -> Duration {
+) -> Duration
+where
+    F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
+    Fut: Future<Output = ()> + Send + 'static,
+{
     start.wait().await;
     let start = Instant::now();
     for _ in 0..n_redos {
-        let page = redo_work.execute(&mgr).await.unwrap();
-        assert_eq!(page.remaining(), 8192);
+        redo_work(Arc::clone(&mgr)).await;
         // The real pageserver will rarely if ever do 2 walredos in a row without
        // yielding to the executor.
         tokio::task::yield_now().await;
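Note on the refactor above: `bench_impl` and `client` are now generic over the unit of work, so the new `ping` benchmark (a pure protocol roundtrip) and the existing `short`/`medium` benchmarks share one harness. The following is a minimal, self-contained sketch of that shape — `ToyManager` and `drive` are hypothetical stand-ins, not pageserver types — illustrating why the work is passed as an `Arc`'d `Fn` closure returning a future: the same closure is invoked once per iteration and shared by all client tasks.

```rust
use std::{future::Future, sync::Arc};

struct ToyManager;

impl ToyManager {
    // Stand-in for one request/response roundtrip against the manager.
    async fn ping(&self) {
        tokio::task::yield_now().await;
    }
}

// Same shape as `bench_impl`/`client` above: generic over "one unit of work",
// bounded by `Fn` (not `FnOnce`) because the closure is called repeatedly.
async fn drive<F, Fut>(mgr: Arc<ToyManager>, work: Arc<F>, n: u64)
where
    F: Fn(Arc<ToyManager>) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    for _ in 0..n {
        work(Arc::clone(&mgr)).await;
    }
}

#[tokio::main]
async fn main() {
    let mgr = Arc::new(ToyManager);
    let work = Arc::new(|mgr: Arc<ToyManager>| async move { mgr.ping().await });
    drive(mgr, work, 3).await;
}
```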
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 0fe7def8b0..a1c9fc5651 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -205,6 +205,22 @@ impl PostgresRedoManager {
         }
     }
 
+    /// Do a ping request-response roundtrip.
+    ///
+    /// Not used in production, but used by Rust benchmarks.
+    ///
+    /// # Cancel-Safety
+    ///
+    /// This method is cancellation-safe.
+    pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
+        self.do_with_walredo_process(pg_version, |proc| async move {
+            proc.ping(Duration::from_secs(1))
+                .await
+                .map_err(Error::Other)
+        })
+        .await
+    }
+
     pub fn status(&self) -> WalRedoManagerStatus {
         WalRedoManagerStatus {
             last_redo_at: {
@@ -297,6 +313,9 @@ impl PostgresRedoManager {
         }
     }
 
+    /// # Cancel-Safety
+    ///
+    /// This method is cancel-safe iff `closure` is cancel-safe.
     async fn do_with_walredo_process<
         F: FnOnce(Arc<Process>) -> Fut,
         Fut: Future<Output = Result<O, Error>>,
         O,
     >(
@@ -537,6 +556,17 @@ mod tests {
     use tracing::Instrument;
     use utils::{id::TenantId, lsn::Lsn};
 
+    #[tokio::test]
+    async fn test_ping() {
+        let h = RedoHarness::new().unwrap();
+
+        h.manager
+            .ping(14)
+            .instrument(h.span())
+            .await
+            .expect("ping should work");
+    }
+
     #[tokio::test]
     async fn short_v14_redo() {
         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs
index 9140d4f6aa..f3197e68b5 100644
--- a/pageserver/src/walredo/process.rs
+++ b/pageserver/src/walredo/process.rs
@@ -6,6 +6,7 @@ use self::no_leak_child::NoLeakChild;
 use crate::{
     config::PageServerConf,
     metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
+    page_cache::PAGE_SZ,
     span::debug_assert_current_span_has_tenant_id,
     walrecord::NeonWalRecord,
 };
@@ -237,6 +238,26 @@
         res
     }
 
+    /// Do a ping request-response roundtrip.
+    ///
+    /// Not used in production, but used by Rust benchmarks.
+    pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
+        let mut writebuf: Vec<u8> = Vec::with_capacity(4);
+        protocol::build_ping_msg(&mut writebuf);
+        let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
+        else {
+            anyhow::bail!("WAL redo ping timed out");
+        };
+        let response = res?;
+        if response.len() != PAGE_SZ {
+            anyhow::bail!(
+                "WAL redo ping should respond with a page-sized response, got {} bytes",
+                response.len()
+            );
+        }
+        Ok(())
+    }
+
     /// # Cancel-Safety
     ///
     /// When not polled to completion (e.g. because in `tokio::select!` another
diff --git a/pageserver/src/walredo/process/protocol.rs b/pageserver/src/walredo/process/protocol.rs
index b703344cc8..de3ca8741b 100644
--- a/pageserver/src/walredo/process/protocol.rs
+++ b/pageserver/src/walredo/process/protocol.rs
@@ -55,3 +55,8 @@ pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
     tag.ser_into(buf)
         .expect("serialize BufferTag should always succeed");
 }
+
+pub(crate) fn build_ping_msg(buf: &mut Vec<u8>) {
+    buf.put_u8(b'H');
+    buf.put_u32(4);
+}
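For context on the new protocol message: `build_ping_msg` frames the request like the other `build_*_msg` helpers — a one-byte tag followed by a 4-byte length in network byte order that counts the length field itself, so a length of 4 means there is no payload. The snippet below is a standalone sketch (not part of the patch) that reproduces that framing and asserts the resulting byte layout; it assumes only that the `bytes` crate's `put_u32` writes big-endian, which it does by default.

```rust
use bytes::BufMut;

// Mirrors protocol::build_ping_msg from the patch: tag 'H', then the length (4),
// which covers just the length field itself -- no payload follows.
fn build_ping_msg(buf: &mut Vec<u8>) {
    buf.put_u8(b'H');
    buf.put_u32(4);
}

fn main() {
    let mut buf = Vec::new();
    build_ping_msg(&mut buf);
    // 5 bytes on the wire: the tag plus a big-endian u32 length.
    assert_eq!(buf, [b'H', 0x00, 0x00, 0x00, 0x04]);
}
```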
diff --git a/pgxn/neon_walredo/walredoproc.c b/pgxn/neon_walredo/walredoproc.c
index 219ca85207..f98aa1cbe7 100644
--- a/pgxn/neon_walredo/walredoproc.c
+++ b/pgxn/neon_walredo/walredoproc.c
@@ -24,6 +24,7 @@
  *	PushPage ('P'): Copy a page image (in the payload) to buffer cache
  *	ApplyRecord ('A'): Apply a WAL record (in the payload)
  *	GetPage ('G'): Return a page image from buffer cache.
+ *	Ping ('H'): Return a page-sized all-zeros response.
  *
  * Currently, you only get a response to GetPage requests; the response is
  * simply a 8k page, without any headers. Errors are logged to stderr.
@@ -133,6 +134,7 @@ static void ApplyRecord(StringInfo input_message);
 static void apply_error_callback(void *arg);
 static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
 static void GetPage(StringInfo input_message);
+static void Ping(StringInfo input_message);
 static ssize_t buffered_read(void *buf, size_t count);
 
 static void CreateFakeSharedMemoryAndSemaphores();
@@ -394,6 +396,10 @@ WalRedoMain(int argc, char *argv[])
 				GetPage(&input_message);
 				break;
 
+			case 'H':			/* Ping */
+				Ping(&input_message);
+				break;
+
 				/*
 				 * EOF means we're done. Perform normal shutdown.
 				 */
@@ -1057,6 +1063,36 @@ GetPage(StringInfo input_message)
 }
 
 
+static void
+Ping(StringInfo input_message)
+{
+	int			tot_written;
+	/* Response: a page-sized buffer of zeros */
+	tot_written = 0;
+	do {
+		ssize_t		rc;
+		/* We don't need alignment, but a plain char[BLCKSZ] is bad practice */
+#if PG_VERSION_NUM >= 160000
+		static const PGIOAlignedBlock response;
+#else
+		static const PGAlignedBlock response;
+#endif
+		rc = write(STDOUT_FILENO, &response.data[tot_written], BLCKSZ - tot_written);
+		if (rc < 0) {
+			/* If interrupted by signal, just retry */
+			if (errno == EINTR)
+				continue;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write to stdout: %m")));
+		}
+		tot_written += rc;
+	} while (tot_written < BLCKSZ);
+
+	elog(TRACE, "Page sent back for ping");
+}
+
+
 /* Buffer used by buffered_read() */
 static char stdin_buf[16 * 1024];
 static size_t stdin_len = 0;	/* # of bytes in buffer */
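Taken together, the Rust side sends the 5-byte ping message and expects exactly one `PAGE_SZ` (8192-byte) page back before the deadline, while the C side answers with a zeroed `BLCKSZ` block. Below is a hypothetical, self-contained sketch of that caller-side contract — `ping_roundtrip` and the fake transport are illustrative names, not pageserver APIs — showing the same timeout-then-size-check pattern used by `WalRedoProcess::ping`.

```rust
use anyhow::Context;
use std::time::Duration;

const PAGE_SZ: usize = 8192; // assumption: same value as pageserver::page_cache::PAGE_SZ

// Bound the roundtrip with a timeout and require exactly one page-sized response.
async fn ping_roundtrip<F, Fut>(do_roundtrip: F, timeout: Duration) -> anyhow::Result<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<Vec<u8>>>,
{
    let response = tokio::time::timeout(timeout, do_roundtrip())
        .await
        .context("WAL redo ping timed out")??;
    anyhow::ensure!(
        response.len() == PAGE_SZ,
        "expected a page-sized response, got {} bytes",
        response.len()
    );
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Fake transport: pretend the walredo process answered with one zeroed page.
    ping_roundtrip(|| async { anyhow::Ok(vec![0u8; PAGE_SZ]) }, Duration::from_secs(1)).await
}
```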