mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
per-second RPS
This commit is contained in:
committed by
Christian Schwarz
parent
001a0e4006
commit
78a28f787c
@@ -16,6 +16,7 @@ use scopeguard::defer;
|
||||
use std::env::args;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
@@ -64,6 +65,17 @@ struct Args {
|
||||
pick_n_tenants: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Stats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl Stats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let args: &'static Args = Box::leak(Box::new(Args::parse()));
|
||||
@@ -123,9 +135,34 @@ async fn main() {
|
||||
}
|
||||
println!("tenant_timelines:\n{:?}", tenant_timelines);
|
||||
|
||||
let mut stats = Arc::new(Stats::default());
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
async move {
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
println!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (tenant_id, timeline_id) in tenant_timelines {
|
||||
let t = tokio::spawn(timeline(args, client.clone(), tenant_id, timeline_id));
|
||||
let stats = Arc::clone(&stats);
|
||||
let t = tokio::spawn(timeline(
|
||||
args,
|
||||
client.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
stats,
|
||||
));
|
||||
tasks.push(t);
|
||||
}
|
||||
|
||||
@@ -136,12 +173,13 @@ async fn main() {
|
||||
|
||||
fn timeline(
|
||||
args: &'static Args,
|
||||
client: Client<HttpConnector, hyper::Body>,
|
||||
http_client: Client<HttpConnector, hyper::Body>,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
stats: Arc<Stats>,
|
||||
) -> impl Future<Output = ()> + Send + Sync {
|
||||
async move {
|
||||
let mut resp = client
|
||||
let mut resp = http_client
|
||||
.get(
|
||||
Uri::try_from(&format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/keyspace",
|
||||
@@ -191,32 +229,42 @@ fn timeline(
|
||||
for i in 0..args.num_tasks {
|
||||
let ranges = ranges.clone();
|
||||
let weights = weights.clone();
|
||||
let client = client.clone();
|
||||
let client = http_client.clone();
|
||||
let tenant_id = tenant_id.clone();
|
||||
let timeline_id = timeline_id.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
let mut client = getpage_client::Client::new(tenant_id.clone(), timeline_id.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
for i in 0..args.num_requests {
|
||||
let key = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
|
||||
let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
|
||||
let key = repository::Key::from_i128(key);
|
||||
// XXX filter these out when we iterate the keyspace
|
||||
assert!(
|
||||
is_rel_block_key(key),
|
||||
"we filter non-relblock keys out above"
|
||||
);
|
||||
let (rel_tag, block_no) = key_to_rel_block(key).expect("we just checked");
|
||||
RelTagBlockNo { rel_tag, block_no }
|
||||
};
|
||||
client.getpage(key, lsn).await.with_context(|| {
|
||||
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
|
||||
}).unwrap();
|
||||
let task = tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
async move {
|
||||
let mut client =
|
||||
getpage_client::Client::new(tenant_id.clone(), timeline_id.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
for i in 0..args.num_requests {
|
||||
let key = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
|
||||
let key: i128 = rng.gen_range((r.start.0.to_i128()..r.end.0.to_i128()));
|
||||
let key = repository::Key::from_i128(key);
|
||||
// XXX filter these out when we iterate the keyspace
|
||||
assert!(
|
||||
is_rel_block_key(key),
|
||||
"we filter non-relblock keys out above"
|
||||
);
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we just checked");
|
||||
RelTagBlockNo { rel_tag, block_no }
|
||||
};
|
||||
client
|
||||
.getpage(key, lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
|
||||
})
|
||||
.unwrap();
|
||||
stats.inc();
|
||||
}
|
||||
client.shutdown().await;
|
||||
}
|
||||
client.shutdown().await;
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
@@ -224,12 +272,6 @@ fn timeline(
|
||||
for task in tasks {
|
||||
task.await.unwrap();
|
||||
}
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
println!(
|
||||
"RPS: {:.0}",
|
||||
(args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user