From bd06672cdd44fc4fda15c77a58d726921bd6e07e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 24 Nov 2023 14:27:52 +0000 Subject: [PATCH] have one HdrHistogram per thread instead of one per task --- .../pagebench/src/getpage_latest_lsn.rs | 78 ++++++++++++------- pageserver/pagebench/src/main.rs | 5 +- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index a8fa70090d..9b24287d71 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -9,8 +9,9 @@ use tracing::info; use utils::id::{TenantId, TimelineId}; use utils::logging; +use std::cell::RefCell; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; /// Measure performance of the GetPage API, targeting the latest LSN. @@ -42,7 +43,6 @@ impl LiveStats { #[derive(serde::Serialize)] struct Output { - per_task: Vec, total: PerTaskOutput, } @@ -80,11 +80,11 @@ struct PerTaskOutput { latency_percentiles: LatencyPercentiles, } -struct PerTaskStats { +struct ThreadLocalStats { latency_histo: hdrhistogram::Histogram, } -impl PerTaskStats { +impl ThreadLocalStats { fn new() -> Self { Self { // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram, @@ -126,7 +126,13 @@ impl PerTaskStats { } } -pub(crate) async fn main(args: Args) -> anyhow::Result<()> { +thread_local! { + pub static STATS: RefCell>> = std::cell::RefCell::new( + Arc::new(Mutex::new(ThreadLocalStats::new())) + ); +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { logging::init( logging::LogFormat::Plain, logging::TracingErrorLayerEnablement::Disabled, @@ -134,6 +140,31 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> { ) .unwrap(); + let thread_local_stats = Arc::new(Mutex::new(Vec::new())); + + let rt = tokio::runtime::Builder::new_multi_thread() + .on_thread_start({ + let thread_local_stats = Arc::clone(&thread_local_stats); + move || { + // pre-initialize the histograms + STATS.with(|stats| { + let stats: Arc<_> = Arc::clone(&*stats.borrow()); + thread_local_stats.lock().unwrap().push(stats); + }); + } + }) + .enable_all() + .build() + .unwrap(); + + let main_task = rt.spawn(main_impl(args, thread_local_stats)); + rt.block_on(main_task).unwrap() +} + +async fn main_impl( + args: Args, + thread_local_stats: Arc>>>>, +) -> anyhow::Result<()> { let args: &'static Args = Box::leak(Box::new(args)); let client = Arc::new(pageserver::client::mgmt_api::Client::new( @@ -212,26 +243,19 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> { tasks.push(((tenant_id, timeline_id), t)); } - let mut per_timeline: Vec<((TenantId, TimelineId), Vec)> = - Vec::with_capacity(tasks.len()); - for (key, task) in tasks { - per_timeline.push((key, task.await.unwrap().unwrap())); + for (_, t) in tasks { + t.await.unwrap().unwrap(); } - let per_task: Vec<(_, _)> = per_timeline - .iter() - .flat_map(|(k, task_stats)| task_stats.into_iter().map(|s| (*k, s))) - .collect(); - let output = Output { total: { - let mut agg_stats = PerTaskStats::new(); - for (_, stats) in &per_task { - agg_stats.add(stats); + let mut agg_stats = ThreadLocalStats::new(); + for stats in thread_local_stats.lock().unwrap().iter() { + let stats = stats.lock().unwrap(); + agg_stats.add(&*stats); } agg_stats.output() }, - per_task: per_task.iter().map(|(_, s)| s.output()).collect(), }; let output = serde_json::to_string_pretty(&output).unwrap(); @@ -247,8 +271,8 @@ async fn timeline( timeline_id: TimelineId, start_work_barrier: Arc, all_work_done_barrier: Arc, - stats: Arc, -) -> anyhow::Result> { + live_stats: Arc, +) -> anyhow::Result<()> { let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; let lsn = partitioning.at_lsn; @@ -299,7 +323,7 @@ async fn timeline( let all_work_done_barrier = Arc::clone(&all_work_done_barrier); let jh = tokio::spawn({ - let live_stats = Arc::clone(&stats); + let live_stats = Arc::clone(&live_stats); async move { let mut getpage_client = pageserver::client::page_service::Client::new( args.page_service_connstring.clone(), @@ -309,8 +333,6 @@ async fn timeline( .await .unwrap(); - let mut stats = PerTaskStats::new(); - start_work_barrier.wait().await; for _i in 0..args.num_requests { let key = { @@ -332,21 +354,21 @@ async fn timeline( .unwrap(); let elapsed = start.elapsed(); live_stats.inc(); - stats.observe(elapsed).unwrap(); + STATS.with(|stats| { + stats.borrow().lock().unwrap().observe(elapsed).unwrap(); + }); } all_work_done_barrier.wait().await; getpage_client.shutdown().await; - stats } }); tasks.push(jh); } - let mut ret = Vec::with_capacity(tasks.len()); for task in tasks { - ret.push(task.await.unwrap()); + task.await.unwrap(); } - Ok(ret) + Ok(()) } diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 3088845723..cd7158bb0e 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -8,11 +8,10 @@ enum Args { GetPageLatestLsn(getpage_latest_lsn::Args), } -#[tokio::main] -async fn main() { +fn main() { let args = Args::parse(); match args { - Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args).await, + Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args), } .unwrap() }