diff --git a/Cargo.lock b/Cargo.lock
index 9c2a0b455e..7d18f44aec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5146,6 +5146,7 @@ dependencies = [
  "chrono",
  "clap",
  "crc32c",
+ "criterion",
  "desim",
  "fail",
  "futures",
@@ -5153,6 +5154,7 @@ dependencies = [
  "http 1.1.0",
  "humantime",
  "hyper 0.14.30",
+ "itertools 0.10.5",
  "metrics",
  "once_cell",
  "parking_lot 0.12.1",
diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml
index ec08d02240..85561e4aff 100644
--- a/safekeeper/Cargo.toml
+++ b/safekeeper/Cargo.toml
@@ -61,8 +61,14 @@ utils.workspace = true
 workspace_hack.workspace = true
 
 [dev-dependencies]
+criterion.workspace = true
+itertools.workspace = true
 walproposer.workspace = true
 rand.workspace = true
 desim.workspace = true
 tracing.workspace = true
 tracing-subscriber = { workspace = true, features = ["json"] }
+
+[[bench]]
+name = "receive_wal"
+harness = false
diff --git a/safekeeper/benches/README.md b/safekeeper/benches/README.md
new file mode 100644
index 0000000000..4119cc8d6e
--- /dev/null
+++ b/safekeeper/benches/README.md
@@ -0,0 +1,22 @@
+## Safekeeper Benchmarks
+
+To run benchmarks:
+
+```sh
+# All benchmarks.
+cargo bench --package safekeeper
+
+# Specific file.
+cargo bench --package safekeeper --bench receive_wal
+
+# Specific benchmark.
+cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false
+
+# List available benchmarks.
+cargo bench --package safekeeper --benches -- --list
+```
+
+Additional charts and statistics are available in `target/criterion/report/index.html`.
+
+Benchmarks are automatically compared against the previous run. To compare against other runs, see
+`--baseline` and `--save-baseline`.
\ No newline at end of file
diff --git a/safekeeper/benches/benchutils.rs b/safekeeper/benches/benchutils.rs
new file mode 100644
index 0000000000..4e8dc58c49
--- /dev/null
+++ b/safekeeper/benches/benchutils.rs
@@ -0,0 +1,102 @@
+use std::sync::Arc;
+
+use camino_tempfile::Utf8TempDir;
+use safekeeper::rate_limit::RateLimiter;
+use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
+use safekeeper::state::{TimelinePersistentState, TimelineState};
+use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
+use safekeeper::timelines_set::TimelinesSet;
+use safekeeper::wal_backup::remote_timeline_path;
+use safekeeper::{control_file, wal_storage, SafeKeeperConf};
+use tokio::fs::create_dir_all;
+use utils::id::{NodeId, TenantTimelineId};
+use utils::lsn::Lsn;
+
+/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
+pub struct Env {
+    /// Whether to enable fsync.
+    pub fsync: bool,
+    /// Benchmark directory. Deleted when dropped.
+    pub tempdir: Utf8TempDir,
+}
+
+impl Env {
+    /// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
+    /// enable fsyncing.
+    pub fn new(fsync: bool) -> anyhow::Result<Self> {
+        let tempdir = camino_tempfile::tempdir()?;
+        Ok(Self { fsync, tempdir })
+    }
+
+    /// Constructs a Safekeeper config for the given node ID.
+    fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
+        let mut conf = SafeKeeperConf::dummy();
+        conf.my_id = node_id;
+        conf.no_sync = !self.fsync;
+        conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
+        conf
+    }
+
+    /// Constructs a Safekeeper with the given node and tenant/timeline ID.
+    ///
+    /// TODO: we should support using in-memory storage, to measure non-IO costs.
+    /// This would be easier if SafeKeeper used trait objects for storage rather than generics.
+    /// It's also not currently possible to construct a timeline using non-file storage, since
+    /// StateSK only accepts SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
+    pub async fn make_safekeeper(
+        &self,
+        node_id: NodeId,
+        ttid: TenantTimelineId,
+    ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
+        let conf = self.make_conf(node_id);
+
+        let timeline_dir = get_timeline_dir(&conf, &ttid);
+        create_dir_all(&timeline_dir).await?;
+
+        let mut pstate = TimelinePersistentState::empty();
+        pstate.tenant_id = ttid.tenant_id;
+        pstate.timeline_id = ttid.timeline_id;
+
+        let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
+        let ctrl =
+            control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
+        let state = TimelineState::new(ctrl);
+        let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
+
+        // Emulate an initial election.
+        safekeeper
+            .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
+                term: 1,
+                start_streaming_at: Lsn(0),
+                term_history: TermHistory(vec![(1, Lsn(0)).into()]),
+                timeline_start_lsn: Lsn(0),
+            }))
+            .await?;
+
+        Ok(safekeeper)
+    }
+
+    /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
+    /// manager task.
+    pub async fn make_timeline(
+        &self,
+        node_id: NodeId,
+        ttid: TenantTimelineId,
+    ) -> anyhow::Result<Arc<Timeline>> {
+        let conf = self.make_conf(node_id);
+        let timeline_dir = get_timeline_dir(&conf, &ttid);
+        let remote_path = remote_timeline_path(&ttid)?;
+
+        let safekeeper = self.make_safekeeper(node_id, ttid).await?;
+        let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
+
+        let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state);
+        timeline.bootstrap(
+            &mut timeline.write_shared_state().await,
+            &conf,
+            Arc::new(TimelinesSet::default()), // ignored for now
+            RateLimiter::new(0, 0),
+        );
+        Ok(timeline)
+    }
+}
diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs
new file mode 100644
index 0000000000..e32d7526ca
--- /dev/null
+++ b/safekeeper/benches/receive_wal.rs
@@ -0,0 +1,341 @@
+//! WAL ingestion benchmarks.
+
+#[path = "benchutils.rs"]
+mod benchutils;
+
+use std::io::Write as _;
+
+use benchutils::Env;
+use camino_tempfile::tempfile;
+use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
+use itertools::Itertools as _;
+use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
+use safekeeper::receive_wal::{self, WalAcceptor};
+use safekeeper::safekeeper::{
+    AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
+};
+use tokio::io::AsyncWriteExt as _;
+use utils::id::{NodeId, TenantTimelineId};
+use utils::lsn::Lsn;
+
+const KB: usize = 1024;
+const MB: usize = 1024 * KB;
+const GB: usize = 1024 * MB;
+
+// Register benchmarks with Criterion.
+criterion_group!(
+    benches,
+    bench_process_msg,
+    bench_wal_acceptor,
+    bench_wal_acceptor_throughput,
+    bench_file_write
+);
+criterion_main!(benches);
+
+/// Benchmarks SafeKeeper::process_msg() as time per message and throughput. Each message is an
+/// AppendRequest with a single WAL record containing an XlLogicalMessage of varying size. When
+/// measuring throughput, only the logical message payload is considered, excluding
+/// segment/page/record headers.
+fn bench_process_msg(c: &mut Criterion) {
+    let mut g = c.benchmark_group("process_msg");
+    for fsync in [false, true] {
+        for commit in [false, true] {
+            for size in [8, KB, 8 * KB, 128 * KB, MB] {
+                // Kind of weird to change the group throughput per benchmark, but it's the only way
+                // to vary it per benchmark. It works.
+                g.throughput(criterion::Throughput::Bytes(size as u64));
+                g.bench_function(format!("fsync={fsync}/commit={commit}/size={size}"), |b| {
+                    run_bench(b, size, fsync, commit).unwrap()
+                });
+            }
+        }
+    }
+
+    // The actual benchmark. If commit is true, advance the commit LSN on every message.
+    fn run_bench(b: &mut Bencher, size: usize, fsync: bool, commit: bool) -> anyhow::Result<()> {
+        let runtime = tokio::runtime::Builder::new_current_thread() // single is fine, sync IO only
+            .enable_all()
+            .build()?;
+
+        // Construct the payload. The prefix counts towards the payload (including NUL terminator).
+        let prefix = c"p";
+        let prefixlen = prefix.to_bytes_with_nul().len();
+        assert!(size >= prefixlen);
+        let message = vec![0; size - prefixlen];
+
+        let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
+
+        // Set up the Safekeeper.
+        let env = Env::new(fsync)?;
+        let mut safekeeper =
+            runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
+
+        b.iter_batched_ref(
+            // Pre-construct WAL records and requests. Criterion will batch them.
+            || {
+                let (lsn, record) = walgen.next().expect("endless WAL");
+                ProposerAcceptorMessage::AppendRequest(AppendRequest {
+                    h: AppendRequestHeader {
+                        term: 1,
+                        term_start_lsn: Lsn(0),
+                        begin_lsn: lsn,
+                        end_lsn: lsn + record.len() as u64,
+                        commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
+                        truncate_lsn: Lsn(0),
+                        proposer_uuid: [0; 16],
+                    },
+                    wal_data: record,
+                })
+            },
+            // Benchmark message processing (time per message).
+            |msg| {
+                runtime
+                    .block_on(safekeeper.process_msg(msg))
+                    .expect("message failed")
+            },
+            BatchSize::SmallInput, // automatically determine a batch size
+        );
+        Ok(())
+    }
+}
+
+/// Benchmarks WalAcceptor message processing time by sending it a batch of WAL records and waiting
+/// for it to confirm that the last LSN has been flushed to storage. We pipeline a bunch of messages
+/// instead of measuring each individual message to amortize costs (e.g. fsync), which is more
+/// realistic. Records are XlLogicalMessage with a tiny payload (~64 bytes per record including
+/// headers). Records are pre-constructed to avoid skewing the benchmark.
+///
+/// TODO: add benchmarks with in-memory storage, see comment on `Env::make_safekeeper()`.
+fn bench_wal_acceptor(c: &mut Criterion) {
+    let mut g = c.benchmark_group("wal_acceptor");
+    for fsync in [false, true] {
+        for n in [1, 100, 10000] {
+            g.bench_function(format!("fsync={fsync}/n={n}"), |b| {
+                run_bench(b, n, fsync).unwrap()
+            });
+        }
+    }
+
+    /// The actual benchmark. n is the number of WAL records to send in a pipelined batch.
+    fn run_bench(b: &mut Bencher, n: usize, fsync: bool) -> anyhow::Result<()> {
+        let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
+
+        let env = Env::new(fsync)?;
+        let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
+
+        // Create buffered channels that can fit all requests, to avoid blocking on channels.
+        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
+        let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(n);
+
+        // Spawn the WalAcceptor task.
+        runtime.block_on(async {
+            // TODO: WalAcceptor doesn't actually need a full timeline, only
+            // Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
+            let tli = env
+                .make_timeline(NodeId(1), TenantTimelineId::generate())
+                .await?
+                .wal_residence_guard()
+                .await?;
+            WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0));
+            anyhow::Ok(())
+        })?;
+
+        b.iter_batched(
+            // Pre-construct a batch of WAL records and requests.
+            || {
+                walgen
+                    .take(n)
+                    .map(|(lsn, record)| AppendRequest {
+                        h: AppendRequestHeader {
+                            term: 1,
+                            term_start_lsn: Lsn(0),
+                            begin_lsn: lsn,
+                            end_lsn: lsn + record.len() as u64,
+                            commit_lsn: Lsn(0),
+                            truncate_lsn: Lsn(0),
+                            proposer_uuid: [0; 16],
+                        },
+                        wal_data: record,
+                    })
+                    .collect_vec()
+            },
+            // Benchmark batch ingestion (time per batch).
+            |reqs| {
+                runtime.block_on(async {
+                    let final_lsn = reqs.last().unwrap().h.end_lsn;
+                    // Stuff all the messages into the buffered channel to pipeline them.
+                    for req in reqs {
+                        let msg = ProposerAcceptorMessage::AppendRequest(req);
+                        msg_tx.send(msg).await.expect("send failed");
+                    }
+                    // Wait for the last message to get flushed.
+                    while let Some(reply) = reply_rx.recv().await {
+                        if let AcceptorProposerMessage::AppendResponse(resp) = reply {
+                            if resp.flush_lsn >= final_lsn {
+                                return;
+                            }
+                        }
+                    }
+                    panic!("disconnected")
+                })
+            },
+            BatchSize::PerIteration, // only run one request batch at a time
+        );
+        Ok(())
+    }
+}
+
+/// Benchmarks WalAcceptor throughput by sending 1 GB of data with varying message sizes and waiting
+/// for the last LSN to be flushed to storage. Only the actual message payload counts towards
+/// throughput; headers are excluded and considered overhead. Records are XlLogicalMessage.
+///
+/// To avoid running out of memory, messages are constructed during the benchmark.
+fn bench_wal_acceptor_throughput(c: &mut Criterion) {
+    const VOLUME: usize = GB; // NB: excludes message/page/segment headers and padding
+
+    let mut g = c.benchmark_group("wal_acceptor_throughput");
+    g.sample_size(10);
+    g.throughput(criterion::Throughput::Bytes(VOLUME as u64));
+
+    for fsync in [false, true] {
+        for commit in [false, true] {
+            for size in [KB, 8 * KB, 128 * KB, MB] {
+                assert_eq!(VOLUME % size, 0, "volume must be divisible by size");
+                let count = VOLUME / size;
+                g.bench_function(format!("fsync={fsync}/commit={commit}/size={size}"), |b| {
+                    run_bench(b, count, size, fsync, commit).unwrap()
+                });
+            }
+        }
+    }
+
+    /// The actual benchmark. size is the payload size per message, count is the number of messages.
+    /// If commit is true, advance the commit LSN on each message.
+    fn run_bench(
+        b: &mut Bencher,
+        count: usize,
+        size: usize,
+        fsync: bool,
+        commit: bool,
+    ) -> anyhow::Result<()> {
+        let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
+
+        // Construct the payload. The prefix counts towards the payload (including NUL terminator).
+        let prefix = c"p";
+        let prefixlen = prefix.to_bytes_with_nul().len();
+        assert!(size >= prefixlen);
+        let message = vec![0; size - prefixlen];
+
+        let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
+
+        // Construct and spawn the WalAcceptor task.
+        let env = Env::new(fsync)?;
+
+        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
+        let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
+
+        runtime.block_on(async {
+            let tli = env
+                .make_timeline(NodeId(1), TenantTimelineId::generate())
+                .await?
+                .wal_residence_guard()
+                .await?;
+            WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0));
+            anyhow::Ok(())
+        })?;
+
+        // Ingest the WAL.
+        b.iter(|| {
+            runtime.block_on(async {
+                let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
+                    h: AppendRequestHeader {
+                        term: 1,
+                        term_start_lsn: Lsn(0),
+                        begin_lsn: lsn,
+                        end_lsn: lsn + record.len() as u64,
+                        commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
+                        truncate_lsn: Lsn(0),
+                        proposer_uuid: [0; 16],
+                    },
+                    wal_data: record,
+                });
+
+                // Send requests.
+                for req in reqgen {
+                    _ = reply_rx.try_recv(); // discard any replies, to avoid blocking
+                    let msg = ProposerAcceptorMessage::AppendRequest(req);
+                    msg_tx.send(msg).await.expect("send failed");
+                }
+
+                // Wait for last message to get flushed.
+                while let Some(reply) = reply_rx.recv().await {
+                    if let AcceptorProposerMessage::AppendResponse(resp) = reply {
+                        if resp.flush_lsn >= walgen.lsn {
+                            return;
+                        }
+                    }
+                }
+                panic!("disconnected")
+            })
+        });
+        Ok(())
+    }
+}
+
+/// Benchmarks OS write throughput by appending blocks of a given size to a file. This is intended
+/// to compare Tokio and stdlib writes, and give a baseline for optimal WAL throughput.
+fn bench_file_write(c: &mut Criterion) {
+    let mut g = c.benchmark_group("file_write");
+
+    for kind in ["stdlib", "tokio"] {
+        for fsync in [false, true] {
+            for size in [8, KB, 8 * KB, 128 * KB, MB] {
+                // Kind of weird to change the group throughput per benchmark, but it's the only way to
+                // vary it per benchmark. It works.
+                g.throughput(criterion::Throughput::Bytes(size as u64));
+                g.bench_function(
+                    format!("{kind}/fsync={fsync}/size={size}"),
+                    |b| match kind {
+                        "stdlib" => run_bench_stdlib(b, size, fsync).unwrap(),
+                        "tokio" => run_bench_tokio(b, size, fsync).unwrap(),
+                        name => panic!("unknown kind {name}"),
+                    },
+                );
+            }
+        }
+    }
+
+    fn run_bench_stdlib(b: &mut Bencher, size: usize, fsync: bool) -> anyhow::Result<()> {
+        let mut file = tempfile()?;
+        let buf = vec![0u8; size];
+
+        b.iter(|| {
+            file.write_all(&buf).unwrap();
+            file.flush().unwrap();
+            if fsync {
+                file.sync_data().unwrap();
+            }
+        });
+
+        Ok(())
+    }
+
+    fn run_bench_tokio(b: &mut Bencher, size: usize, fsync: bool) -> anyhow::Result<()> {
+        let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
+
+        let mut file = tokio::fs::File::from_std(tempfile()?);
+        let buf = vec![0u8; size];
+
+        b.iter(|| {
+            runtime.block_on(async {
+                file.write_all(&buf).await.unwrap();
+                file.flush().await.unwrap();
+                if fsync {
+                    file.sync_data().await.unwrap();
+                }
+            })
+        });
+
+        Ok(())
+    }
+}
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index b1cddaf062..6d68b6b59b 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -112,9 +112,7 @@ impl SafeKeeperConf {
 }
 
 impl SafeKeeperConf {
-    #[cfg(test)]
-    #[allow(unused)]
-    fn dummy() -> Self {
+    pub fn dummy() -> Self {
         SafeKeeperConf {
             workdir: Utf8PathBuf::from("./"),
             no_sync: false,
diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs
index 0826a148ec..b8925d785e 100644
--- a/safekeeper/src/state.rs
+++ b/safekeeper/src/state.rs
@@ -138,7 +138,6 @@ impl TimelinePersistentState {
         })
     }
 
-    #[cfg(test)]
     pub fn empty() -> Self {
         TimelinePersistentState::new(
             &TenantTimelineId::empty(),
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index f0113978c4..fa91241177 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -2,7 +2,7 @@
 //! to glue together SafeKeeper and all other background services.
 
 use anyhow::{anyhow, bail, Result};
-use camino::Utf8PathBuf;
+use camino::{Utf8Path, Utf8PathBuf};
 use remote_storage::RemotePath;
 use safekeeper_api::models::TimelineTermBumpResponse;
 use serde::{Deserialize, Serialize};
@@ -325,8 +325,17 @@ pub struct SharedState {
 }
 
 impl SharedState {
+    /// Creates a new SharedState.
+    pub fn new(sk: StateSK) -> Self {
+        Self {
+            sk,
+            peers_info: PeersInfo(vec![]),
+            wal_removal_on_hold: false,
+        }
+    }
+
     /// Restore SharedState from control file. If file doesn't exist, bails out.
-    fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
+    pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
         let timeline_dir = get_timeline_dir(conf, ttid);
         let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
         if control_store.server.wal_seg_size == 0 {
@@ -352,11 +361,7 @@ impl SharedState {
             }
         };
 
-        Ok(Self {
-            sk,
-            peers_info: PeersInfo(vec![]),
-            wal_removal_on_hold: false,
-        })
+        Ok(Self::new(sk))
     }
 
     pub(crate) fn get_wal_seg_size(&self) -> usize {
@@ -480,11 +485,13 @@ pub struct Timeline {
 }
 
 impl Timeline {
-    /// Load existing timeline from disk.
-    pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
-        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
-
-        let shared_state = SharedState::restore(conf, &ttid)?;
+    /// Constructs a new timeline.
+    pub fn new(
+        ttid: TenantTimelineId,
+        timeline_dir: &Utf8Path,
+        remote_path: &RemotePath,
+        shared_state: SharedState,
+    ) -> Arc<Self> {
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state().commit_lsn);
         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
@@ -494,10 +501,11 @@ impl Timeline {
         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
 
         let walreceivers = WalReceivers::new();
-        let remote_path = remote_timeline_path(&ttid)?;
-        Ok(Arc::new(Timeline {
+
+        Arc::new(Self {
             ttid,
-            remote_path,
+            remote_path: remote_path.to_owned(),
+            timeline_dir: timeline_dir.to_owned(),
             commit_lsn_watch_tx,
             commit_lsn_watch_rx,
             term_flush_lsn_watch_tx,
@@ -508,13 +516,28 @@ impl Timeline {
             walsenders: WalSenders::new(walreceivers.clone()),
             walreceivers,
             cancel: CancellationToken::default(),
-            timeline_dir: get_timeline_dir(conf, &ttid),
             manager_ctl: ManagerCtl::new(),
             broker_active: AtomicBool::new(false),
             wal_backup_active: AtomicBool::new(false),
             last_removed_segno: AtomicU64::new(0),
             mgr_status: AtomicStatus::new(),
-        }))
+        })
+    }
+
+    /// Load existing timeline from disk.
+    pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
+        let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
+
+        let shared_state = SharedState::restore(conf, &ttid)?;
+        let timeline_dir = get_timeline_dir(conf, &ttid);
+        let remote_path = remote_timeline_path(&ttid)?;
+
+        Ok(Timeline::new(
+            ttid,
+            &timeline_dir,
+            &remote_path,
+            shared_state,
+        ))
     }
 
     /// Initialize fresh timeline on disk and start background tasks. If init
@@ -1128,13 +1151,13 @@ async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
 
 /// Get a path to the tenant directory. If you just need to get a timeline directory,
 /// use WalResidentTimeline::get_timeline_dir instead.
-pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
+pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
     conf.workdir.join(tenant_id.to_string())
 }
 
 /// Get a path to the timeline directory. If you need to read WAL files from disk,
 /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
 /// timeline eviction status and WAL files might not be present on disk.
-pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
+pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
     get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
 }
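A note on the baseline comparison that `safekeeper/benches/README.md` above points to with `--baseline` and `--save-baseline`: these are Criterion flags, passed to the bench binary after `--`. A minimal sketch of that workflow, where the baseline name `main` is only an example:

```sh
# Save a named baseline, e.g. from the main branch before a change.
cargo bench --package safekeeper --bench receive_wal -- --save-baseline main

# Later runs are then compared against that baseline instead of the previous run.
cargo bench --package safekeeper --bench receive_wal -- --baseline main
```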