From ccb5df93ef57d04f6c07cf20efbc29a5ac6c1f82 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 27 Apr 2022 09:26:56 -0400 Subject: [PATCH] Add multiple clients --- pageserver/src/bin/psbench.rs | 54 ++++++++++++++++------ pageserver/src/wal_metadata.rs | 3 +- test_runner/fixtures/benchmark_fixture.py | 4 +- test_runner/fixtures/zenith_fixtures.py | 4 +- test_runner/performance/test_pageserver.py | 7 ++- 5 files changed, 51 insertions(+), 21 deletions(-) diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs index ea1fe7cdd6..4e39f1f32d 100644 --- a/pageserver/src/bin/psbench.rs +++ b/pageserver/src/bin/psbench.rs @@ -4,7 +4,9 @@ //! of the tester matters, and the pagestream API is easier to call from rust. use bytes::{BufMut, BytesMut}; use clap::{Parser, Subcommand}; +use futures::future; use pageserver::wal_metadata::{Page, WalEntryMetadata}; +use rand::thread_rng; use std::fs::File; use std::path::{Path, PathBuf}; use std::time::Instant; @@ -19,6 +21,7 @@ use zenith_utils::{ lsn::Lsn, pq_proto::{BeMessage, FeMessage}, }; +use rand::seq::SliceRandom; use anyhow::Result; @@ -108,6 +111,7 @@ impl PagestreamApi { /// Parsed wal_metadata file with additional derived /// statistics for convenience. +#[derive(Clone)] struct Metadata { // Parsed from metadata file wal_metadata: Vec, @@ -146,6 +150,9 @@ impl Metadata { /// Print results in a format readable by benchmark_fixture.py fn report_latency(&self, latencies: &[Duration]) -> Result<()> { + let mut latencies: Vec<&Duration> = latencies.iter().collect(); + latencies.sort(); + println!("test_param num_pages {}", self.affected_pages.len()); println!("test_param num_wal_entries {}", self.wal_metadata.len()); println!("test_param total_wal_size {} bytes", self.total_wal_size); @@ -157,6 +164,11 @@ impl Metadata { "lower_is_better median {:?} microseconds", latencies[latencies.len() / 2].as_micros() ); + println!( + "lower_is_better average {:.3} microseconds", + (latencies.iter().map(|l| l.as_micros()).sum::() as f64) / ( + latencies.len() as f64) + ); println!( "lower_is_better p99 {:?} microseconds", latencies[latencies.len() - 1 - latencies.len() / 100].as_micros() @@ -172,47 +184,59 @@ impl Metadata { /// Sequentially get the latest version of each page and report latencies async fn test_latest_pages(api: &mut PagestreamApi, metadata: &Metadata) -> Result> { let mut latencies: Vec = vec![]; - for page in &metadata.affected_pages { + let mut page_order: Vec<&Page> = metadata.affected_pages.iter().collect(); + page_order.shuffle(&mut thread_rng()); + for page in page_order { let start = Instant::now(); let _page_bytes = api.get_page(&metadata.latest_lsn, page, true).await?; let duration = start.elapsed(); latencies.push(duration); } - latencies.sort(); Ok(latencies) } -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Clone)] struct Args { wal_metadata_path: PathBuf, + + // TODO get these from wal metadata + // TODO test multi-timeline parallel reads tenant_hex: String, timeline: String, + // TODO change to `clients_per_timeline` + #[clap(long, default_value = "1")] + num_clients: usize, + #[clap(subcommand)] test: PsbenchTest, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Debug, Clone)] enum PsbenchTest { GetLatestPages, } +async fn run_client(args: Args, metadata: Metadata) -> Vec { + let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await.unwrap(); + match args.test { + PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata), + } + .await.unwrap() +} + #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); + let metadata = Metadata::build(&args.wal_metadata_path).unwrap(); - // Initialize setup - let metadata = Metadata::build(&args.wal_metadata_path)?; - let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await?; + // TODO explicitly spawn a thread for each? + let latencies: Vec = future::join_all((0..args.num_clients).map(|_| { + run_client(args.clone(), metadata.clone()) + })).await.into_iter().flatten().collect(); - // Run test - let latencies = match args.test { - PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata), - } - .await?; - - // Report results - metadata.report_latency(&latencies)?; + println!("test_param num_clients {}", args.num_clients); + metadata.report_latency(&latencies).unwrap(); Ok(()) } diff --git a/pageserver/src/wal_metadata.rs b/pageserver/src/wal_metadata.rs index 199024d5e6..a298ba9612 100644 --- a/pageserver/src/wal_metadata.rs +++ b/pageserver/src/wal_metadata.rs @@ -73,7 +73,8 @@ impl From<&DecodedBkpBlock> for Page { } } -#[derive(Debug, Serialize, Deserialize)] +// TODO include tenant and timeline +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct WalEntryMetadata { pub lsn: Lsn, pub size: usize, diff --git a/test_runner/fixtures/benchmark_fixture.py b/test_runner/fixtures/benchmark_fixture.py index 8b9aabfe43..39a3783650 100644 --- a/test_runner/fixtures/benchmark_fixture.py +++ b/test_runner/fixtures/benchmark_fixture.py @@ -232,7 +232,7 @@ class ZenithBenchmarker: '', MetricReport.TEST_PARAM) - def record_psbench_result(self, psbench_output): + def record_psbench_result(self, prefix, psbench_output): """Record results from pageserver benchmarker.""" for line in psbench_output.split("\n"): tokens = line.split(" ") @@ -240,7 +240,7 @@ class ZenithBenchmarker: name = tokens[1] value = tokens[2] unit = tokens[3] if len(tokens) > 3 else "" - self.record(name, value, unit, report=report) + self.record(f"{prefix}_{name}", value, unit, report=report) def get_io_writes(self, pageserver) -> int: """ diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 0040494fa5..d52209a7e3 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1271,13 +1271,15 @@ class PsbenchBin: """A helper class for running the pageserver benchmarker tool.""" wal_metadata_path: str - def test_latest_pages(self, tenant_hex: str, timeline: str) -> str: + def test_latest_pages(self, tenant_hex: str, timeline: str, num_clients=None) -> str: + num_clients = num_clients or 1 psbench_binpath = os.path.join(str(zenith_binpath), 'psbench') args = [ psbench_binpath, self.wal_metadata_path, tenant_hex, timeline, + f"--num-clients={num_clients}", "get-latest-pages", ] return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip() diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py index 260d1002ff..41df6ca656 100644 --- a/test_runner/performance/test_pageserver.py +++ b/test_runner/performance/test_pageserver.py @@ -50,5 +50,8 @@ def test_get_page(zenith_env_builder: ZenithEnvBuilder, pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0") - output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline) - zenbenchmark.record_psbench_result(output) + output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=1) + zenbenchmark.record_psbench_result("1_client", output) + + output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=8) + zenbenchmark.record_psbench_result("8_clients", output)