From f4b8795cf8ce8a79e71223d50e16beffc22dd96d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 3 Nov 2024 17:24:28 +0100 Subject: [PATCH] Add WalAcceptor throughput benchmark --- safekeeper/benches/receive_wal.rs | 111 ++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 7 deletions(-) diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index 476d86a7b9..3c0d6faf0a 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -7,15 +7,24 @@ use benchutils::Env; use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion}; use itertools::Itertools as _; use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator}; -use safekeeper::receive_wal::WalAcceptor; +use safekeeper::receive_wal::{self, WalAcceptor}; use safekeeper::safekeeper::{ AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, }; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; +const KB: usize = 1024; +const MB: usize = 1024 * KB; +const GB: usize = 1024 * MB; + // Register benchmarks with Criterion. -criterion_group!(benches, bench_process_msg, bench_wal_acceptor); +criterion_group!( + benches, + bench_process_msg, + bench_wal_acceptor, + bench_wal_acceptor_throughput +); criterion_main!(benches); /// Benchmarks SafeKeeper::process_msg() as time per message. Each message is an AppendRequest with @@ -66,12 +75,12 @@ fn bench_process_msg(c: &mut Criterion) { } } -/// Benchmarks WalAcceptor by sending it a batch of WAL records and waiting for it to confirm that -/// the last LSN has been flushed to storage. We pipeline a bunch of messages instead of measuring -/// each individual message to amortize costs (e.g. fsync), which is more realistic. Records are -/// XlLogicalMessage with a tiny payload. +/// Benchmarks WalAcceptor message processing time by sending it a batch of WAL records and waiting +/// for it to confirm that the last LSN has been flushed to storage. We pipeline a bunch of messages +/// instead of measuring each individual message to amortize costs (e.g. fsync), which is more +/// realistic. Records are XlLogicalMessage with a tiny payload (~64 bytes per record including +/// headers). Records are pre-constructed to avoid skewing the benchmark. /// -/// TODO: add benchmarks with larger data volume, and measure throughput. /// TODO: add benchmarks with in-memory storage, see comment on `Env::make_safekeeper()`: fn bench_wal_acceptor(c: &mut Criterion) { let mut g = c.benchmark_group("wal_acceptor"); @@ -151,3 +160,91 @@ fn bench_wal_acceptor(c: &mut Criterion) { Ok(()) } } + +/// Benchmarks WalAcceptor throughput by sending 1 GB of data with varying message sizes and waiting +/// for the last LSN to be flushed to storage. Only the actual message payload counts towards +/// throughput, headers are excluded and considered overhead. Records are XlLogicalMessage. +/// +/// To avoid running out of memory, messages are constructed during the benchmark. +fn bench_wal_acceptor_throughput(c: &mut Criterion) { + const VOLUME: usize = GB; // NB: excludes message/page/segment headers and padding + + let mut g = c.benchmark_group("wal_acceptor_throughput"); + g.sample_size(10); + g.throughput(criterion::Throughput::Bytes(VOLUME as u64)); + + for fsync in [false, true] { + for size in [KB, 4 * KB, 128 * KB, MB] { + assert_eq!(VOLUME % size, 0, "volume must be divisible by size"); + let count = VOLUME / size; + g.bench_function(format!("fsync={fsync}/size={size}"), |b| { + run_bench(b, count, size, fsync).unwrap() + }); + } + } + + /// The actual benchmark. size is the payload size per message, count is the number of messages. + fn run_bench(b: &mut Bencher, count: usize, size: usize, fsync: bool) -> anyhow::Result<()> { + let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded + + // Construct the payload. The prefix counts towards the payload (including NUL terminator). + let prefix = c"p"; + let prefixlen = prefix.to_bytes_with_nul().len(); + assert!(size >= prefixlen); + let message = vec![0; size - prefixlen]; + + let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message)); + + // Construct and spawn the WalAcceptor task. + let env = Env::new(fsync)?; + + let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE); + let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE); + + runtime.block_on(async { + let tli = env + .make_timeline(NodeId(1), TenantTimelineId::generate()) + .await? + .wal_residence_guard() + .await?; + WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0)); + anyhow::Ok(()) + })?; + + // Ingest the WAL. + b.iter(|| { + runtime.block_on(async { + let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest { + h: AppendRequestHeader { + term: 1, + term_start_lsn: Lsn(0), + begin_lsn: lsn, + end_lsn: lsn + record.len() as u64, + commit_lsn: Lsn(0), + truncate_lsn: Lsn(0), + proposer_uuid: [0; 16], + }, + wal_data: record, + }); + + // Send requests. + for req in reqgen { + _ = reply_rx.try_recv(); // discard any replies, to avoid blocking + let msg = ProposerAcceptorMessage::AppendRequest(req); + msg_tx.send(msg).await.expect("send failed"); + } + + // Wait for last message to get flushed. + while let Some(reply) = reply_rx.recv().await { + if let AcceptorProposerMessage::AppendResponse(resp) = reply { + if resp.flush_lsn >= walgen.lsn { + return; + } + } + } + panic!("disconnected") + }) + }); + Ok(()) + } +}