mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
Add WalAcceptor throughput benchmark
This commit is contained in:
@@ -7,15 +7,24 @@ use benchutils::Env;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
use itertools::Itertools as _;
|
||||
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
|
||||
use safekeeper::receive_wal::WalAcceptor;
|
||||
use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
};
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
const KB: usize = 1024;
|
||||
const MB: usize = 1024 * KB;
|
||||
const GB: usize = 1024 * MB;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(benches, bench_process_msg, bench_wal_acceptor);
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_process_msg,
|
||||
bench_wal_acceptor,
|
||||
bench_wal_acceptor_throughput
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
/// Benchmarks SafeKeeper::process_msg() as time per message. Each message is an AppendRequest with
|
||||
@@ -66,12 +75,12 @@ fn bench_process_msg(c: &mut Criterion) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmarks WalAcceptor by sending it a batch of WAL records and waiting for it to confirm that
|
||||
/// the last LSN has been flushed to storage. We pipeline a bunch of messages instead of measuring
|
||||
/// each individual message to amortize costs (e.g. fsync), which is more realistic. Records are
|
||||
/// XlLogicalMessage with a tiny payload.
|
||||
/// Benchmarks WalAcceptor message processing time by sending it a batch of WAL records and waiting
|
||||
/// for it to confirm that the last LSN has been flushed to storage. We pipeline a bunch of messages
|
||||
/// instead of measuring each individual message to amortize costs (e.g. fsync), which is more
|
||||
/// realistic. Records are XlLogicalMessage with a tiny payload (~64 bytes per record including
|
||||
/// headers). Records are pre-constructed to avoid skewing the benchmark.
|
||||
///
|
||||
/// TODO: add benchmarks with larger data volume, and measure throughput.
|
||||
/// TODO: add benchmarks with in-memory storage, see comment on `Env::make_safekeeper()`:
|
||||
fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
let mut g = c.benchmark_group("wal_acceptor");
|
||||
@@ -151,3 +160,91 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmarks WalAcceptor throughput by sending 1 GB of data with varying message sizes and waiting
|
||||
/// for the last LSN to be flushed to storage. Only the actual message payload counts towards
|
||||
/// throughput, headers are excluded and considered overhead. Records are XlLogicalMessage.
|
||||
///
|
||||
/// To avoid running out of memory, messages are constructed during the benchmark.
|
||||
fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
const VOLUME: usize = GB; // NB: excludes message/page/segment headers and padding
|
||||
|
||||
let mut g = c.benchmark_group("wal_acceptor_throughput");
|
||||
g.sample_size(10);
|
||||
g.throughput(criterion::Throughput::Bytes(VOLUME as u64));
|
||||
|
||||
for fsync in [false, true] {
|
||||
for size in [KB, 4 * KB, 128 * KB, MB] {
|
||||
assert_eq!(VOLUME % size, 0, "volume must be divisible by size");
|
||||
let count = VOLUME / size;
|
||||
g.bench_function(format!("fsync={fsync}/size={size}"), |b| {
|
||||
run_bench(b, count, size, fsync).unwrap()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// The actual benchmark. size is the payload size per message, count is the number of messages.
|
||||
fn run_bench(b: &mut Bencher, count: usize, size: usize, fsync: bool) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
|
||||
|
||||
// Construct the payload. The prefix counts towards the payload (including NUL terminator).
|
||||
let prefix = c"p";
|
||||
let prefixlen = prefix.to_bytes_with_nul().len();
|
||||
assert!(size >= prefixlen);
|
||||
let message = vec![0; size - prefixlen];
|
||||
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
|
||||
|
||||
// Construct and spawn the WalAcceptor task.
|
||||
let env = Env::new(fsync)?;
|
||||
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
|
||||
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
|
||||
|
||||
runtime.block_on(async {
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate())
|
||||
.await?
|
||||
.wal_residence_guard()
|
||||
.await?;
|
||||
WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(0));
|
||||
anyhow::Ok(())
|
||||
})?;
|
||||
|
||||
// Ingest the WAL.
|
||||
b.iter(|| {
|
||||
runtime.block_on(async {
|
||||
let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(0),
|
||||
begin_lsn: lsn,
|
||||
end_lsn: lsn + record.len() as u64,
|
||||
commit_lsn: Lsn(0),
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
},
|
||||
wal_data: record,
|
||||
});
|
||||
|
||||
// Send requests.
|
||||
for req in reqgen {
|
||||
_ = reply_rx.try_recv(); // discard any replies, to avoid blocking
|
||||
let msg = ProposerAcceptorMessage::AppendRequest(req);
|
||||
msg_tx.send(msg).await.expect("send failed");
|
||||
}
|
||||
|
||||
// Wait for last message to get flushed.
|
||||
while let Some(reply) = reply_rx.recv().await {
|
||||
if let AcceptorProposerMessage::AppendResponse(resp) = reply {
|
||||
if resp.flush_lsn >= walgen.lsn {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
panic!("disconnected")
|
||||
})
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user