diff --git a/Cargo.lock b/Cargo.lock
index 90f6d69b59..a9b098a00c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2930,6 +2930,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "clap",
+ "futures",
  "hdrhistogram",
  "humantime",
  "humantime-serde",
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
index 3cb3775d7b..57b35ff6b2 100644
--- a/pageserver/pagebench/Cargo.toml
+++ b/pageserver/pagebench/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 [dependencies]
 anyhow.workspace = true
 clap.workspace = true
+futures.workspace = true
 hdrhistogram.workspace = true
 humantime.workspace = true
 humantime-serde.workspace = true
diff --git a/pageserver/pagebench/src/basebackup.rs b/pageserver/pagebench/src/basebackup.rs
new file mode 100644
index 0000000000..4bca3d68a6
--- /dev/null
+++ b/pageserver/pagebench/src/basebackup.rs
@@ -0,0 +1,372 @@
+use anyhow::Context;
+use pageserver::client::page_service::BasebackupRequest;
+use utils::lsn::Lsn;
+
+use rand::prelude::*;
+use tokio::sync::Barrier;
+use tokio::task::JoinSet;
+use tracing::{info, instrument};
+use utils::id::TenantId;
+use utils::logging;
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::num::NonZeroUsize;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use crate::util::tenant_timeline_id::TenantTimelineId;
+
+/// Basebackup benchmark: each client task repeatedly requests a basebackup for its timeline
+/// at the LSN discovered at startup, optionally gzip-compressed.
+#[derive(clap::Parser)]
+pub(crate) struct Args {
+    #[clap(long, default_value = "http://localhost:9898")]
+    mgmt_api_endpoint: String,
+    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
+    page_service_connstring: String,
+    #[clap(long, default_value = "1")]
+    num_clients: NonZeroUsize,
+    #[clap(long, default_value = "1.0")]
+    gzip_probability: f64,
+    #[clap(long)]
+    runtime: Option<humantime::Duration>,
+    targets: Option<Vec<TenantTimelineId>>,
+}
+
+#[derive(Debug, Default)]
+struct LiveStats {
+    completed_requests: AtomicU64,
+}
+
+impl LiveStats {
+    fn inc(&self) {
+        self.completed_requests.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+#[derive(serde::Serialize)]
+struct Output {
+    total: PerTaskOutput,
+}
+
+const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
+
+struct LatencyPercentiles {
+    latency_percentiles: [Duration; 4],
+}
+
+impl serde::Serialize for LatencyPercentiles {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeMap;
+        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
+        for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
+            ser.serialize_entry(
+                &format!("p{p}"),
+                &format!(
+                    "{}",
+                    &humantime::format_duration(self.latency_percentiles[i])
+                ),
+            )?;
+        }
+        ser.end()
+    }
+}
+
+#[derive(serde::Serialize)]
+struct PerTaskOutput {
+    request_count: u64,
+    #[serde(with = "humantime_serde")]
+    latency_mean: Duration,
+    latency_percentiles: LatencyPercentiles,
+}
+
+struct ThreadLocalStats {
+    latency_histo: hdrhistogram::Histogram<u64>,
+}
+
+impl ThreadLocalStats {
+    fn new() -> Self {
+        Self {
+            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
+            // which would skew the benchmark results.
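The comment that closes the hunk above is the crux of `ThreadLocalStats`: the histogram is created with fixed bounds (1 to 1_000_000_000 microseconds, 3 significant figures), so a latency outside that range surfaces as an error that the benchmark unwraps loudly rather than triggering an auto-resize (re-allocation and re-bucketing) in the middle of the measurement. A minimal sketch of that behavior, separate from the patch and assuming the `hdrhistogram` crate's default of auto-resize being disabled for `new_with_bounds`:

```rust
// Sketch, not part of the patch: fixed-bound histograms reject out-of-range samples
// instead of silently resizing. Values are microseconds, as in ThreadLocalStats::observe.
fn fixed_bounds_demo() {
    let mut h = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000_000, 3).unwrap();

    h.record(1_500).unwrap(); // 1.5 ms: within bounds, recorded normally

    // ~1 hour in microseconds is far above the configured upper bound; with resizing
    // disabled this is expected to return Err, which observe() would propagate and the
    // benchmark would unwrap -- failing loudly instead of skewing the distribution.
    assert!(h.record(3_600_000_000).is_err());

    assert_eq!(h.len(), 1); // only the in-range sample was counted
}
```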
+            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
+        }
+    }
+    fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
+        let micros: u64 = latency
+            .as_micros()
+            .try_into()
+            .context("latency greater than u64")?;
+        self.latency_histo
+            .record(micros)
+            .context("add to histogram")?;
+        Ok(())
+    }
+    fn output(&self) -> PerTaskOutput {
+        let latency_percentiles = std::array::from_fn(|idx| {
+            let micros = self
+                .latency_histo
+                .value_at_percentile(LATENCY_PERCENTILES[idx]);
+            Duration::from_micros(micros)
+        });
+        PerTaskOutput {
+            request_count: self.latency_histo.len(),
+            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
+            latency_percentiles: LatencyPercentiles {
+                latency_percentiles,
+            },
+        }
+    }
+
+    fn add(&mut self, other: &Self) {
+        let Self {
+            ref mut latency_histo,
+        } = self;
+        latency_histo.add(&other.latency_histo).unwrap();
+    }
+}
+
+thread_local! {
+    pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
+        Arc::new(Mutex::new(ThreadLocalStats::new()))
+    );
+}
+
+pub(crate) fn main(args: Args) -> anyhow::Result<()> {
+    logging::init(
+        logging::LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+        logging::Output::Stderr,
+    )
+    .unwrap();
+
+    let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
+
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .on_thread_start({
+            let thread_local_stats = Arc::clone(&thread_local_stats);
+            move || {
+                // pre-initialize the histograms
+                STATS.with(|stats| {
+                    let stats: Arc<_> = Arc::clone(&*stats.borrow());
+                    thread_local_stats.lock().unwrap().push(stats);
+                });
+            }
+        })
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let main_task = rt.spawn(main_impl(args, thread_local_stats));
+    rt.block_on(main_task).unwrap()
+}
+
+struct Target {
+    timeline: TenantTimelineId,
+    timeline_lsn: Lsn,
+}
+
+async fn main_impl(
+    args: Args,
+    thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
+) -> anyhow::Result<()> {
+    let args: &'static Args = Box::leak(Box::new(args));
+
+    let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
+        args.mgmt_api_endpoint.clone(),
+    ));
+
+    // discover targets: use the explicitly requested timelines if given, else list everything
+    let mut timelines: Vec<TenantTimelineId> = Vec::new();
+    if let Some(targets) = &args.targets {
+        timelines = targets.clone();
+    } else {
+        let tenants: Vec<TenantId> = mgmt_api_client
+            .list_tenants()
+            .await?
+            .into_iter()
+            .map(|ti| ti.id)
+            .collect();
+        let mut js = JoinSet::new();
+        for tenant_id in tenants {
+            js.spawn({
+                let mgmt_api_client = Arc::clone(&mgmt_api_client);
+                async move {
+                    (
+                        tenant_id,
+                        mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
+                    )
+                }
+            });
+        }
+        while let Some(res) = js.join_next().await {
+            let (tenant_id, tl_infos) = res.unwrap();
+            for tl in tl_infos {
+                timelines.push(TenantTimelineId {
+                    tenant_id,
+                    timeline_id: tl.timeline_id,
+                });
+            }
+        }
+    }
+
+    info!("timelines:\n{:?}", timelines);
+
+    let mut js = JoinSet::new();
+    for timeline in &timelines {
+        js.spawn({
+            let mgmt_api_client = Arc::clone(&mgmt_api_client);
+            let timeline = *timeline;
+            async move {
+                let partitioning = mgmt_api_client
+                    .keyspace(timeline.tenant_id, timeline.timeline_id)
+                    .await?;
+                let timeline_lsn = partitioning.at_lsn;
+
+                anyhow::Ok(Target {
+                    timeline,
+                    timeline_lsn,
+                })
+            }
+        });
+    }
+    let mut all_targets: Vec<Target> = Vec::new();
+    while let Some(res) = js.join_next().await {
+        all_targets.push(res.unwrap().unwrap());
+    }
+
+    let live_stats = Arc::new(LiveStats::default());
+
+    let num_client_tasks = timelines.len();
+    let num_live_stats_dump = 1;
+    let num_work_sender_tasks = 1;
+
+    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
+        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
+    ));
+    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
+
+    tokio::spawn({
+        let stats = Arc::clone(&live_stats);
+        let start_work_barrier = Arc::clone(&start_work_barrier);
+        async move {
+            start_work_barrier.wait().await;
+            loop {
+                let start = std::time::Instant::now();
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
+                let elapsed = start.elapsed();
+                info!(
+                    "RPS: {:.0}",
+                    completed_requests as f64 / elapsed.as_secs_f64()
+                );
+            }
+        }
+    });
+
+    let mut work_senders = HashMap::new();
+    let mut tasks = Vec::new();
+    for tl in &timelines {
+        let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
+        work_senders.insert(tl, sender);
+        tasks.push(tokio::spawn(client(
+            args,
+            *tl,
+            Arc::clone(&start_work_barrier),
+            receiver,
+            Arc::clone(&all_work_done_barrier),
+            Arc::clone(&live_stats),
+        )));
+    }
+
+    let work_sender = async move {
+        start_work_barrier.wait().await;
+        loop {
+            let (target, gzip) = {
+                let mut rng = rand::thread_rng();
+                let target = all_targets.choose(&mut rng).unwrap();
+                (target, rng.gen_bool(args.gzip_probability))
+            };
+            let sender = work_senders.get(&target.timeline).unwrap();
+            // TODO: what if this blocks?
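The TODO that ends the hunk above points at a real coupling: each timeline gets a bounded channel of depth 10, and the single work-sender task awaits on `send`, so one slow client stalls work generation for every other timeline. A hedged alternative, sketched with a placeholder `WorkItem` alias rather than the patch's `(Lsn, bool)` tuple, is to attempt a non-blocking send first and count how often backpressure was hit before falling back to the awaited send:

```rust
// Sketch, not part of the patch: make backpressure from a slow client observable.
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc::{error::TrySendError, Sender};

type WorkItem = (u64, bool); // stands in for the patch's (Lsn, gzip) pair

async fn send_with_backpressure_accounting(
    sender: &Sender<WorkItem>,
    item: WorkItem,
    blocked_sends: &AtomicU64,
) {
    match sender.try_send(item) {
        Ok(()) => {}
        Err(TrySendError::Full(item)) => {
            // The per-timeline queue is full: record the stall, then do the blocking send
            // the current code does unconditionally.
            blocked_sends.fetch_add(1, Ordering::Relaxed);
            sender.send(item).await.expect("client task exited early");
        }
        Err(TrySendError::Closed(_)) => panic!("client task exited early"),
    }
}
```

Reporting such a `blocked_sends` counter alongside RPS would make it obvious when the channel depth, rather than the pageserver, is the bottleneck.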
+            sender.send((target.timeline_lsn, gzip)).await.ok().unwrap();
+        }
+    };
+
+    if let Some(runtime) = args.runtime {
+        match tokio::time::timeout(runtime.into(), work_sender).await {
+            Ok(()) => unreachable!("work sender never terminates"),
+            Err(_timeout) => {
+                // this implicitly drops the work_senders, making all the clients exit
+            }
+        }
+    } else {
+        work_sender.await;
+        unreachable!("work sender never terminates");
+    }
+
+    for t in tasks {
+        t.await.unwrap();
+    }
+
+    let output = Output {
+        total: {
+            let mut agg_stats = ThreadLocalStats::new();
+            for stats in thread_local_stats.lock().unwrap().iter() {
+                let stats = stats.lock().unwrap();
+                agg_stats.add(&*stats);
+            }
+            agg_stats.output()
+        },
+    };
+
+    let output = serde_json::to_string_pretty(&output).unwrap();
+    println!("{output}");
+
+    anyhow::Ok(())
+}
+
+#[instrument(skip_all)]
+async fn client(
+    args: &'static Args,
+    timeline: TenantTimelineId,
+    start_work_barrier: Arc<Barrier>,
+    mut work: tokio::sync::mpsc::Receiver<(Lsn, bool)>,
+    all_work_done_barrier: Arc<Barrier>,
+    live_stats: Arc<LiveStats>,
+) {
+    start_work_barrier.wait().await;
+
+    let client =
+        pageserver::client::page_service::Client::new(args.page_service_connstring.clone())
+            .await
+            .unwrap();
+
+    while let Some((lsn, gzip)) = work.recv().await {
+        let start = Instant::now();
+        let copy_out_stream = client
+            .basebackup(&BasebackupRequest {
+                tenant_id: timeline.tenant_id,
+                timeline_id: timeline.timeline_id,
+                lsn: Some(lsn),
+                gzip,
+            })
+            .await
+            .with_context(|| format!("start basebackup for {timeline}"))
+            .unwrap();
+
+        use futures::StreamExt;
+        copy_out_stream.for_each(|_| async move { () }).await;
+        let elapsed = start.elapsed();
+        live_stats.inc();
+        STATS.with(|stats| {
+            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
+        });
+    }
+
+    all_work_done_barrier.wait().await;
+}
diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs
index 72bb38356d..1119d9f194 100644
--- a/pageserver/pagebench/src/main.rs
+++ b/pageserver/pagebench/src/main.rs
@@ -2,18 +2,21 @@ use clap::Parser;
 
 pub(crate) mod util;
 
+mod basebackup;
 mod getpage_latest_lsn;
 
 /// Component-level performance test for pageserver.
 #[derive(clap::Parser)]
 enum Args {
     GetPageLatestLsn(getpage_latest_lsn::Args),
+    Basebackup(basebackup::Args),
 }
 
 fn main() {
     let args = Args::parse();
     match args {
         Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
+        Args::Basebackup(args) => basebackup::main(args),
     }
     .unwrap()
 }
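Taken as a whole, the least obvious plumbing in `basebackup.rs` is how per-request latencies reach the final JSON: every runtime worker thread registers its own `ThreadLocalStats` with the collector in `on_thread_start`, client tasks record into whichever worker thread they happen to run on (no cross-thread contention on the hot path), and `main_impl` merges the per-thread histograms after all clients have drained. A self-contained sketch of that pattern, using a bare `hdrhistogram::Histogram` and a synthetic workload instead of the patch's types:

```rust
// Sketch, not part of the patch: per-worker-thread histograms, merged at the end.
use std::cell::RefCell;
use std::sync::{Arc, Mutex};

thread_local! {
    static HISTO: RefCell<Arc<Mutex<hdrhistogram::Histogram<u64>>>> = RefCell::new(Arc::new(
        Mutex::new(hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap()),
    ));
}

fn main() {
    let all_histos = Arc::new(Mutex::new(Vec::new()));
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .on_thread_start({
            let all_histos = Arc::clone(&all_histos);
            move || {
                // Runs once per worker thread: make this thread's histogram reachable from main.
                HISTO.with(|h| all_histos.lock().unwrap().push(Arc::clone(&h.borrow())));
            }
        })
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let tasks: Vec<_> = (0..100u64)
            .map(|i| {
                tokio::spawn(async move {
                    // Record a synthetic latency into the histogram of whichever thread runs us.
                    HISTO.with(|h| h.borrow().lock().unwrap().record(i + 1).unwrap());
                })
            })
            .collect();
        for t in tasks {
            t.await.unwrap();
        }
    });

    // Merge the per-thread histograms, as main_impl does with ThreadLocalStats::add.
    let mut total = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000_000, 3).unwrap();
    for h in all_histos.lock().unwrap().iter() {
        total.add(&*h.lock().unwrap()).unwrap();
    }
    println!("n={} p99={}us", total.len(), total.value_at_percentile(99.0));
}
```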