From 78a28f787cb289b2b38543b0fc81ed67d5a17d10 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 2 Nov 2023 17:11:37 +0000 Subject: [PATCH] per-second RPS --- pageserver/src/bin/getpage_bench_libpq.rs | 106 +++++++++++++++------- 1 file changed, 74 insertions(+), 32 deletions(-) diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs index 8a1718a73b..1d6c224c31 100644 --- a/pageserver/src/bin/getpage_bench_libpq.rs +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -16,6 +16,7 @@ use scopeguard::defer; use std::env::args; use std::future::Future; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use tokio::sync::mpsc::{channel, Sender}; @@ -64,6 +65,17 @@ struct Args { pick_n_tenants: Option, } +#[derive(Debug, Default)] +struct Stats { + completed_requests: AtomicU64, +} + +impl Stats { + fn inc(&self) { + self.completed_requests.fetch_add(1, Ordering::Relaxed); + } +} + #[tokio::main] async fn main() { let args: &'static Args = Box::leak(Box::new(Args::parse())); @@ -123,9 +135,34 @@ async fn main() { } println!("tenant_timelines:\n{:?}", tenant_timelines); + let mut stats = Arc::new(Stats::default()); + + tokio::spawn({ + let stats = Arc::clone(&stats); + async move { + loop { + let start = std::time::Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); + let elapsed = start.elapsed(); + println!( + "RPS: {:.0}", + completed_requests as f64 / elapsed.as_secs_f64() + ); + } + } + }); + let mut tasks = Vec::new(); for (tenant_id, timeline_id) in tenant_timelines { - let t = tokio::spawn(timeline(args, client.clone(), tenant_id, timeline_id)); + let stats = Arc::clone(&stats); + let t = tokio::spawn(timeline( + args, + client.clone(), + tenant_id, + timeline_id, + stats, + )); tasks.push(t); } @@ -136,12 +173,13 @@ async fn main() { fn timeline( args: &'static Args, - client: Client, + http_client: Client, tenant_id: String, timeline_id: String, + stats: Arc, ) -> impl Future + Send + Sync { async move { - let mut resp = client + let mut resp = http_client .get( Uri::try_from(&format!( "{}/v1/tenant/{}/timeline/{}/keyspace", @@ -191,32 +229,42 @@ fn timeline( for i in 0..args.num_tasks { let ranges = ranges.clone(); let weights = weights.clone(); - let client = client.clone(); + let client = http_client.clone(); let tenant_id = tenant_id.clone(); let timeline_id = timeline_id.clone(); - let task = tokio::spawn(async move { - let mut client = getpage_client::Client::new(tenant_id.clone(), timeline_id.clone()) - .await - .unwrap(); - for i in 0..args.num_requests { - let key = { - let mut rng = rand::thread_rng(); - let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap(); - let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128())); - let key = repository::Key::from_i128(key); - // XXX filter these out when we iterate the keyspace - assert!( - is_rel_block_key(key), - "we filter non-relblock keys out above" - ); - let (rel_tag, block_no) = key_to_rel_block(key).expect("we just checked"); - RelTagBlockNo { rel_tag, block_no } - }; - client.getpage(key, lsn).await.with_context(|| { - format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) - }).unwrap(); + let task = tokio::spawn({ + let stats = Arc::clone(&stats); + async move { + let mut client = + getpage_client::Client::new(tenant_id.clone(), timeline_id.clone()) + .await + .unwrap(); + for i in 0..args.num_requests { + let key = { + let mut rng = rand::thread_rng(); + let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap(); + let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128())); + let key = repository::Key::from_i128(key); + // XXX filter these out when we iterate the keyspace + assert!( + is_rel_block_key(key), + "we filter non-relblock keys out above" + ); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we just checked"); + RelTagBlockNo { rel_tag, block_no } + }; + client + .getpage(key, lsn) + .await + .with_context(|| { + format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) + }) + .unwrap(); + stats.inc(); + } + client.shutdown().await; } - client.shutdown().await; }); tasks.push(task); } @@ -224,12 +272,6 @@ fn timeline( for task in tasks { task.await.unwrap(); } - - let elapsed = start.elapsed(); - println!( - "RPS: {:.0}", - (args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64() - ); } }