From 7bd533dc691bb02a1e372249dcad06a4c026573a Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 18 Dec 2023 17:06:40 +0000
Subject: [PATCH] pagebench: factor out the concept of thread local stats

---
 pageserver/pagebench/src/basebackup.rs        | 150 +++---------------
 .../pagebench/src/getpage_latest_lsn.rs       | 143 +++--------------
 pageserver/pagebench/src/main.rs              |   8 +
 .../src/trigger_initial_size_calculation.rs   |   9 --
 pageserver/pagebench/src/util.rs              |   3 +
 .../pagebench/src/util/request_stats.rs       |  88 ++++++++++
 .../src/util/tokio_thread_local_stats.rs      |  45 ++++++
 7 files changed, 184 insertions(+), 262 deletions(-)
 create mode 100644 pageserver/pagebench/src/util/request_stats.rs
 create mode 100644 pageserver/pagebench/src/util/tokio_thread_local_stats.rs

diff --git a/pageserver/pagebench/src/basebackup.rs b/pageserver/pagebench/src/basebackup.rs
index b6c1080669..2aa40e2cb7 100644
--- a/pageserver/pagebench/src/basebackup.rs
+++ b/pageserver/pagebench/src/basebackup.rs
@@ -7,18 +7,18 @@ use rand::prelude::*;
 use tokio::sync::Barrier;
 use tokio::task::JoinSet;
 use tracing::{debug, info, instrument};
-use utils::logging;
 
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::num::NonZeroUsize;
 use std::ops::Range;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
+use std::time::Instant;
 
 use crate::cli;
 use crate::util::tenant_timeline_id::TenantTimelineId;
+use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
+use crate::util::{request_stats, tokio_thread_local_stats};
 
 /// basebackup@LatestLSN
 #[derive(clap::Parser)]
@@ -51,134 +51,27 @@ impl LiveStats {
     }
 }
 
-#[derive(serde::Serialize)]
-struct Output {
-    total: PerTaskOutput,
-}
-
-const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
-
-struct LatencyPercentiles {
-    latency_percentiles: [Duration; 4],
-}
-
-impl serde::Serialize for LatencyPercentiles {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        use serde::ser::SerializeMap;
-        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
-        for p in LATENCY_PERCENTILES {
-            ser.serialize_entry(
-                &format!("p{p}"),
-                &format!(
-                    "{}",
-                    &humantime::format_duration(self.latency_percentiles[0])
-                ),
-            )?;
-        }
-        ser.end()
-    }
-}
-
-#[derive(serde::Serialize)]
-struct PerTaskOutput {
-    request_count: u64,
-    #[serde(with = "humantime_serde")]
-    latency_mean: Duration,
-    latency_percentiles: LatencyPercentiles,
-}
-
-struct ThreadLocalStats {
-    latency_histo: hdrhistogram::Histogram<u64>,
-}
-
-impl ThreadLocalStats {
-    fn new() -> Self {
-        Self {
-            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
-            // which would skew the benchmark results.
-            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
-        }
-    }
-    fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
-        let micros: u64 = latency
-            .as_micros()
-            .try_into()
-            .context("latency greater than u64")?;
-        self.latency_histo
-            .record(micros)
-            .context("add to histogram")?;
-        Ok(())
-    }
-    fn output(&self) -> PerTaskOutput {
-        let latency_percentiles = std::array::from_fn(|idx| {
-            let micros = self
-                .latency_histo
-                .value_at_percentile(LATENCY_PERCENTILES[idx]);
-            Duration::from_micros(micros)
-        });
-        PerTaskOutput {
-            request_count: self.latency_histo.len(),
-            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
-            latency_percentiles: LatencyPercentiles {
-                latency_percentiles,
-            },
-        }
-    }
-
-    fn add(&mut self, other: &Self) {
-        let Self {
-            ref mut latency_histo,
-        } = self;
-        latency_histo.add(&other.latency_histo).unwrap();
-    }
-}
-
-thread_local! {
-    pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
-        Arc::new(Mutex::new(ThreadLocalStats::new()))
-    );
-}
-
-pub(crate) fn main(args: Args) -> anyhow::Result<()> {
-    logging::init(
-        logging::LogFormat::Plain,
-        logging::TracingErrorLayerEnablement::Disabled,
-        logging::Output::Stderr,
-    )
-    .unwrap();
-
-    let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
-
-    let rt = tokio::runtime::Builder::new_multi_thread()
-        .on_thread_start({
-            let thread_local_stats = Arc::clone(&thread_local_stats);
-            move || {
-                // pre-initialize the histograms
-                STATS.with(|stats| {
-                    let stats: Arc<_> = Arc::clone(&*stats.borrow());
-                    thread_local_stats.lock().unwrap().push(stats);
-                });
-            }
-        })
-        .enable_all()
-        .build()
-        .unwrap();
-
-    let main_task = rt.spawn(main_impl(args, thread_local_stats));
-    rt.block_on(main_task).unwrap()
-}
-
 struct Target {
     timeline: TenantTimelineId,
     lsn_range: Option<Range<Lsn>>,
 }
 
