per-second RPS

This commit is contained in:
Christian Schwarz
2023-11-02 17:11:37 +00:00
committed by Christian Schwarz
parent 4ea2834711
commit 9411a372f7

View File

@@ -16,6 +16,7 @@ use scopeguard::defer;
use std::env::args;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::sync::mpsc::{channel, Sender};
@@ -64,6 +65,17 @@ struct Args {
pick_n_tenants: Option<usize>,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
@@ -123,9 +135,34 @@ async fn main() {
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let mut stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let t = tokio::spawn(timeline(args, client.clone(), tenant_id, timeline_id));
let stats = Arc::clone(&stats);
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
stats,
));
tasks.push(t);
}
@@ -136,12 +173,13 @@ async fn main() {
fn timeline(
args: &'static Args,
client: Client<HttpConnector, hyper::Body>,
http_client: Client<HttpConnector, hyper::Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> + Send + Sync {
async move {
let mut resp = client
let mut resp = http_client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
@@ -191,32 +229,42 @@ fn timeline(
for i in 0..args.num_tasks {
let ranges = ranges.clone();
let weights = weights.clone();
let client = client.clone();
let client = http_client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let task = tokio::spawn(async move {
let mut client = getpage_client::Client::new(tenant_id.clone(), timeline_id.clone())
.await
.unwrap();
for i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
let key = repository::Key::from_i128(key);
// XXX filter these out when we iterate the keyspace
assert!(
is_rel_block_key(key),
"we filter non-relblock keys out above"
);
let (rel_tag, block_no) = key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client.getpage(key, lsn).await.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
}).unwrap();
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut client =
getpage_client::Client::new(tenant_id.clone(), timeline_id.clone())
.await
.unwrap();
for i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
let key = repository::Key::from_i128(key);
// XXX filter these out when we iterate the keyspace
assert!(
is_rel_block_key(key),
"we filter non-relblock keys out above"
);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client
.getpage(key, lsn)
.await
.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
})
.unwrap();
stats.inc();
}
client.shutdown().await;
}
client.shutdown().await;
});
tasks.push(task);
}
@@ -224,12 +272,6 @@ fn timeline(
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
(args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
);
}
}