From 4d0c1e8b783df7c849cf5ac55dce39da10d51b93 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 19 Apr 2025 11:38:03 +0300 Subject: [PATCH] refactor: Extract some code in pagebench getpage command to function (#11563) This makes it easier to add a different client implementation alongside the current one. I started working on a new gRPC-based protocol to replace the libpq protocol, which will introduce a new function like `client_libpq`, but for the new protocol. It's a little more readable with less indentation anyway. --- .../pagebench/src/cmd/getpage_latest_lsn.rs | 200 ++++++++++-------- 1 file changed, 109 insertions(+), 91 deletions(-) diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 6fd1c00eca..771a7cbe5b 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -68,6 +68,13 @@ pub(crate) struct Args { targets: Option>, } +/// State shared by all clients +#[derive(Debug)] +struct SharedState { + start_work_barrier: tokio::sync::Barrier, + live_stats: LiveStats, +} + #[derive(Debug, Default)] struct LiveStats { completed_requests: AtomicU64, @@ -240,24 +247,26 @@ async fn main_impl( all_ranges }; - let live_stats = Arc::new(LiveStats::default()); - let num_live_stats_dump = 1; let num_work_sender_tasks = args.num_clients.get() * timelines.len(); let num_main_impl = 1; - let start_work_barrier = Arc::new(tokio::sync::Barrier::new( - num_live_stats_dump + num_work_sender_tasks + num_main_impl, - )); + let shared_state = Arc::new(SharedState { + start_work_barrier: tokio::sync::Barrier::new( + num_live_stats_dump + num_work_sender_tasks + num_main_impl, + ), + live_stats: LiveStats::default(), + }); + let cancel = CancellationToken::new(); + let ss = shared_state.clone(); tokio::spawn({ - let stats = Arc::clone(&live_stats); - let start_work_barrier = Arc::clone(&start_work_barrier); async move { - start_work_barrier.wait().await; + ss.start_work_barrier.wait().await; loop { let start = std::time::Instant::now(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let stats = &ss.live_stats; let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); let missed = stats.missed.swap(0, Ordering::Relaxed); let elapsed = start.elapsed(); @@ -270,14 +279,12 @@ async fn main_impl( } }); - let cancel = CancellationToken::new(); - let rps_period = args .per_client_rate .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64))); let make_worker: &dyn Fn(WorkerId) -> Pin>> = &|worker_id| { - let live_stats = live_stats.clone(); - let start_work_barrier = start_work_barrier.clone(); + let ss = shared_state.clone(); + let cancel = cancel.clone(); let ranges: Vec = all_ranges .iter() .filter(|r| r.timeline == worker_id.timeline) @@ -287,85 +294,8 @@ async fn main_impl( rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len())) .unwrap(); - let cancel = cancel.clone(); Box::pin(async move { - let client = - pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) - .await - .unwrap(); - - start_work_barrier.wait().await; - let client_start = Instant::now(); - let mut ticks_processed = 0; - let mut inflight = VecDeque::new(); - while !cancel.is_cancelled() { - // Detect if a request took longer than the RPS rate - if let Some(period) = &rps_period { - let periods_passed_until_now = - usize::try_from(client_start.elapsed().as_micros() / period.as_micros()) - .unwrap(); - - if periods_passed_until_now > ticks_processed { - live_stats.missed((periods_passed_until_now - ticks_processed) as u64); - } - ticks_processed = periods_passed_until_now; - } - - while inflight.len() < args.queue_depth.get() { - let start = Instant::now(); - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid: 0, - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - }, - rel: rel_tag, - blkno: block_no, - } - }; - client.getpage_send(req).await.unwrap(); - inflight.push_back(start); - } - - let start = inflight.pop_front().unwrap(); - client.getpage_recv().await.unwrap(); - let end = Instant::now(); - live_stats.request_done(); - ticks_processed += 1; - STATS.with(|stats| { - stats - .borrow() - .lock() - .unwrap() - .observe(end.duration_since(start)) - .unwrap(); - }); - - if let Some(period) = &rps_period { - let next_at = client_start - + Duration::from_micros( - (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), - ); - tokio::time::sleep_until(next_at.into()).await; - } - } + client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await }) }; @@ -387,7 +317,7 @@ async fn main_impl( }; info!("waiting for everything to become ready"); - start_work_barrier.wait().await; + shared_state.start_work_barrier.wait().await; info!("work started"); if let Some(runtime) = args.runtime { tokio::time::sleep(runtime.into()).await; @@ -416,3 +346,91 @@ async fn main_impl( anyhow::Ok(()) } + +async fn client_libpq( + args: &Args, + worker_id: WorkerId, + shared_state: Arc, + cancel: CancellationToken, + rps_period: Option, + ranges: Vec, + weights: rand::distributions::weighted::WeightedIndex, +) { + let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) + .await + .unwrap(); + let mut client = client + .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) + .await + .unwrap(); + + shared_state.start_work_barrier.wait().await; + let client_start = Instant::now(); + let mut ticks_processed = 0; + let mut inflight = VecDeque::new(); + while !cancel.is_cancelled() { + // Detect if a request took longer than the RPS rate + if let Some(period) = &rps_period { + let periods_passed_until_now = + usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap(); + + if periods_passed_until_now > ticks_processed { + shared_state + .live_stats + .missed((periods_passed_until_now - ticks_processed) as u64); + } + ticks_processed = periods_passed_until_now; + } + + while inflight.len() < args.queue_depth.get() { + let start = Instant::now(); + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key + .to_rel_block() + .expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid: 0, + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since: r.timeline_lsn, + }, + rel: rel_tag, + blkno: block_no, + } + }; + client.getpage_send(req).await.unwrap(); + inflight.push_back(start); + } + + let start = inflight.pop_front().unwrap(); + client.getpage_recv().await.unwrap(); + let end = Instant::now(); + shared_state.live_stats.request_done(); + ticks_processed += 1; + STATS.with(|stats| { + stats + .borrow() + .lock() + .unwrap() + .observe(end.duration_since(start)) + .unwrap(); + }); + + if let Some(period) = &rps_period { + let next_at = client_start + + Duration::from_micros( + (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), + ); + tokio::time::sleep_until(next_at.into()).await; + } + } +}