diff --git a/pageserver/src/bin/getpage_bench.rs b/pageserver/src/bin/getpage_bench.rs
new file mode 100644
index 0000000000..cdb1ebcd8b
--- /dev/null
+++ b/pageserver/src/bin/getpage_bench.rs
@@ -0,0 +1,204 @@
+use clap::Parser;
+use hyper::client::HttpConnector;
+use hyper::{Body, Client, Uri};
+use pageserver::repository;
+use rand::prelude::*;
+use std::future::Future;
+use std::str::FromStr;
+use std::sync::Arc;
+use tokio::task::JoinHandle;
+
+/// Newtype wrapper so we can implement `FromStr` for the pageserver key type.
+struct Key(repository::Key);
+
+impl std::str::FromStr for Key {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        repository::Key::from_hex(s).map(Key)
+    }
+}
+
+/// Key range [start, end) as reported by the keyspace endpoint.
+struct KeyRange {
+    start: Key,
+    end: Key,
+}
+
+impl KeyRange {
+    /// Number of keys in the range; also the sampling weight of the range.
+    fn len(&self) -> i128 {
+        self.end.0.to_i128() - self.start.0.to_i128()
+    }
+}
+
+#[derive(clap::Parser)]
+struct Args {
+    /// Pageserver HTTP management endpoint.
+    #[clap(long, default_value = "http://localhost:9898")]
+    ps_endpoint: String,
+    /// Number of concurrent getpage tasks per timeline.
+    num_tasks: usize,
+    /// Number of requests issued by each task.
+    num_requests: usize,
+    /// Tenant ids to benchmark; discovered from the pageserver if omitted.
+    tenants: Option<Vec<String>>,
+    /// Stop tenant discovery after this many tenants (and assert we found that many).
+    #[clap(long)]
+    pick_n_tenants: Option<usize>,
+}
+
+#[tokio::main]
+async fn main() {
+    // Leak the args so spawned tasks can borrow them at 'static lifetime.
+    let args: &'static Args = Box::leak(Box::new(Args::parse()));
+
+    let client = Client::new();
+
+    let tenants = if let Some(tenants) = &args.tenants {
+        tenants.clone()
+    } else {
+        // Discover tenants via the management API.
+        let resp = client
+            .get(Uri::try_from(format!("{}/v1/tenant", args.ps_endpoint)).unwrap())
+            .await
+            .unwrap();
+        assert!(resp.status().is_success(), "listing tenants failed: {resp:?}");
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
+        let mut out = Vec::new();
+        for t in tenants.as_array().unwrap() {
+            if let Some(limit) = args.pick_n_tenants {
+                if out.len() >= limit {
+                    break;
+                }
+            }
+            out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
+        }
+        if let Some(limit) = args.pick_n_tenants {
+            // Fail loudly if the pageserver has fewer tenants than requested.
+            assert_eq!(out.len(), limit, "pageserver has fewer than {limit} tenants");
+        }
+        out
+    };
+
+    // Resolve every (tenant, timeline) pair up front.
+    let mut tenant_timelines = Vec::new();
+    for tenant_id in tenants {
+        let resp = client
+            .get(
+                Uri::try_from(format!(
+                    "{}/v1/tenant/{}/timeline",
+                    args.ps_endpoint, tenant_id
+                ))
+                .unwrap(),
+            )
+            .await
+            .unwrap();
+        assert!(resp.status().is_success(), "listing timelines failed: {resp:?}");
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
+        for t in timelines.as_array().unwrap() {
+            let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
+            tenant_timelines.push((tenant_id.clone(), timeline_id));
+        }
+    }
+    println!("tenant_timelines:\n{:?}", tenant_timelines);
+
+    // Benchmark all timelines concurrently; each prints its own RPS line.
+    let mut tasks = Vec::new();
+    for (tenant_id, timeline_id) in tenant_timelines {
+        let t = tokio::spawn(timeline(args, client.clone(), tenant_id, timeline_id));
+        tasks.push(t);
+    }
+
+    for t in tasks {
+        t.await.unwrap();
+    }
+}
+
+/// Drive `num_tasks * num_requests` getpage requests against one timeline, with
+/// keys sampled uniformly over the timeline's keyspace, then print the RPS.
+fn timeline(
+    args: &'static Args,
+    client: Client<HttpConnector, Body>,
+    tenant_id: String,
+    timeline_id: String,
+) -> impl Future<Output = ()> {
+    async move {
+        let resp = client
+            .get(
+                Uri::try_from(format!(
+                    "{}/v1/tenant/{}/timeline/{}/keyspace",
+                    args.ps_endpoint, tenant_id, timeline_id
+                ))
+                .unwrap(),
+            )
+            .await
+            .unwrap();
+        if !resp.status().is_success() {
+            panic!("Failed to get keyspace: {resp:?}");
+        }
+        let body = hyper::body::to_bytes(resp).await.unwrap();
+        let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
+
+        // All requests read at the LSN the keyspace snapshot was taken at.
+        let lsn = Arc::new(keyspace["at_lsn"].as_str().unwrap().to_owned());
+
+        let ranges = keyspace["keys"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .map(|r| {
+                let r = r.as_array().unwrap();
+                assert_eq!(r.len(), 2);
+                let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
+                let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
+                KeyRange { start, end }
+            })
+            .collect::<Vec<_>>();
+
+        let ranges = Arc::new(ranges);
+
+        let mut tasks = Vec::<JoinHandle<()>>::new();
+
+        let start = std::time::Instant::now();
+
+        for _ in 0..args.num_tasks {
+            let ranges = ranges.clone();
+            let lsn = lsn.clone();
+            let client = client.clone();
+            let tenant_id = tenant_id.clone();
+            let timeline_id = timeline_id.clone();
+            let task = tokio::spawn(async move {
+                for _ in 0..args.num_requests {
+                    // Pick a range weighted by its size, then a key uniform within
+                    // it, so keys are uniform over the whole keyspace.
+                    let key = {
+                        let mut rng = rand::thread_rng();
+                        let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
+                        rng.gen_range(r.start.0.to_i128()..r.end.0.to_i128())
+                    };
+                    let url = format!(
+                        "{}/v1/tenant/{}/timeline/{}/getpage?key={:036x}&lsn={}",
+                        args.ps_endpoint, tenant_id, timeline_id, key, lsn
+                    );
+                    let uri = url.parse::<Uri>().unwrap();
+                    let resp = client.get(uri).await.unwrap();
+                    assert!(resp.status().is_success(), "getpage failed: {resp:?}");
+                }
+            });
+            tasks.push(task);
+        }
+
+        for task in tasks {
+            task.await.unwrap();
+        }
+
+        let elapsed = start.elapsed();
+        println!(
+            "RPS: {:.0}",
+            (args.num_requests * args.num_tasks) as f64 / elapsed.as_secs_f64()
+        );
+    }
+}
diff --git a/test_runner/duplicate_tenant.py b/test_runner/duplicate_tenant.py
new file mode 100644
index 0000000000..c0672718ad
--- /dev/null
+++ b/test_runner/duplicate_tenant.py
@@ -0,0 +1,40 @@
+# Usage from top of repo:
+#   poetry run python3 test_runner/duplicate_tenant.py <initial_tenant> <ncopies> <numthreads>
+import sys
+import threading
+from queue import Queue
+
+import requests
+from fixtures.types import TenantId
+
+initial_tenant = sys.argv[1]
+ncopies = int(sys.argv[2])
+numthreads = int(sys.argv[3])
+
+# Work queue: one entry per copy to create, plus one None sentinel per worker.
+q = Queue()
+for i in range(0, ncopies):
+    q.put(i)
+
+for i in range(0, numthreads):
+    q.put(None)
+
+
+def create():
+    # Drain the queue until we hit a sentinel; each item duplicates the tenant once.
+    while True:
+        if q.get() is None:
+            break
+        new_tenant = TenantId.generate()
+        res = requests.post(
+            f"http://localhost:9898/v1/tenant/{initial_tenant}/duplicate",
+            json={"new_tenant_id": str(new_tenant)},
+        )
+        res.raise_for_status()
+
+
+threads = [threading.Thread(target=create) for _ in range(numthreads)]
+for t in threads:
+    t.start()
+for t in threads:
+    t.join()