diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs
index 3746c2b972..5df290aea5 100644
--- a/pageserver/pagebench/src/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/getpage_latest_lsn.rs
@@ -1,27 +1,21 @@
 mod tenant_timeline_id;
 
 use anyhow::Context;
-use pageserver::client::mgmt_api::Client;
 use pageserver::client::page_service::RelTagBlockNo;
 use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
-use pageserver::{repository, tenant};
-use std::sync::Weak;
-use tokio_util::sync::CancellationToken;
+use pageserver::repository;
 use utils::lsn::Lsn;
-use pageserver::tenant::Tenant;
 use rand::prelude::*;
 use tokio::sync::Barrier;
 use tokio::task::JoinSet;
 use tracing::{info, instrument};
-use utils::id::{TenantId, TimelineId};
+use utils::id::TenantId;
 use utils::logging;
 
 use std::cell::RefCell;
-use std::collections::hash_map::Entry;
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::num::NonZeroUsize;
-use std::str::FromStr;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
@@ -37,7 +31,9 @@ pub(crate) struct Args {
     page_service_connstring: String,
     #[clap(long, default_value = "1")]
     num_clients: NonZeroUsize,
-    // targets: Option<Vec<TenantTimelineId>>,
+    #[clap(long)]
+    runtime: Option<humantime::Duration>,
+    targets: Option<Vec<TenantTimelineId>>,
 }
 
 #[derive(Debug, Default)]
@@ -171,50 +167,6 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     rt.block_on(main_task).unwrap()
 }
 
-struct ClientPool {
-    cache: HashMap<TenantTimelineId, Vec<Weak<Mutex<pageserver::client::page_service::Client>>>>,
-    lru: VecDeque<Arc<Mutex<pageserver::client::page_service::Client>>>,
-}
-
-impl ClientPool {
-    pub fn new() -> Self {
-        Self {
-            cache: Default::default(),
-            lru: Default::default(),
-        }
-    }
-
-    pub fn take(
-        &mut self,
-        timeline: TenantTimelineId,
-    ) -> Option<Arc<Mutex<pageserver::client::page_service::Client>>> {
-        match self.cache.entry(timeline) {
-            Entry::Occupied(mut o) => {
-                while let Some(weak) = o.get_mut().pop() {
-                    if let Some(strong) = Weak::upgrade(&weak) {
-                        return Some(strong);
-                    }
-                }
-                None
-            }
-            Entry::Vacant(_) => None,
-        }
-    }
-
-    pub fn put(
-        &mut self,
-        timeline: TenantTimelineId,
-        client: Arc<Mutex<pageserver::client::page_service::Client>>,
-    ) {
-        match self.cache.entry(timeline) {
-            Entry::Occupied(mut o) => o.get_mut().push(Arc::downgrade(&client)),
-            Entry::Vacant(v) => todo!(),
-        }
-        self.lru.push_front(client);
-        self.lru.truncate(1000);
-    }
-}
-
 struct KeyRange {
     timeline: TenantTimelineId,
     timeline_lsn: Lsn,
@@ -228,11 +180,6 @@ impl KeyRange {
     }
 }
 
-struct Targets {
-    ranges: Vec<KeyRange>,
-    weights: Vec<u64>,
-}
-
 async fn main_impl(
     args: Args,
     thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
 ) -> anyhow::Result<()> {
@@ -244,9 +191,9 @@ async fn main_impl(
     let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
     ));
     // discover targets
