mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
safekeeper: lift benchmarking utils into safekeeper crate (#10200)
## Problem The benchmarking utilities are also useful for testing. We want to write tests in the safekeeper crate. ## Summary of changes This commit lifts the utils to the safekeeper crate. They are compiled if the benchmarking features is enabled or if in test mode.
This commit is contained in:
@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
|
||||
const TIMELINE_ID: u32 = 1;
|
||||
|
||||
/// Creates a new WAL generator with the given record generator.
|
||||
pub fn new(record_generator: R) -> WalGenerator<R> {
|
||||
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
|
||||
Self {
|
||||
record_generator,
|
||||
lsn: Lsn(0),
|
||||
prev_lsn: Lsn(0),
|
||||
lsn: start_lsn,
|
||||
prev_lsn: start_lsn,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints"]
|
||||
benchmarking = []
|
||||
|
||||
[dependencies]
|
||||
async-stream.workspace = true
|
||||
@@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
|
||||
[[bench]]
|
||||
name = "receive_wal"
|
||||
harness = false
|
||||
required-features = ["benchmarking"]
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
//! WAL ingestion benchmarks.
|
||||
|
||||
#[path = "benchutils.rs"]
|
||||
mod benchutils;
|
||||
|
||||
use std::io::Write as _;
|
||||
|
||||
use benchutils::Env;
|
||||
use bytes::BytesMut;
|
||||
use camino_tempfile::tempfile;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
@@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
};
|
||||
use safekeeper::test_utils::Env;
|
||||
use tokio::io::AsyncWriteExt as _;
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
|
||||
assert!(size >= prefixlen);
|
||||
let message = vec![0; size - prefixlen];
|
||||
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
|
||||
|
||||
// Set up the Safekeeper.
|
||||
let env = Env::new(fsync)?;
|
||||
let mut safekeeper =
|
||||
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
|
||||
let mut safekeeper = runtime.block_on(env.make_safekeeper(
|
||||
NodeId(1),
|
||||
TenantTimelineId::generate(),
|
||||
Lsn(0),
|
||||
))?;
|
||||
|
||||
b.iter_batched_ref(
|
||||
// Pre-construct WAL records and requests. Criterion will batch them.
|
||||
@@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
|
||||
|
||||
let env = Env::new(fsync)?;
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
|
||||
let walgen =
|
||||
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));
|
||||
|
||||
// Create buffered channels that can fit all requests, to avoid blocking on channels.
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
|
||||
@@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
// TODO: WalAcceptor doesn't actually need a full timeline, only
|
||||
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate())
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
|
||||
.await?
|
||||
.wal_residence_guard()
|
||||
.await?;
|
||||
@@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
assert!(size >= prefixlen);
|
||||
let message = vec![0; size - prefixlen];
|
||||
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
|
||||
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
|
||||
|
||||
// Construct and spawn the WalAcceptor task.
|
||||
let env = Env::new(fsync)?;
|
||||
@@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
|
||||
runtime.block_on(async {
|
||||
let tli = env
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate())
|
||||
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
|
||||
.await?
|
||||
.wal_residence_guard()
|
||||
.await?;
|
||||
|
||||
@@ -43,6 +43,9 @@ pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
#[cfg(any(test, feature = "benchmarking"))]
|
||||
pub mod test_utils;
|
||||
|
||||
mod timelines_global_map;
|
||||
use std::sync::Arc;
|
||||
pub use timelines_global_map::GlobalTimelines;
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
|
||||
use crate::state::{TimelinePersistentState, TimelineState};
|
||||
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::remote_timeline_path;
|
||||
use crate::{control_file, wal_storage, SafeKeeperConf};
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper::rate_limit::RateLimiter;
|
||||
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
|
||||
use safekeeper::state::{TimelinePersistentState, TimelineState};
|
||||
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
|
||||
use safekeeper::timelines_set::TimelinesSet;
|
||||
use safekeeper::wal_backup::remote_timeline_path;
|
||||
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
|
||||
use tokio::fs::create_dir_all;
|
||||
use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
|
||||
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
|
||||
pub struct Env {
|
||||
/// Whether to enable fsync.
|
||||
pub fsync: bool,
|
||||
@@ -21,7 +21,7 @@ pub struct Env {
|
||||
}
|
||||
|
||||
impl Env {
|
||||
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
|
||||
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
|
||||
/// enable fsyncing.
|
||||
pub fn new(fsync: bool) -> anyhow::Result<Self> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
@@ -47,6 +47,7 @@ impl Env {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
ttid: TenantTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
|
||||
let conf = self.make_conf(node_id);
|
||||
|
||||
@@ -67,9 +68,9 @@ impl Env {
|
||||
safekeeper
|
||||
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(0),
|
||||
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
|
||||
timeline_start_lsn: Lsn(0),
|
||||
start_streaming_at: start_lsn,
|
||||
term_history: TermHistory(vec![(1, start_lsn).into()]),
|
||||
timeline_start_lsn: start_lsn,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -82,12 +83,13 @@ impl Env {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
ttid: TenantTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let conf = Arc::new(self.make_conf(node_id));
|
||||
let timeline_dir = get_timeline_dir(&conf, &ttid);
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
|
||||
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
|
||||
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
|
||||
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
|
||||
|
||||
let timeline = Timeline::new(
|
||||
@@ -18,7 +18,7 @@ impl DiskWalProposer {
|
||||
internal_available_lsn: Lsn(0),
|
||||
prev_lsn: Lsn(0),
|
||||
disk: BlockStorage::new(),
|
||||
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
|
||||
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user