diff --git a/Cargo.lock b/Cargo.lock index 0e51e88e3b..0be6d5d183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,6 +2106,20 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.1", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heapless" version = "0.8.0" @@ -3057,6 +3071,28 @@ dependencies = [ "sha2", ] +[[package]] +name = "pagebench" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "futures", + "hdrhistogram", + "humantime", + "humantime-serde", + "pageserver", + "pageserver_api", + "pageserver_client", + "rand 0.8.5", + "serde", + "serde_json", + "tokio", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "pagectl" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 6884de7bf5..5de636778a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "pageserver", "pageserver/ctl", "pageserver/client", + "pageserver/pagebench", "proxy", "safekeeper", "storage_broker", @@ -79,6 +80,7 @@ futures-util = "0.3" git-version = "0.3" hashbrown = "0.13" hashlink = "0.8.1" +hdrhistogram = "7.5.2" hex = "0.4" hex-literal = "0.4" hmac = "0.12.1" diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index 3668f7939d..3e4936eec4 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -81,6 +81,10 @@ impl TenantShardId { pub fn is_zero(&self) -> bool { self.shard_number == ShardNumber(0) } + + pub fn is_unsharded(&self) -> bool { + self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0) + } } /// Formatting helper diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 262dcb8a8a..b3269ae049 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -366,6 +366,49 @@ impl MonotonicCounter for RecordLsn { } } +/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s. +/// +/// This is used by the `pagebench` pageserver benchmarking tool. 
+pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
+
+impl rand::distributions::uniform::SampleUniform for Lsn {
+    type Sampler = LsnSampler;
+}
+
+impl rand::distributions::uniform::UniformSampler for LsnSampler {
+    type X = Lsn;
+
+    fn new<B1, B2>(low: B1, high: B2) -> Self
+    where
+        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+    {
+        Self(
+            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
+                low.borrow().0,
+                high.borrow().0,
+            ),
+        )
+    }
+
+    fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
+    where
+        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+    {
+        Self(
+            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
+                low.borrow().0,
+                high.borrow().0,
+            ),
+        )
+    }
+
+    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
+        Lsn(self.0.sample(rng))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::bin_ser::BeSer;
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 0ad4e1551e..87e4ed8efd 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -5,6 +5,8 @@ use utils::{
     id::{TenantId, TimelineId},
 };
 
+pub mod util;
+
 #[derive(Debug)]
 pub struct Client {
     mgmt_api_endpoint: String,
diff --git a/pageserver/client/src/mgmt_api/util.rs b/pageserver/client/src/mgmt_api/util.rs
new file mode 100644
index 0000000000..048a3bb7cd
--- /dev/null
+++ b/pageserver/client/src/mgmt_api/util.rs
@@ -0,0 +1,49 @@
+//! Helpers to do common higher-level tasks with the [`Client`].
+
+use std::sync::Arc;
+
+use tokio::task::JoinSet;
+use utils::id::{TenantId, TenantTimelineId};
+
+use super::Client;
+
+/// Retrieve a list of all of the pageserver's timelines.
+///
+/// Fails if there are sharded tenants present on the pageserver.
+pub async fn get_pageserver_tenant_timelines_unsharded(
+    api_client: &Arc<Client>,
+) -> anyhow::Result<Vec<TenantTimelineId>> {
+    let mut timelines: Vec<TenantTimelineId> = Vec::new();
+    let mut tenants: Vec<TenantId> = Vec::new();
+    for ti in api_client.list_tenants().await? {
+        if !ti.id.is_unsharded() {
+            anyhow::bail!(
+                "only unsharded tenants are supported at this time: {}",
+                ti.id
+            );
+        }
+        tenants.push(ti.id.tenant_id)
+    }
+    let mut js = JoinSet::new();
+    for tenant_id in tenants {
+        js.spawn({
+            let mgmt_api_client = Arc::clone(api_client);
+            async move {
+                (
+                    tenant_id,
+                    mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
+                )
+            }
+        });
+    }
+    while let Some(res) = js.join_next().await {
+        let (tenant_id, details) = res.unwrap();
+        for timeline_id in details.timelines {
+            timelines.push(TenantTimelineId {
+                tenant_id,
+                timeline_id,
+            });
+        }
+    }
+    Ok(timelines)
+}
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
new file mode 100644
index 0000000000..169d9b7f8e
--- /dev/null
+++ b/pageserver/pagebench/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "pagebench"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow.workspace = true
+clap.workspace = true
+futures.workspace = true
+hdrhistogram.workspace = true
+humantime.workspace = true
+humantime-serde.workspace = true
+rand.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+tracing.workspace = true
+tokio.workspace = true
+
+pageserver = { path = ".."
} +pageserver_client.workspace = true +pageserver_api.workspace = true +utils = { path = "../../libs/utils/" } +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs new file mode 100644 index 0000000000..85a3e695de --- /dev/null +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -0,0 +1,272 @@ +use anyhow::Context; +use pageserver_client::page_service::BasebackupRequest; + +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; + +use rand::prelude::*; +use tokio::sync::Barrier; +use tokio::task::JoinSet; +use tracing::{debug, info, instrument}; + +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::ops::Range; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use crate::util::tokio_thread_local_stats::AllThreadLocalStats; +use crate::util::{request_stats, tokio_thread_local_stats}; + +/// basebackup@LatestLSN +#[derive(clap::Parser)] +pub(crate) struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + #[clap(long, default_value = "localhost:64000")] + page_service_host_port: String, + #[clap(long)] + pageserver_jwt: Option, + #[clap(long, default_value = "1")] + num_clients: NonZeroUsize, + #[clap(long, default_value = "1.0")] + gzip_probability: f64, + #[clap(long)] + runtime: Option, + #[clap(long)] + limit_to_first_n_targets: Option, + targets: Option>, +} + +#[derive(Debug, Default)] +struct LiveStats { + completed_requests: AtomicU64, +} + +impl LiveStats { + fn inc(&self) { + self.completed_requests.fetch_add(1, Ordering::Relaxed); + } +} + +struct Target { + timeline: TenantTimelineId, + lsn_range: Option>, +} + +#[derive(serde::Serialize)] +struct Output { + total: request_stats::Output, +} + +tokio_thread_local_stats::declare!(STATS: request_stats::Stats); + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + tokio_thread_local_stats::main!(STATS, move |thread_local_stats| { + main_impl(args, thread_local_stats) + }) +} + +async fn main_impl( + args: Args, + all_thread_local_stats: AllThreadLocalStats, +) -> anyhow::Result<()> { + let args: &'static Args = Box::leak(Box::new(args)); + + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + args.pageserver_jwt.as_deref(), + )); + + // discover targets + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; + let mut js = JoinSet::new(); + for timeline in &timelines { + js.spawn({ + let timeline = *timeline; + // FIXME: this triggers initial logical size calculation + // https://github.com/neondatabase/neon/issues/6168 + let info = mgmt_api_client + .timeline_info(timeline.tenant_id, timeline.timeline_id) + .await + .unwrap(); + async move { + anyhow::Ok(Target { + timeline, + // TODO: support lsn_range != latest LSN + lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)), + }) + } + }); + } + let mut all_targets: Vec = Vec::new(); + while let Some(res) = js.join_next().await { + all_targets.push(res.unwrap().unwrap()); + } + + let live_stats = Arc::new(LiveStats::default()); + + let num_client_tasks = timelines.len(); + let num_live_stats_dump = 1; + let num_work_sender_tasks = 1; + + let start_work_barrier = Arc::new(tokio::sync::Barrier::new( + 
num_client_tasks + num_live_stats_dump + num_work_sender_tasks, + )); + let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks)); + + tokio::spawn({ + let stats = Arc::clone(&live_stats); + let start_work_barrier = Arc::clone(&start_work_barrier); + async move { + start_work_barrier.wait().await; + loop { + let start = std::time::Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); + let elapsed = start.elapsed(); + info!( + "RPS: {:.0}", + completed_requests as f64 / elapsed.as_secs_f64() + ); + } + } + }); + + let mut work_senders = HashMap::new(); + let mut tasks = Vec::new(); + for tl in &timelines { + let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are + work_senders.insert(tl, sender); + tasks.push(tokio::spawn(client( + args, + *tl, + Arc::clone(&start_work_barrier), + receiver, + Arc::clone(&all_work_done_barrier), + Arc::clone(&live_stats), + ))); + } + + let work_sender = async move { + start_work_barrier.wait().await; + loop { + let (timeline, work) = { + let mut rng = rand::thread_rng(); + let target = all_targets.choose(&mut rng).unwrap(); + let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r)); + ( + target.timeline, + Work { + lsn, + gzip: rng.gen_bool(args.gzip_probability), + }, + ) + }; + let sender = work_senders.get(&timeline).unwrap(); + // TODO: what if this blocks? + sender.send(work).await.ok().unwrap(); + } + }; + + if let Some(runtime) = args.runtime { + match tokio::time::timeout(runtime.into(), work_sender).await { + Ok(()) => unreachable!("work sender never terminates"), + Err(_timeout) => { + // this implicitly drops the work_senders, making all the clients exit + } + } + } else { + work_sender.await; + unreachable!("work sender never terminates"); + } + + for t in tasks { + t.await.unwrap(); + } + + let output = Output { + total: { + let mut agg_stats = request_stats::Stats::new(); + for stats in all_thread_local_stats.lock().unwrap().iter() { + let stats = stats.lock().unwrap(); + agg_stats.add(&stats); + } + agg_stats.output() + }, + }; + + let output = serde_json::to_string_pretty(&output).unwrap(); + println!("{output}"); + + anyhow::Ok(()) +} + +#[derive(Copy, Clone)] +struct Work { + lsn: Option, + gzip: bool, +} + +#[instrument(skip_all)] +async fn client( + args: &'static Args, + timeline: TenantTimelineId, + start_work_barrier: Arc, + mut work: tokio::sync::mpsc::Receiver, + all_work_done_barrier: Arc, + live_stats: Arc, +) { + start_work_barrier.wait().await; + + let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring( + &args.page_service_host_port, + args.pageserver_jwt.as_deref(), + )) + .await + .unwrap(); + + while let Some(Work { lsn, gzip }) = work.recv().await { + let start = Instant::now(); + let copy_out_stream = client + .basebackup(&BasebackupRequest { + tenant_id: timeline.tenant_id, + timeline_id: timeline.timeline_id, + lsn, + gzip, + }) + .await + .with_context(|| format!("start basebackup for {timeline}")) + .unwrap(); + + use futures::StreamExt; + let size = Arc::new(AtomicUsize::new(0)); + copy_out_stream + .for_each({ + |r| { + let size = Arc::clone(&size); + async move { + let size = Arc::clone(&size); + size.fetch_add(r.unwrap().len(), Ordering::Relaxed); + } + } + }) + .await; + debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed)); + let elapsed = start.elapsed(); + live_stats.inc(); 
+ STATS.with(|stats| { + stats.borrow().lock().unwrap().observe(elapsed).unwrap(); + }); + } + + all_work_done_barrier.wait().await; +} diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs new file mode 100644 index 0000000000..16d198ab0e --- /dev/null +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -0,0 +1,335 @@ +use anyhow::Context; +use futures::future::join_all; +use pageserver::pgdatadir_mapping::key_to_rel_block; +use pageserver::repository; +use pageserver_api::key::is_rel_block_key; +use pageserver_client::page_service::RelTagBlockNo; + +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; + +use rand::prelude::*; +use tokio::sync::Barrier; +use tokio::task::JoinSet; +use tracing::{info, instrument}; + +use std::collections::HashMap; +use std::future::Future; +use std::num::NonZeroUsize; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use crate::util::tokio_thread_local_stats::AllThreadLocalStats; +use crate::util::{request_stats, tokio_thread_local_stats}; + +/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. +#[derive(clap::Parser)] +pub(crate) struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + #[clap(long, default_value = "postgres://postgres@localhost:64000")] + page_service_connstring: String, + #[clap(long)] + pageserver_jwt: Option, + #[clap(long, default_value = "1")] + num_clients: NonZeroUsize, + #[clap(long)] + runtime: Option, + #[clap(long)] + per_target_rate_limit: Option, + #[clap(long)] + limit_to_first_n_targets: Option, + targets: Option>, +} + +#[derive(Debug, Default)] +struct LiveStats { + completed_requests: AtomicU64, +} + +impl LiveStats { + fn inc(&self) { + self.completed_requests.fetch_add(1, Ordering::Relaxed); + } +} + +#[derive(Clone)] +struct KeyRange { + timeline: TenantTimelineId, + timeline_lsn: Lsn, + start: i128, + end: i128, +} + +impl KeyRange { + fn len(&self) -> i128 { + self.end - self.start + } +} + +#[derive(serde::Serialize)] +struct Output { + total: request_stats::Output, +} + +tokio_thread_local_stats::declare!(STATS: request_stats::Stats); + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + tokio_thread_local_stats::main!(STATS, move |thread_local_stats| { + main_impl(args, thread_local_stats) + }) +} + +async fn main_impl( + args: Args, + all_thread_local_stats: AllThreadLocalStats, +) -> anyhow::Result<()> { + let args: &'static Args = Box::leak(Box::new(args)); + + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + args.pageserver_jwt.as_deref(), + )); + + // discover targets + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; + + let mut js = JoinSet::new(); + for timeline in &timelines { + js.spawn({ + let mgmt_api_client = Arc::clone(&mgmt_api_client); + let timeline = *timeline; + async move { + let partitioning = mgmt_api_client + .keyspace(timeline.tenant_id, timeline.timeline_id) + .await?; + let lsn = partitioning.at_lsn; + + let ranges = partitioning + .keys + .ranges + .iter() + .filter_map(|r| { + let start = r.start; + let end = r.end; + // filter out non-relblock keys + match (is_rel_block_key(&start), is_rel_block_key(&end)) { + 
(true, true) => Some(KeyRange { + timeline, + timeline_lsn: lsn, + start: start.to_i128(), + end: end.to_i128(), + }), + (true, false) | (false, true) => { + unimplemented!("split up range") + } + (false, false) => None, + } + }) + .collect::>(); + + anyhow::Ok(ranges) + } + }); + } + let mut all_ranges: Vec = Vec::new(); + while let Some(res) = js.join_next().await { + all_ranges.extend(res.unwrap().unwrap()); + } + + let live_stats = Arc::new(LiveStats::default()); + + let num_client_tasks = timelines.len(); + let num_live_stats_dump = 1; + let num_work_sender_tasks = 1; + + let start_work_barrier = Arc::new(tokio::sync::Barrier::new( + num_client_tasks + num_live_stats_dump + num_work_sender_tasks, + )); + let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks)); + + tokio::spawn({ + let stats = Arc::clone(&live_stats); + let start_work_barrier = Arc::clone(&start_work_barrier); + async move { + start_work_barrier.wait().await; + loop { + let start = std::time::Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); + let elapsed = start.elapsed(); + info!( + "RPS: {:.0}", + completed_requests as f64 / elapsed.as_secs_f64() + ); + } + } + }); + + let mut work_senders = HashMap::new(); + let mut tasks = Vec::new(); + for tl in &timelines { + let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are + work_senders.insert(tl, sender); + tasks.push(tokio::spawn(client( + args, + *tl, + Arc::clone(&start_work_barrier), + receiver, + Arc::clone(&all_work_done_barrier), + Arc::clone(&live_stats), + ))); + } + + let work_sender: Pin>> = match args.per_target_rate_limit { + None => Box::pin(async move { + let weights = rand::distributions::weighted::WeightedIndex::new( + all_ranges.iter().map(|v| v.len()), + ) + .unwrap(); + + start_work_barrier.wait().await; + + loop { + let (range, key) = { + let mut rng = rand::thread_rng(); + let r = &all_ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we filter non-rel-block keys out above"); + (r, RelTagBlockNo { rel_tag, block_no }) + }; + let sender = work_senders.get(&range.timeline).unwrap(); + // TODO: what if this blocks? 
+ sender.send((key, range.timeline_lsn)).await.ok().unwrap(); + } + }), + Some(rps_limit) => Box::pin(async move { + let period = Duration::from_secs_f64(1.0 / (rps_limit as f64)); + + let make_timeline_task: &dyn Fn( + TenantTimelineId, + ) + -> Pin>> = &|timeline| { + let sender = work_senders.get(&timeline).unwrap(); + let ranges: Vec = all_ranges + .iter() + .filter(|r| r.timeline == timeline) + .cloned() + .collect(); + let weights = rand::distributions::weighted::WeightedIndex::new( + ranges.iter().map(|v| v.len()), + ) + .unwrap(); + + Box::pin(async move { + let mut ticker = tokio::time::interval(period); + ticker.set_missed_tick_behavior( + /* TODO review this choice */ + tokio::time::MissedTickBehavior::Burst, + ); + loop { + ticker.tick().await; + let (range, key) = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = key_to_rel_block(key) + .expect("we filter non-rel-block keys out above"); + (r, RelTagBlockNo { rel_tag, block_no }) + }; + sender.send((key, range.timeline_lsn)).await.ok().unwrap(); + } + }) + }; + + let tasks: Vec<_> = work_senders + .keys() + .map(|tl| make_timeline_task(**tl)) + .collect(); + + start_work_barrier.wait().await; + + join_all(tasks).await; + }), + }; + + if let Some(runtime) = args.runtime { + match tokio::time::timeout(runtime.into(), work_sender).await { + Ok(()) => unreachable!("work sender never terminates"), + Err(_timeout) => { + // this implicitly drops the work_senders, making all the clients exit + } + } + } else { + work_sender.await; + unreachable!("work sender never terminates"); + } + + for t in tasks { + t.await.unwrap(); + } + + let output = Output { + total: { + let mut agg_stats = request_stats::Stats::new(); + for stats in all_thread_local_stats.lock().unwrap().iter() { + let stats = stats.lock().unwrap(); + agg_stats.add(&stats); + } + agg_stats.output() + }, + }; + + let output = serde_json::to_string_pretty(&output).unwrap(); + println!("{output}"); + + anyhow::Ok(()) +} + +#[instrument(skip_all)] +async fn client( + args: &'static Args, + timeline: TenantTimelineId, + start_work_barrier: Arc, + mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>, + all_work_done_barrier: Arc, + live_stats: Arc, +) { + start_work_barrier.wait().await; + + let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) + .await + .unwrap(); + let mut client = client + .pagestream(timeline.tenant_id, timeline.timeline_id) + .await + .unwrap(); + + while let Some((key, lsn)) = work.recv().await { + let start = Instant::now(); + client + .getpage(key, lsn) + .await + .with_context(|| format!("getpage for {timeline}")) + .unwrap(); + let elapsed = start.elapsed(); + live_stats.inc(); + STATS.with(|stats| { + stats.borrow().lock().unwrap().observe(elapsed).unwrap(); + }); + } + + all_work_done_barrier.wait().await; +} diff --git a/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs new file mode 100644 index 0000000000..d46ae94e8a --- /dev/null +++ b/pageserver/pagebench/src/cmd/trigger_initial_size_calculation.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use humantime::Duration; +use tokio::task::JoinSet; +use utils::id::TenantTimelineId; + +#[derive(clap::Parser)] +pub(crate) struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + 
#[clap(long, default_value = "localhost:64000")] + page_service_host_port: String, + #[clap(long)] + pageserver_jwt: Option, + #[clap( + long, + help = "if specified, poll mgmt api to check whether init logical size calculation has completed" + )] + poll_for_completion: Option, + #[clap(long)] + limit_to_first_n_targets: Option, + targets: Option>, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let main_task = rt.spawn(main_impl(args)); + rt.block_on(main_task).unwrap() +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + let args: &'static Args = Box::leak(Box::new(args)); + + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + args.pageserver_jwt.as_deref(), + )); + + // discover targets + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; + + // kick it off + + let mut js = JoinSet::new(); + for tl in timelines { + let mgmt_api_client = Arc::clone(&mgmt_api_client); + js.spawn(async move { + // TODO: API to explicitly trigger initial logical size computation. + // Should probably also avoid making it a side effect of timeline details to trigger initial logical size calculation. + // => https://github.com/neondatabase/neon/issues/6168 + let info = mgmt_api_client + .timeline_info(tl.tenant_id, tl.timeline_id) + .await + .unwrap(); + + if let Some(period) = args.poll_for_completion { + let mut ticker = tokio::time::interval(period.into()); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut info = info; + while !info.current_logical_size_is_accurate { + ticker.tick().await; + info = mgmt_api_client + .timeline_info(tl.tenant_id, tl.timeline_id) + .await + .unwrap(); + } + } + }); + } + while let Some(res) = js.join_next().await { + let _: () = res.unwrap(); + } + Ok(()) +} diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs new file mode 100644 index 0000000000..e0120c9212 --- /dev/null +++ b/pageserver/pagebench/src/main.rs @@ -0,0 +1,48 @@ +use clap::Parser; +use utils::logging; + +/// Re-usable pieces of code that aren't CLI-specific. +mod util { + pub(crate) mod connstring; + pub(crate) mod request_stats; + #[macro_use] + pub(crate) mod tokio_thread_local_stats; + /// Re-usable pieces of CLI-specific code. + pub(crate) mod cli { + pub(crate) mod targets; + } +} + +/// The pagebench CLI sub-commands, dispatched in [`main`] below. +mod cmd { + pub(super) mod basebackup; + pub(super) mod getpage_latest_lsn; + pub(super) mod trigger_initial_size_calculation; +} + +/// Component-level performance test for pageserver. 
+#[derive(clap::Parser)] +enum Args { + Basebackup(cmd::basebackup::Args), + GetPageLatestLsn(cmd::getpage_latest_lsn::Args), + TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), +} + +fn main() { + logging::init( + logging::LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + logging::Output::Stderr, + ) + .unwrap(); + + let args = Args::parse(); + match args { + Args::Basebackup(args) => cmd::basebackup::main(args), + Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args), + Args::TriggerInitialSizeCalculation(args) => { + cmd::trigger_initial_size_calculation::main(args) + } + } + .unwrap() +} diff --git a/pageserver/pagebench/src/util/cli/targets.rs b/pageserver/pagebench/src/util/cli/targets.rs new file mode 100644 index 0000000000..848eae27cf --- /dev/null +++ b/pageserver/pagebench/src/util/cli/targets.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use pageserver_client::mgmt_api; +use tracing::info; +use utils::id::TenantTimelineId; + +pub(crate) struct Spec { + pub(crate) limit_to_first_n_targets: Option, + pub(crate) targets: Option>, +} + +pub(crate) async fn discover( + api_client: &Arc, + spec: Spec, +) -> anyhow::Result> { + let mut timelines = if let Some(targets) = spec.targets { + targets + } else { + mgmt_api::util::get_pageserver_tenant_timelines_unsharded(api_client).await? + }; + + if let Some(limit) = spec.limit_to_first_n_targets { + timelines.sort(); // for determinism + timelines.truncate(limit); + if timelines.len() < limit { + anyhow::bail!("pageserver has less than limit_to_first_n_targets={limit} tenants"); + } + } + + info!("timelines:\n{:?}", timelines); + info!("number of timelines:\n{:?}", timelines.len()); + + Ok(timelines) +} diff --git a/pageserver/pagebench/src/util/connstring.rs b/pageserver/pagebench/src/util/connstring.rs new file mode 100644 index 0000000000..07a0ff042d --- /dev/null +++ b/pageserver/pagebench/src/util/connstring.rs @@ -0,0 +1,8 @@ +pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String { + let colon_and_jwt = if let Some(jwt) = jwt { + format!(":{jwt}") // TODO: urlescape + } else { + String::new() + }; + format!("postgres://postgres{colon_and_jwt}@{host_port}") +} diff --git a/pageserver/pagebench/src/util/request_stats.rs b/pageserver/pagebench/src/util/request_stats.rs new file mode 100644 index 0000000000..5ecf1cbf24 --- /dev/null +++ b/pageserver/pagebench/src/util/request_stats.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use anyhow::Context; + +pub(crate) struct Stats { + latency_histo: hdrhistogram::Histogram, +} + +impl Stats { + pub(crate) fn new() -> Self { + Self { + // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram, + // which would skew the benchmark results. 
+            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
+        }
+    }
+    pub(crate) fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
+        let micros: u64 = latency
+            .as_micros()
+            .try_into()
+            .context("latency greater than u64")?;
+        self.latency_histo
+            .record(micros)
+            .context("add to histogram")?;
+        Ok(())
+    }
+    pub(crate) fn output(&self) -> Output {
+        let latency_percentiles = std::array::from_fn(|idx| {
+            let micros = self
+                .latency_histo
+                .value_at_percentile(LATENCY_PERCENTILES[idx]);
+            Duration::from_micros(micros)
+        });
+        Output {
+            request_count: self.latency_histo.len(),
+            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
+            latency_percentiles: LatencyPercentiles {
+                latency_percentiles,
+            },
+        }
+    }
+    pub(crate) fn add(&mut self, other: &Self) {
+        let Self {
+            ref mut latency_histo,
+        } = self;
+        latency_histo.add(&other.latency_histo).unwrap();
+    }
+}
+
+impl Default for Stats {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
+
+struct LatencyPercentiles {
+    latency_percentiles: [Duration; 4],
+}
+
+impl serde::Serialize for LatencyPercentiles {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeMap;
+        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
+        for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
+            ser.serialize_entry(
+                &format!("p{p}"),
+                &format!(
+                    "{}",
+                    &humantime::format_duration(self.latency_percentiles[i])
+                ),
+            )?;
+        }
+        ser.end()
+    }
+}
+
+#[derive(serde::Serialize)]
+pub(crate) struct Output {
+    request_count: u64,
+    #[serde(with = "humantime_serde")]
+    latency_mean: Duration,
+    latency_percentiles: LatencyPercentiles,
+}
diff --git a/pageserver/pagebench/src/util/tokio_thread_local_stats.rs b/pageserver/pagebench/src/util/tokio_thread_local_stats.rs
new file mode 100644
index 0000000000..82526213b6
--- /dev/null
+++ b/pageserver/pagebench/src/util/tokio_thread_local_stats.rs
@@ -0,0 +1,45 @@
+pub(crate) type ThreadLocalStats<T> = Arc<Mutex<T>>;
+pub(crate) type AllThreadLocalStats<T> = Arc<Mutex<Vec<ThreadLocalStats<T>>>>;
+
+macro_rules! declare {
+    ($THREAD_LOCAL_NAME:ident: $T:ty) => {
+        thread_local! {
+            pub static $THREAD_LOCAL_NAME: std::cell::RefCell<crate::util::tokio_thread_local_stats::ThreadLocalStats<$T>> = std::cell::RefCell::new(
+                std::sync::Arc::new(std::sync::Mutex::new(Default::default()))
+            );
+        }
+    };
+}
+
+use std::sync::{Arc, Mutex};
+
+pub(crate) use declare;
+
+macro_rules!
main { + ($THREAD_LOCAL_NAME:ident, $main_impl:expr) => {{ + let main_impl = $main_impl; + let all = Arc::new(Mutex::new(Vec::new())); + + let rt = tokio::runtime::Builder::new_multi_thread() + .on_thread_start({ + let all = Arc::clone(&all); + move || { + // pre-initialize the thread local stats by accessesing them + // (some stats like requests_stats::Stats are quite costly to initialize, + // we don't want to pay that cost during the measurement period) + $THREAD_LOCAL_NAME.with(|stats| { + let stats: Arc<_> = Arc::clone(&*stats.borrow()); + all.lock().unwrap().push(stats); + }); + } + }) + .enable_all() + .build() + .unwrap(); + + let main_task = rt.spawn(main_impl(all)); + rt.block_on(main_task).unwrap() + }}; +} + +pub(crate) use main; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index b81037ae47..e9884a15f5 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1776,6 +1776,7 @@ pub fn is_inherited_key(key: Key) -> bool { key != AUX_FILES_KEY } +/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`. pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { Ok(match key.field1 { 0x00 => ( @@ -1790,7 +1791,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { _ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1), }) } - pub fn is_rel_fsm_block_key(key: Key) -> bool { key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff }
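
Note (not part of the diff): a minimal sketch of how the new `SampleUniform` impl for `Lsn` is exercised, e.g. when pagebench picks a random LSN inside a target's `lsn_range`; the concrete LSN bounds below are made-up values.

    use rand::Rng;
    use utils::lsn::Lsn;

    fn main() {
        // Hypothetical bounds; pagebench derives them from the timeline's last_record_lsn.
        let (lo, hi) = (Lsn(0x1000), Lsn(0x2000));
        let mut rng = rand::thread_rng();
        // Compiles because Lsn now implements rand's SampleUniform via LsnSampler.
        let sampled: Lsn = rng.gen_range(lo..hi);
        println!("sampled {sampled}");
    }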