-    let mut targets: Vec<TenantTimelineId> = Vec::new();
+    let mut timelines: Vec<TenantTimelineId> = Vec::new();
     if false {
-        targets = targets.clone();
+        timelines = timelines.clone();
     } else {
         let tenants: Vec<TenantId> = mgmt_api_client
            .list_tenants()
            .await
@@ -267,9 +214,9 @@ async fn main_impl(
            });
        }
        while let Some(res) = js.join_next().await {
-            let (tenant_id, timelines) = res.unwrap();
-            for tl in timelines {
-                targets.push(TenantTimelineId {
+            let (tenant_id, tl_infos) = res.unwrap();
+            for tl in tl_infos {
+                timelines.push(TenantTimelineId {
                    tenant_id,
                    timeline_id: tl.timeline_id,
                });
@@ -277,15 +224,16 @@ async fn main_impl(
        }
    }
 
-    info!("targets:\n{:?}", targets);
+    info!("timelines:\n{:?}", timelines);
 
    let mut js = JoinSet::new();
-    for target in targets {
+    for timeline in &timelines {
        js.spawn({
            let mgmt_api_client = Arc::clone(&mgmt_api_client);
+            let timeline = *timeline;
            async move {
                let partitioning = mgmt_api_client
-                    .keyspace(target.tenant_id, target.timeline_id)
+                    .keyspace(timeline.tenant_id, timeline.timeline_id)
                    .await?;
                let lsn = partitioning.at_lsn;
 
@@ -299,7 +247,7 @@ async fn main_impl(
                    // filter out non-relblock keys
                    match (is_rel_block_key(start), is_rel_block_key(end)) {
                        (true, true) => Some(KeyRange {
-                            timeline: target,
+                            timeline,
                            timeline_lsn: lsn,
                            start: start.to_i128(),
                            end: end.to_i128(),
@@ -317,30 +265,26 @@ async fn main_impl(
        });
    }
    let mut all_ranges: Vec<KeyRange> = Vec::new();
-    while let Some(ranges) = js.join_next().await {
-        all_ranges.extend(ranges.unwrap().unwrap());
+    while let Some(res) = js.join_next().await {
+        all_ranges.extend(res.unwrap().unwrap());
    }
 
-    let mut all_weights: Vec<u64> = all_ranges
-        .iter()
-        .map(|v| (v.end - v.start).try_into().unwrap())
-        .collect();
-    let targets = Arc::new(Targets {
-        ranges: all_ranges,
-        weights: all_weights,
-    });
+    let weights =
+        rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len()))
+            .unwrap();
 
-    let stats = Arc::new(LiveStats::default());
+    let live_stats = Arc::new(LiveStats::default());
 
-    let num_work_tasks = args.num_clients.get();
+    let num_client_tasks = timelines.len();
    let num_live_stats_dump = 1;
+    let num_work_sender_tasks = 1;
    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
-        num_work_tasks + num_live_stats_dump,
+        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
    ));
-    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks));
+    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
 
    tokio::spawn({
-        let stats = Arc::clone(&stats);
+        let stats = Arc::clone(&live_stats);
        let start_work_barrier = Arc::clone(&start_work_barrier);
        async move {
            start_work_barrier.wait().await;
@@ -357,25 +301,49 @@ async fn main_impl(
        }
    });
 
-    let pool = Arc::new(Mutex::new(ClientPool::new()));
-
-    let cancel = CancellationToken::new();
-
+    let mut work_senders = HashMap::new();
    let mut tasks = Vec::new();
-    for client_id in 0..args.num_clients.get() {
-        let live_stats = Arc::clone(&stats);
-        let t = tokio::spawn(client(
+    for tl in &timelines {
+        let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
+        work_senders.insert(tl, sender);
+        tasks.push(tokio::spawn(client(
            args,
-            client_id,
-            Arc::clone(&mgmt_api_client),
-            Arc::clone(&pool),
+            *tl,
            Arc::clone(&start_work_barrier),
+            receiver,
            Arc::clone(&all_work_done_barrier),
-            Arc::clone(&targets),
-            live_stats,
-            cancel.clone(),
-        ));
-        tasks.push(t);
+            Arc::clone(&live_stats),
+        )));
+    }
+
+    let work_sender = async move {
+        start_work_barrier.wait().await;
+        loop {
+            let (range, key) = {
+                let mut rng = rand::thread_rng();
+                let r = &all_ranges[weights.sample(&mut rng)];
+                let key: i128 = rng.gen_range(r.start..r.end);
+                let key = repository::Key::from_i128(key);
+                let (rel_tag, block_no) =
+                    key_to_rel_block(key).expect("we filter non-rel-block keys out above");
+                (r, RelTagBlockNo { rel_tag, block_no })
+            };
+            let sender = work_senders.get(&range.timeline).unwrap();
+            // TODO: what if this blocks?
+            sender.send((key, range.timeline_lsn)).await.ok().unwrap();
+        }
+    };
+
+    if let Some(runtime) = args.runtime {
+        match tokio::time::timeout(runtime.into(), work_sender).await {
+            Ok(()) => unreachable!("work sender never terminates"),
+            Err(_timeout) => {
+                // this implicitly drops the work_senders, making all the clients exit
+            }
+        }
+    } else {
+        work_sender.await;
+        unreachable!("work sender never terminates");
    }
 
    for t in tasks {
@@ -399,131 +367,38 @@ async fn main_impl(
    anyhow::Ok(())
 }
 
-#[instrument(skip_all, %client_id)]
+#[instrument(skip_all)]
 async fn client(
    args: &'static Args,
-    client_id: usize,
-    mgmt_api_client: Arc<Client>,
-    pool: Arc<Mutex<ClientPool>>,
+    timeline: TenantTimelineId,
    start_work_barrier: Arc<Barrier>,
+    mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
    all_work_done_barrier: Arc<Barrier>,
-    targets: Arc<Targets>,
    live_stats: Arc<LiveStats>,
-    cancel: CancellationToken,
 ) {
    start_work_barrier.wait().await;
-    while !cancel.is_cancelled() {
-        let (range, key) = {
-            let mut rng = rand::thread_rng();
-            let r = targets
-                .ranges
-                .choose_weighted(&mut rng, |range| range.len())
-                .unwrap();
-            let key: i128 = rng.gen_range(r.start..r.end);
-            let key = repository::Key::from_i128(key);
-            let (rel_tag, block_no) =
-                key_to_rel_block(key).expect("we filter non-rel-block keys out above");
-            (r, RelTagBlockNo { rel_tag, block_no })
-        };
-
-        let client = match pool.lock().unwrap().take(range.timeline) {
-            Some(client) => client,
-            None => Arc::new(Mutex::new(
-                pageserver::client::page_service::Client::new(
-                    args.page_service_connstring.clone(),
-                    range.timeline.tenant_id,
-                    range.timeline.timeline_id,
-                )
-                .await
-                .unwrap(),
-            )),
-        };
+    let mut client = pageserver::client::page_service::Client::new(
+        args.page_service_connstring.clone(),
+        timeline.tenant_id,
+        timeline.timeline_id,
+    )
+    .await
+    .unwrap();
+    while let Some((key, lsn)) = work.recv().await {
        let start = Instant::now();
        client
-            .lock()
-            .unwrap()
-            .getpage(key, range.timeline_lsn)
+            .getpage(key, lsn)
            .await
-            .with_context(|| format!("getpage for {}", range.timeline))
+            .with_context(|| format!("getpage for {timeline}"))
            .unwrap();
        let elapsed = start.elapsed();
        live_stats.inc();
        STATS.with(|stats| {
            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
        });
-
-        pool.lock().unwrap().put(range.timeline, client);
    }
    all_work_done_barrier.wait().await;
 }
-
-// async fn timeline(
-//     args: &'static Args,
-//     mgmt_api_client: Arc<Client>,
-//     tenant_id: TenantId,
-//     timeline_id: TimelineId,
-//     start_work_barrier: Arc<Barrier>,
-//     all_work_done_barrier: Arc<Barrier>,
-//     live_stats: Arc<LiveStats>,
-// ) -> anyhow::Result<()> {
-//     let mut tasks = Vec::new();
-
-//     for _i in 0..args.num_tasks {
-//         let ranges = ranges.clone();
-//         let _weights = weights.clone();
-//         let start_work_barrier = Arc::clone(&start_work_barrier);
-//         let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
-
-//         let jh = tokio::spawn({
-//             let live_stats = Arc::clone(&live_stats);
-//             async move {
-//                 let mut getpage_client = pageserver::client::page_service::Client::new(
-//                     args.page_service_connstring.clone(),
-//                     tenant_id,
-//                     timeline_id,
-//                 )
-//                 .await
-//                 .unwrap();
-
-//                 start_work_barrier.wait().await;
-//                 for _i in 0..args.num_requests {
-//                     let key = {
-//                         let mut rng = rand::thread_rng();
-//                         let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
-//                         let key: i128 = rng.gen_range(r.start..r.end);
-//                         let key = repository::Key::from_i128(key);
-//                         let (rel_tag, block_no) =
-//                             key_to_rel_block(key).expect("we filter non-rel-block keys out above");
-//                         RelTagBlockNo { rel_tag, block_no }
-//                     };
-//                     let start = Instant::now();
-//                     getpage_client
-//                         .getpage(key, lsn)
-//                         .await
-//                         .with_context(|| {
-//                             format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
-//                         })
-//                         .unwrap();
-//                     let elapsed = start.elapsed();
-//                     live_stats.inc();
-//                     STATS.with(|stats| {
-//                         stats.borrow().lock().unwrap().observe(elapsed).unwrap();
-//                     });
-//                 }
-//                 all_work_done_barrier.wait().await;
-
-//                 getpage_client.shutdown().await;
-//             }
-//         });
-//         tasks.push(jh);
-//     }
-
-//     for task in tasks {
-//         task.await.unwrap();
-//     }
-
-//     Ok(())
-// }
diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py
index 43221255be..428134c8fe 100644
--- a/test_runner/performance/test_pageserver.py
+++ b/test_runner/performance/test_pageserver.py
@@ -42,7 +42,7 @@ def test_getpage_throughput(
    template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
    template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
    with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
-        pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
+        pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
        last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
 
    ps_http.tenant_detach(template_tenant)
@@ -53,7 +53,7 @@ def test_getpage_throughput(
    src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
    assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
    tenants = [template_tenant]
-    for i in range(0, 200):
+    for i in range(0, 1):
        new_tenant = TenantId.generate()
        tenants.append(new_tenant)
        log.info("Duplicating tenant #%s: %s", i, new_tenant)
@@ -107,11 +107,9 @@ def test_getpage_throughput(
        ps_http.base_url,
        "--page-service-connstring",
        env.pageserver.connstr(password=None),
-        "--num-tasks",
-        "1",
-        "--num-requests",
-        "200000",
-        *[str(tenant) for tenant in tenants],
+        "--runtime",
+        "10s",
+        *[f"{tenant}/{template_timeline}" for tenant in tenants],
    ]
    log.info(f"command: {' '.join(cmd)}")
    basepath = pg_bin.run_capture(cmd)