diff --git a/Cargo.lock b/Cargo.lock index 3f3d84465f..07de6f0662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1955,6 +1955,20 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.1", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heapless" version = "0.8.0" @@ -2916,8 +2930,13 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "hdrhistogram", + "humantime", + "humantime-serde", "pageserver", "rand 0.8.5", + "serde", + "serde_json", "tokio", "tracing", "utils", diff --git a/Cargo.toml b/Cargo.toml index c25a0dade3..6c079df2bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ futures-util = "0.3" git-version = "0.3" hashbrown = "0.13" hashlink = "0.8.1" +hdrhistogram = "7.5.2" hex = "0.4" hex-literal = "0.4" hmac = "0.12.1" diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index ea2c1ff68d..15ec28d489 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -8,7 +8,12 @@ edition = "2021" [dependencies] anyhow.workspace = true clap.workspace = true +hdrhistogram.workspace = true +humantime.workspace = true +humantime-serde.workspace = true rand.workspace = true +serde.workspace = true +serde_json.workspace = true tracing.workspace = true tokio.workspace = true diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index 4f1c2f6b6f..a8fa70090d 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -11,8 +11,7 @@ use utils::logging; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; - -use tokio::task::JoinHandle; +use std::time::{Duration, Instant}; /// Measure performance of the GetPage API, targeting the latest LSN. #[derive(clap::Parser)] @@ -41,6 +40,92 @@ impl LiveStats { } } +#[derive(serde::Serialize)] +struct Output { + per_task: Vec, + total: PerTaskOutput, +} + +const LATENCY_PERCENTILES: [f64; 3] = [99.0, 99.9, 99.99]; + +struct LatencyPercentiles { + latency_percentiles: [Duration; 3], +} + +impl serde::Serialize for LatencyPercentiles { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeMap; + let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?; + for p in LATENCY_PERCENTILES { + ser.serialize_entry( + &format!("p{p}"), + &format!( + "{}", + &humantime::format_duration(self.latency_percentiles[0]) + ), + )?; + } + ser.end() + } +} + +#[derive(serde::Serialize)] +struct PerTaskOutput { + request_count: u64, + #[serde(with = "humantime_serde")] + latency_mean: Duration, + latency_percentiles: LatencyPercentiles, +} + +struct PerTaskStats { + latency_histo: hdrhistogram::Histogram, +} + +impl PerTaskStats { + fn new() -> Self { + Self { + // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram, + // which would skew the benchmark results. + latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(), + } + } + fn observe(&mut self, latency: Duration) -> anyhow::Result<()> { + let micros: u64 = latency + .as_micros() + .try_into() + .context("latency greater than u64")?; + self.latency_histo + .record(micros) + .context("add to histogram")?; + Ok(()) + } + fn output(&self) -> PerTaskOutput { + let latency_percentiles = std::array::from_fn(|idx| { + let micros = self + .latency_histo + .value_at_percentile(LATENCY_PERCENTILES[idx]); + Duration::from_micros(micros) + }); + PerTaskOutput { + request_count: self.latency_histo.len(), + latency_mean: Duration::from_micros(self.latency_histo.mean() as u64), + latency_percentiles: LatencyPercentiles { + latency_percentiles, + }, + } + } + + fn add(&mut self, other: &Self) { + let Self { + ref mut latency_histo, + } = self; + latency_histo.add(&other.latency_histo).unwrap(); + } +} + pub(crate) async fn main(args: Args) -> anyhow::Result<()> { logging::init( logging::LogFormat::Plain, @@ -92,6 +177,7 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> { let num_work_tasks = tenant_timelines.len() * args.num_tasks; let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1)); + let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks)); tokio::spawn({ let stats = Arc::clone(&stats); @@ -113,22 +199,44 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> { let mut tasks = Vec::new(); for (tenant_id, timeline_id) in tenant_timelines { - let stats = Arc::clone(&stats); + let live_stats = Arc::clone(&stats); let t = tokio::spawn(timeline( args, client.clone(), tenant_id, timeline_id, Arc::clone(&start_work_barrier), - stats, + Arc::clone(&all_work_done_barrier), + live_stats, )); - tasks.push(t); + tasks.push(((tenant_id, timeline_id), t)); } - for t in tasks { - t.await.unwrap().unwrap(); + let mut per_timeline: Vec<((TenantId, TimelineId), Vec)> = + Vec::with_capacity(tasks.len()); + for (key, task) in tasks { + per_timeline.push((key, task.await.unwrap().unwrap())); } + let per_task: Vec<(_, _)> = per_timeline + .iter() + .flat_map(|(k, task_stats)| task_stats.into_iter().map(|s| (*k, s))) + .collect(); + + let output = Output { + total: { + let mut agg_stats = PerTaskStats::new(); + for (_, stats) in &per_task { + agg_stats.add(stats); + } + agg_stats.output() + }, + per_task: per_task.iter().map(|(_, s)| s.output()).collect(), + }; + + let output = serde_json::to_string_pretty(&output).unwrap(); + println!("{output}"); + anyhow::Ok(()) } @@ -138,8 +246,9 @@ async fn timeline( tenant_id: TenantId, timeline_id: TimelineId, start_work_barrier: Arc, + all_work_done_barrier: Arc, stats: Arc, -) -> anyhow::Result<()> { +) -> anyhow::Result> { let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; let lsn = partitioning.at_lsn; @@ -181,14 +290,16 @@ async fn timeline( let ranges = Arc::new(ranges); let weights = Arc::new(weights); - let mut tasks = Vec::>::new(); + let mut tasks = Vec::new(); for _i in 0..args.num_tasks { let ranges = ranges.clone(); let _weights = weights.clone(); let start_work_barrier = Arc::clone(&start_work_barrier); - let task = tokio::spawn({ - let stats = Arc::clone(&stats); + let all_work_done_barrier = Arc::clone(&all_work_done_barrier); + + let jh = tokio::spawn({ + let live_stats = Arc::clone(&stats); async move { let mut getpage_client = pageserver::client::page_service::Client::new( args.page_service_connstring.clone(), @@ -197,6 +308,9 @@ async fn timeline( ) .await .unwrap(); + + let mut stats = PerTaskStats::new(); + start_work_barrier.wait().await; for _i in 0..args.num_requests { let key = { @@ -208,6 +322,7 @@ async fn timeline( key_to_rel_block(key).expect("we filter non-rel-block keys out above"); RelTagBlockNo { rel_tag, block_no } }; + let start = Instant::now(); getpage_client .getpage(key, lsn) .await @@ -215,17 +330,23 @@ async fn timeline( format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) }) .unwrap(); - stats.inc(); + let elapsed = start.elapsed(); + live_stats.inc(); + stats.observe(elapsed).unwrap(); } + all_work_done_barrier.wait().await; + getpage_client.shutdown().await; + stats } }); - tasks.push(task); + tasks.push(jh); } + let mut ret = Vec::with_capacity(tasks.len()); for task in tasks { - task.await.unwrap(); + ret.push(task.await.unwrap()); } - Ok(()) + Ok(ret) } diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py index 9b9391216d..ab655efb84 100644 --- a/test_runner/performance/test_pageserver.py +++ b/test_runner/performance/test_pageserver.py @@ -1,4 +1,3 @@ - import json from pathlib import Path import shutil @@ -11,7 +10,10 @@ from fixtures.types import TenantId from fixtures.log_helper import log from fixtures.benchmark_fixture import NeonBenchmarker -def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin): + +def test_getpage_throughput( + neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin +): neon_env_builder.enable_generations = True neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start() @@ -26,15 +28,15 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon # create our template tenant tenant_config_mgmt_api = { - "gc_period" : '0s', - "checkpoint_timeout" : '3650 day', - "compaction_period" : '20 s', - "compaction_threshold" : 10, - "compaction_target_size" : 134217728, - "checkpoint_distance" : 268435456, - "image_creation_threshold" : 3, + "gc_period": "0s", + "checkpoint_timeout": "3650 day", + "compaction_period": "20 s", + "compaction_threshold": 10, + "compaction_target_size": 134217728, + "checkpoint_distance": 268435456, + "image_creation_threshold": 3, } - tenant_config_cli = { k: str(v) for k, v in tenant_config_mgmt_api.items() } + tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()} template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli) template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"]) @@ -46,18 +48,15 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon # stop PS just for good measure env.pageserver.stop() - # duplicate the tenant in remote stora + # duplicate the tenant in remote storage src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines" assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory" - tenants = [template_tenant] - for i in range(0, 10): new_tenant = TenantId.generate() tenants.append(new_tenant) log.info("Duplicating tenant #%s: %s", i, new_tenant) - dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines" dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False) dst_timelines_dir.mkdir(parents=False, exist_ok=False) @@ -86,7 +85,9 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon env.pageserver.start() assert ps_http.tenant_list() == [] for tenant in tenants: - ps_http.tenant_attach(tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen+1) + ps_http.tenant_attach( + tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1 + ) for tenant in tenants: wait_until_tenant_active(ps_http, tenant) @@ -95,20 +96,23 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon for tenant in tenants: ps_http.download_all_layers(tenant, template_timeline) - # run the benchmark + # run the benchmark with one client per timeline, each doing 10k requests to random keys. cmd = [ str(env.neon_binpath / "pagebench"), - "--mgmt-api-endpoint", ps_http.base_url, - "--page-service-connstring", env.pageserver.connstr(password=None), - "--num-tasks", "1", - "--num-requests", "10000", + "get-page-latest-lsn", + "--mgmt-api-endpoint", + ps_http.base_url, + "--page-service-connstring", + env.pageserver.connstr(password=None), + "--num-tasks", + "1", + "--num-requests", + "10000", *[str(tenant) for tenant in tenants], ] basepath = pg_bin.run_capture(cmd) results_path = Path(basepath + ".stdout") log.info(f"Benchmark results at: {results_path}") - with open(results_path, 'r') as f: + with open(results_path, "r") as f: results = json.load(f) - -