have one HdrHistogram per thread instead of one per task

Christian Schwarz
2023-11-24 14:27:52 +00:00
parent f1a714e465
commit bd06672cdd
2 changed files with 52 additions and 31 deletions
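The commit title describes the pattern being adopted: one HdrHistogram per tokio worker thread, registered into a shared registry from on_thread_start and merged into a single total after the run. Below is a minimal, self-contained sketch of that pattern for orientation; the identifiers (HISTO, registry), the histogram bounds, and the simulated workload are illustrative assumptions, not the benchmark's actual code, and it assumes the hdrhistogram and tokio crates.

    use std::cell::RefCell;
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    type SharedHisto = Arc<Mutex<hdrhistogram::Histogram<u64>>>;

    thread_local! {
        // One histogram per runtime thread (illustrative name and bounds).
        static HISTO: RefCell<SharedHisto> = RefCell::new(Arc::new(Mutex::new(
            hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
        )));
    }

    fn main() {
        // Registry of every worker thread's histogram, filled in on_thread_start.
        let registry: Arc<Mutex<Vec<SharedHisto>>> = Arc::new(Mutex::new(Vec::new()));

        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_start({
                let registry = Arc::clone(&registry);
                move || {
                    // Runs once per runtime thread: register its histogram.
                    HISTO.with(|h| registry.lock().unwrap().push(Arc::clone(&*h.borrow())));
                }
            })
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async {
            let tasks: Vec<_> = (0..100)
                .map(|_| {
                    tokio::spawn(async {
                        let start = Instant::now();
                        tokio::task::yield_now().await; // stand-in for the measured request
                        let micros = (start.elapsed().as_micros() as u64).max(1);
                        // Record into whichever worker thread's histogram this task
                        // is currently running on.
                        HISTO.with(|h| h.borrow().lock().unwrap().record(micros).unwrap());
                    })
                })
                .collect();
            for t in tasks {
                t.await.unwrap();
            }
        });

        // Merge every per-thread histogram into one total.
        let mut total =
            hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000_000, 3).unwrap();
        for h in registry.lock().unwrap().iter() {
            total.add(&*h.lock().unwrap()).unwrap();
        }
        println!("p99.9 = {} us", total.value_at_quantile(0.999));
    }

The design point, visible in the diff below: the old code kept one histogram per task and returned it from each task for later flattening; with one histogram per thread, the hot path takes a mutex that is uncontended while the benchmark runs (tasks on a worker execute one at a time, and the aggregator only locks the histograms after all tasks have finished).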

File 1 of 2 (the getpage_latest_lsn benchmark module):

@@ -9,8 +9,9 @@ use tracing::info;
 use utils::id::{TenantId, TimelineId};
 use utils::logging;
+use std::cell::RefCell;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};

 /// Measure performance of the GetPage API, targeting the latest LSN.
@@ -42,7 +43,6 @@ impl LiveStats {
 #[derive(serde::Serialize)]
 struct Output {
-    per_task: Vec<PerTaskOutput>,
     total: PerTaskOutput,
 }
@@ -80,11 +80,11 @@ struct PerTaskOutput {
     latency_percentiles: LatencyPercentiles,
 }

-struct PerTaskStats {
+struct ThreadLocalStats {
     latency_histo: hdrhistogram::Histogram<u64>,
 }

-impl PerTaskStats {
+impl ThreadLocalStats {
     fn new() -> Self {
         Self {
             // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
@@ -126,7 +126,13 @@ impl PerTaskStats {
     }
 }

-pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
+thread_local! {
+    pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
+        Arc::new(Mutex::new(ThreadLocalStats::new()))
+    );
+}
+
+pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     logging::init(
         logging::LogFormat::Plain,
         logging::TracingErrorLayerEnablement::Disabled,
@@ -134,6 +140,31 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
     )
     .unwrap();

+    let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
+
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .on_thread_start({
+            let thread_local_stats = Arc::clone(&thread_local_stats);
+            move || {
+                // pre-initialize the histograms
+                STATS.with(|stats| {
+                    let stats: Arc<_> = Arc::clone(&*stats.borrow());
+                    thread_local_stats.lock().unwrap().push(stats);
+                });
+            }
+        })
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let main_task = rt.spawn(main_impl(args, thread_local_stats));
+    rt.block_on(main_task).unwrap()
+}
+
+async fn main_impl(
+    args: Args,
+    thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
+) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));
     let client = Arc::new(pageserver::client::mgmt_api::Client::new(
@@ -212,26 +243,19 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
         tasks.push(((tenant_id, timeline_id), t));
     }
-    let mut per_timeline: Vec<((TenantId, TimelineId), Vec<PerTaskStats>)> =
-        Vec::with_capacity(tasks.len());
-    for (key, task) in tasks {
-        per_timeline.push((key, task.await.unwrap().unwrap()));
+    for (_, t) in tasks {
+        t.await.unwrap().unwrap();
     }
-    let per_task: Vec<(_, _)> = per_timeline
-        .iter()
-        .flat_map(|(k, task_stats)| task_stats.into_iter().map(|s| (*k, s)))
-        .collect();

     let output = Output {
         total: {
-            let mut agg_stats = PerTaskStats::new();
-            for (_, stats) in &per_task {
-                agg_stats.add(stats);
+            let mut agg_stats = ThreadLocalStats::new();
+            for stats in thread_local_stats.lock().unwrap().iter() {
+                let stats = stats.lock().unwrap();
+                agg_stats.add(&*stats);
             }
             agg_stats.output()
         },
-        per_task: per_task.iter().map(|(_, s)| s.output()).collect(),
     };

     let output = serde_json::to_string_pretty(&output).unwrap();
@@ -247,8 +271,8 @@ async fn timeline(
     timeline_id: TimelineId,
     start_work_barrier: Arc<Barrier>,
     all_work_done_barrier: Arc<Barrier>,
-    stats: Arc<LiveStats>,
-) -> anyhow::Result<Vec<PerTaskStats>> {
+    live_stats: Arc<LiveStats>,
+) -> anyhow::Result<()> {
     let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
     let lsn = partitioning.at_lsn;
@@ -299,7 +323,7 @@ async fn timeline(
         let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
         let jh = tokio::spawn({
-            let live_stats = Arc::clone(&stats);
+            let live_stats = Arc::clone(&live_stats);
             async move {
                 let mut getpage_client = pageserver::client::page_service::Client::new(
                     args.page_service_connstring.clone(),
@@ -309,8 +333,6 @@ async fn timeline(
                 .await
                 .unwrap();

-                let mut stats = PerTaskStats::new();
-
                 start_work_barrier.wait().await;
                 for _i in 0..args.num_requests {
                     let key = {
@@ -332,21 +354,21 @@ async fn timeline(
                         .unwrap();
                     let elapsed = start.elapsed();
                     live_stats.inc();
-                    stats.observe(elapsed).unwrap();
+                    STATS.with(|stats| {
+                        stats.borrow().lock().unwrap().observe(elapsed).unwrap();
+                    });
                 }
                 all_work_done_barrier.wait().await;
                 getpage_client.shutdown().await;
-                stats
             }
         });
         tasks.push(jh);
     }

-    let mut ret = Vec::with_capacity(tasks.len());
     for task in tasks {
-        ret.push(task.await.unwrap());
+        task.await.unwrap();
     }
-    Ok(ret)
+    Ok(())
 }
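An aside on the fixed-bounds comment in ThreadLocalStats::new above: hdrhistogram histograms created via new_with_bounds have auto-resize disabled, so a sample outside the configured range comes back as an Err (which the unwrap in observe turns into a panic) rather than triggering a reallocation on the measurement hot path. A small illustration, with made-up bounds:

    // Illustrative bounds only; auto-resize is off for histograms created this way.
    let mut h = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 10_000_000, 3).unwrap();
    assert!(h.record(5_000).is_ok());
    assert!(h.record(20_000_000).is_err()); // above `high`: an error, not a resize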

File 2 of 2 (the CLI entry point):

@@ -8,11 +8,10 @@ enum Args {
     GetPageLatestLsn(getpage_latest_lsn::Args),
 }

-#[tokio::main]
-async fn main() {
+fn main() {
     let args = Args::parse();
     match args {
-        Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args).await,
+        Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
     }
     .unwrap()
 }