+#[derive(serde::Serialize)]
+struct Output {
+    total: request_stats::Output,
+}
+
+tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
+
+pub(crate) fn main(args: Args) -> anyhow::Result<()> {
+    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
+        main_impl(args, thread_local_stats)
+    })
+}
+
 async fn main_impl(
     args: Args,
-    thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
+    all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
 ) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
 
@@ -196,11 +89,12 @@ async fn main_impl(
         },
     )
     .await?;
-
     let mut js = JoinSet::new();
     for timeline in &timelines {
         js.spawn({
             let timeline = *timeline;
+            // FIXME: this triggers initial logical size calculation
+            // https://github.com/neondatabase/neon/issues/6168
             let info = mgmt_api_client
                 .timeline_info(timeline.tenant_id, timeline.timeline_id)
                 .await
@@ -208,7 +102,7 @@
             async move {
                 anyhow::Ok(Target {
                     timeline,
-                    // TODO: lsn_range != latest LSN
+                    // TODO: support lsn_range != latest LSN
                     lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
                 })
             }
@@ -302,8 +196,8 @@ async fn main_impl(
 
     let output = Output {
         total: {
-            let mut agg_stats = ThreadLocalStats::new();
-            for stats in thread_local_stats.lock().unwrap().iter() {
+            let mut agg_stats = request_stats::Stats::new();
+            for stats in all_thread_local_stats.lock().unwrap().iter() {
                 let stats = stats.lock().unwrap();
                 agg_stats.add(&stats);
             }
diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs
index fac3791db5..6043856cbb 100644
--- a/pageserver/pagebench/src/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/getpage_latest_lsn.rs
@@ -11,9 +11,7 @@ use rand::prelude::*;
 use tokio::sync::Barrier;
 use tokio::task::JoinSet;
 use tracing::{info, instrument};
-use utils::logging;
 
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::future::Future;
 use std::num::NonZeroUsize;
@@ -25,6 +23,8 @@ use std::time::{Duration, Instant};
 
 use crate::cli;
 use crate::util::tenant_timeline_id::TenantTimelineId;
+use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
+use crate::util::{request_stats, tokio_thread_local_stats};
 
 /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
 #[derive(clap::Parser)]
@@ -57,126 +57,6 @@ impl LiveStats {
     }
 }
 
-#[derive(serde::Serialize)]
-struct Output {
-    total: PerTaskOutput,
-}
-
-const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
-
-struct LatencyPercentiles {
-    latency_percentiles: [Duration; 4],
-}
-
-impl serde::Serialize for LatencyPercentiles {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        use serde::ser::SerializeMap;
-        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
-        for p in LATENCY_PERCENTILES {
-            ser.serialize_entry(
-                &format!("p{p}"),
-                &format!(
-                    "{}",
-                    &humantime::format_duration(self.latency_percentiles[0])
-                ),
-            )?;
-        }
-        ser.end()
-    }
-}
-
-#[derive(serde::Serialize)]
-struct PerTaskOutput {
-    request_count: u64,
-    #[serde(with = "humantime_serde")]
-    latency_mean: Duration,
-    latency_percentiles: LatencyPercentiles,
-}
-
-struct ThreadLocalStats {
-    latency_histo: hdrhistogram::Histogram<u64>,
-}
-
-impl ThreadLocalStats {
-    fn new() -> Self {
-        Self {
-            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
-            // which would skew the benchmark results.
-            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
-        }
-    }
-    fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
-        let micros: u64 = latency
-            .as_micros()
-            .try_into()
-            .context("latency greater than u64")?;
-        self.latency_histo
-            .record(micros)
-            .context("add to histogram")?;
-        Ok(())
-    }
-    fn output(&self) -> PerTaskOutput {
-        let latency_percentiles = std::array::from_fn(|idx| {
-            let micros = self
-                .latency_histo
-                .value_at_percentile(LATENCY_PERCENTILES[idx]);
-            Duration::from_micros(micros)
-        });
-        PerTaskOutput {
-            request_count: self.latency_histo.len(),
-            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
-            latency_percentiles: LatencyPercentiles {
-                latency_percentiles,
-            },
-        }
-    }
-
-    fn add(&mut self, other: &Self) {
-        let Self {
-            ref mut latency_histo,
-        } = self;
-        latency_histo.add(&other.latency_histo).unwrap();
-    }
-}
-
-thread_local! {
-    pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
-        Arc::new(Mutex::new(ThreadLocalStats::new()))
-    );
-}
-
-pub(crate) fn main(args: Args) -> anyhow::Result<()> {
-    logging::init(
-        logging::LogFormat::Plain,
-        logging::TracingErrorLayerEnablement::Disabled,
-        logging::Output::Stderr,
-    )
-    .unwrap();
-
-    let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
-
-    let rt = tokio::runtime::Builder::new_multi_thread()
-        .on_thread_start({
-            let thread_local_stats = Arc::clone(&thread_local_stats);
-            move || {
-                // pre-initialize the histograms
-                STATS.with(|stats| {
-                    let stats: Arc<_> = Arc::clone(&*stats.borrow());
-                    thread_local_stats.lock().unwrap().push(stats);
-                });
-            }
-        })
-        .enable_all()
-        .build()
-        .unwrap();
-
-    let main_task = rt.spawn(main_impl(args, thread_local_stats));
-    rt.block_on(main_task).unwrap()
-}
-
 #[derive(Clone)]
 struct KeyRange {
     timeline: TenantTimelineId,
@@ -191,9 +71,22 @@ impl KeyRange {
     }
 }
 
+#[derive(serde::Serialize)]
+struct Output {
+    total: request_stats::Output,
+}
+
+tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
+
+pub(crate) fn main(args: Args) -> anyhow::Result<()> {
+    tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
+        main_impl(args, thread_local_stats)
+    })
+}
+
 async fn main_impl(
     args: Args,
-    thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
+    all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
 ) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
 
@@ -392,8 +285,8 @@ async fn main_impl(
 
     let output = Output {
         total: {
-            let mut agg_stats = ThreadLocalStats::new();
-            for stats in thread_local_stats.lock().unwrap().iter() {
+            let mut agg_stats = request_stats::Stats::new();
+            for stats in all_thread_local_stats.lock().unwrap().iter() {
                 let stats = stats.lock().unwrap();
                 agg_stats.add(&stats);
             }
diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs
index 8e3a446d9d..fdd5d4051c 100644
--- a/pageserver/pagebench/src/main.rs
+++ b/pageserver/pagebench/src/main.rs
@@ -1,4 +1,5 @@
 use clap::Parser;
+use utils::logging;
 
 pub(crate) mod cli;
 pub(crate) mod util;
@@ -16,6 +17,13 @@ enum Args {
 }
 
 fn main() {
+    logging::init(
+        logging::LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+        logging::Output::Stderr,
+    )
+    .unwrap();
+
     let args = Args::parse();
     match args {
         Args::Basebackup(args) => basebackup::main(args),
diff --git a/pageserver/pagebench/src/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/trigger_initial_size_calculation.rs
index 2e2ecfa236..4e6eb0bc92 100644
--- a/pageserver/pagebench/src/trigger_initial_size_calculation.rs
+++ b/pageserver/pagebench/src/trigger_initial_size_calculation.rs
@@ -3,8 +3,6 @@ use std::sync::Arc;
 use humantime::Duration;
 use tokio::task::JoinSet;
 
-use utils::logging;
-
 use crate::{cli, util::tenant_timeline_id::TenantTimelineId};
 
 #[derive(clap::Parser)]
@@ -26,13 +24,6 @@ pub(crate) struct Args {
 }
 
 pub(crate) fn main(args: Args) -> anyhow::Result<()> {
-    logging::init(
-        logging::LogFormat::Plain,
-        logging::TracingErrorLayerEnablement::Disabled,
-        logging::Output::Stderr,
-    )
-    .unwrap();
-
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()
diff --git a/pageserver/pagebench/src/util.rs b/pageserver/pagebench/src/util.rs
index d3ade58d33..06895d04f2 100644
--- a/pageserver/pagebench/src/util.rs
+++ b/pageserver/pagebench/src/util.rs
@@ -1,3 +1,6 @@
 pub(crate) mod connstring;
 pub(crate) mod discover_timelines;
+pub(crate) mod request_stats;
 pub(crate) mod tenant_timeline_id;
+#[macro_use]
+pub(crate) mod tokio_thread_local_stats;
diff --git a/pageserver/pagebench/src/util/request_stats.rs b/pageserver/pagebench/src/util/request_stats.rs
new file mode 100644
index 0000000000..5ecf1cbf24
--- /dev/null
+++ b/pageserver/pagebench/src/util/request_stats.rs
@@ -0,0 +1,88 @@
+use std::time::Duration;
+
+use anyhow::Context;
+
+pub(crate) struct Stats {
+    latency_histo: hdrhistogram::Histogram<u64>,
+}
+
+impl Stats {
+    pub(crate) fn new() -> Self {
+        Self {
+            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
+            // which would skew the benchmark results.
+            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
+        }
+    }
+    pub(crate) fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
+        let micros: u64 = latency
+            .as_micros()
+            .try_into()
+            .context("latency greater than u64")?;
+        self.latency_histo
+            .record(micros)
+            .context("add to histogram")?;
+        Ok(())
+    }
+    pub(crate) fn output(&self) -> Output {
+        let latency_percentiles = std::array::from_fn(|idx| {
+            let micros = self
+                .latency_histo
+                .value_at_percentile(LATENCY_PERCENTILES[idx]);
+            Duration::from_micros(micros)
+        });
+        Output {
+            request_count: self.latency_histo.len(),
+            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
+            latency_percentiles: LatencyPercentiles {
+                latency_percentiles,
+            },
+        }
+    }
+    pub(crate) fn add(&mut self, other: &Self) {
+        let Self {
+            ref mut latency_histo,
+        } = self;
+        latency_histo.add(&other.latency_histo).unwrap();
+    }
+}
+
+impl Default for Stats {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
+
+struct LatencyPercentiles {
+    latency_percentiles: [Duration; 4],
+}
+
+impl serde::Serialize for LatencyPercentiles {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeMap;
+        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
+        for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
+            ser.serialize_entry(
+                &format!("p{p}"),
+                &format!(
+                    "{}",
+                    &humantime::format_duration(self.latency_percentiles[i])
+                ),
+            )?;
+        }
+        ser.end()
+    }
+}
+
+#[derive(serde::Serialize)]
+pub(crate) struct Output {
+    request_count: u64,
+    #[serde(with = "humantime_serde")]
+    latency_mean: Duration,
+    latency_percentiles: LatencyPercentiles,
+}
diff --git a/pageserver/pagebench/src/util/tokio_thread_local_stats.rs b/pageserver/pagebench/src/util/tokio_thread_local_stats.rs
new file mode 100644
index 0000000000..82526213b6
--- /dev/null
+++ b/pageserver/pagebench/src/util/tokio_thread_local_stats.rs
@@ -0,0 +1,45 @@
+pub(crate) type ThreadLocalStats<T> = Arc<Mutex<T>>;
+pub(crate) type AllThreadLocalStats<T> = Arc<Mutex<Vec<ThreadLocalStats<T>>>>;
+
+macro_rules! declare {
+    ($THREAD_LOCAL_NAME:ident: $T:ty) => {
+        thread_local! {
+            pub static $THREAD_LOCAL_NAME: std::cell::RefCell<crate::util::tokio_thread_local_stats::ThreadLocalStats<$T>> = std::cell::RefCell::new(
+                std::sync::Arc::new(std::sync::Mutex::new(Default::default()))
+            );
+        }
+    };
+}
+
+use std::sync::{Arc, Mutex};
+
+pub(crate) use declare;
+
+macro_rules! main {
+    ($THREAD_LOCAL_NAME:ident, $main_impl:expr) => {{
+        let main_impl = $main_impl;
+        let all = Arc::new(Mutex::new(Vec::new()));
+
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_start({
+                let all = Arc::clone(&all);
+                move || {
+                    // pre-initialize the thread-local stats by accessing them
+                    // (some stats like request_stats::Stats are quite costly to initialize,
+                    // we don't want to pay that cost during the measurement period)
+                    $THREAD_LOCAL_NAME.with(|stats| {
+                        let stats: Arc<_> = Arc::clone(&*stats.borrow());
+                        all.lock().unwrap().push(stats);
+                    });
+                }
+            })
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let main_task = rt.spawn(main_impl(all));
+        rt.block_on(main_task).unwrap()
+    }};
+}
+
+pub(crate) use main;