From 04440343f83351bf374d3b552d1d78a012277325 Mon Sep 17 00:00:00 2001 From: Elizabeth Murray Date: Wed, 28 May 2025 14:44:28 -0700 Subject: [PATCH] Pagebench with grpc option. Note that grpc is on port 51050, so requires a connstring to be set. --- Cargo.lock | 4 + pageserver/pagebench/Cargo.toml | 4 + .../pagebench/src/cmd/getpage_latest_lsn.rs | 276 +++++++++++++++--- 3 files changed, 244 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f71514507c..d1d765255b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4236,6 +4236,7 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "futures", @@ -4244,12 +4245,15 @@ dependencies = [ "humantime-serde", "pageserver_api", "pageserver_client", + "pageserver_client_grpc", + "pageserver_page_api", "rand 0.8.5", "reqwest", "serde", "serde_json", "tokio", "tokio-util", + "tonic 0.13.1", "tracing", "utils", "workspace_hack", diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5b5ed09a2b..099b08bc82 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -20,9 +20,13 @@ serde.workspace = true serde_json.workspace = true tracing.workspace = true tokio.workspace = true +tonic.workspace = true tokio-util.workspace = true +async-trait = "0.1" pageserver_client.workspace = true pageserver_api.workspace = true +pageserver_client_grpc.workspace = true +pageserver_page_api.workspace = true utils = { path = "../../libs/utils/" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 50419ec338..b423e904e6 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -6,25 +6,40 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use tonic::metadata::AsciiMetadataValue; use anyhow::Context; use camino::Utf8PathBuf; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::shard::TenantShardId; +use pageserver_client::page_service::PagestreamClient; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; use utils::id::TenantTimelineId; +use utils::id::TenantId; +use utils::id::TimelineId; use utils::lsn::Lsn; +use futures::{ + future::BoxFuture, + stream::FuturesOrdered, + FutureExt, StreamExt, +}; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; +use async_trait::async_trait; +use rand::distributions::weighted::WeightedIndex; +use utils::shard::ShardIndex; + /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { + #[clap(long, default_value = "false")] + grpc: bool, #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, #[clap(long, default_value = "postgres://postgres@localhost:64000")] @@ -303,7 +318,19 @@ async fn main_impl( .unwrap(); Box::pin(async move { - client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await + if args.grpc { + let grpc = GrpcProtocol::new( + args.page_service_connstring.clone(), + worker_id.timeline.tenant_id, + worker_id.timeline.timeline_id).await; + client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await + } else { + let pg = PgProtocol::new( + args.page_service_connstring.clone(), + worker_id.timeline.tenant_id, + worker_id.timeline.timeline_id).await; + client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await + } }) }; @@ -354,9 +381,208 @@ async fn main_impl( anyhow::Ok(()) } +/// Common interface for both Pg and Grpc versions. +#[async_trait] +trait Protocol: Send { + /// Constructor/factory. + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self + where + Self: Sized; -async fn client_libpq( + /// Fire off a “get page” request and store the start time. + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: WeightedIndex, + ); + + /// Wait for the next response and return its start time. + async fn get_start_time(&mut self) -> Instant; + + /// How many in-flight requests do we have? + fn len(&self) -> usize; +} + +/////////////////////////////////////////////////////////////////////////////// +// PgProtocol +/////////////////////////////////////////////////////////////////////////////// + +struct PgProtocol { + libpq_pagestream: PagestreamClient, + libpq_vector: VecDeque, +} + +#[async_trait] +impl Protocol for PgProtocol { + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self { + let client = pageserver_client::page_service::Client::new(conn_string) + .await + .unwrap() + .pagestream(tenant_id, timeline_id) + .await + .unwrap(); + Self { + libpq_pagestream: client, + libpq_vector: VecDeque::new(), + } + } + + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: WeightedIndex, + ) { + // build your PagestreamGetPageRequest exactly as before… + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key.to_rel_block().unwrap(); + PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid: 0, + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since: r.timeline_lsn, + }, + rel: rel_tag, + blkno: block_no, + } + }; + + let _ = self.libpq_pagestream.getpage_send(req).await; + self.libpq_vector.push_back(start); + } + + async fn get_start_time(&mut self) -> Instant { + let start = self.libpq_vector.pop_front().unwrap(); + let _ = self.libpq_pagestream.getpage_recv().await; + start + } + + fn len(&self) -> usize { + self.libpq_vector.len() + } +} + +/////////////////////////////////////////////////////////////////////////////// +// GrpcProtocol +/////////////////////////////////////////////////////////////////////////////// +type GetPageFut = BoxFuture<'static, (Instant, Option)>; +struct GrpcProtocol { + grpc_page_client: Arc, + grpc_vector: FuturesOrdered, +} + +#[async_trait] +impl Protocol for GrpcProtocol { + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self { + let shard_map = std::collections::HashMap::from([( + ShardIndex::unsharded(), + conn_string.clone(), + )]); + let tenant_ascii : AsciiMetadataValue = tenant_id.to_string().parse().unwrap(); + let timeline_ascii : AsciiMetadataValue = timeline_id.to_string().parse().unwrap(); + let client = pageserver_client_grpc::PageserverClient::new( + tenant_ascii, + timeline_ascii, + None, + shard_map, + ).unwrap(); + Self { + grpc_page_client: Arc::new(client), + grpc_vector: FuturesOrdered::new(), + } + } + + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: WeightedIndex, + ) { + // build your GetPageRequest exactly as before… + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key.to_rel_block().unwrap(); + pageserver_page_api::GetPageRequest { + request_id: 0, + request_class: pageserver_page_api::GetPageClass::Normal, + read_lsn: pageserver_page_api::ReadLsn { + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since_lsn: Some(r.timeline_lsn), + }, + rel: pageserver_page_api::RelTag { + spcnode: rel_tag.spcnode, + dbnode: rel_tag.dbnode, + relnode: rel_tag.relnode, + forknum: rel_tag.forknum, + }, + block_numbers: vec![block_no].into(), + } + }; + + let client_clone = self.grpc_page_client.clone(); + let getpage_fut : GetPageFut = async move { + let result = client_clone.get_page(ShardIndex::unsharded(), req).await; + match result { + Ok(_) => { + (start, None) + } + Err(e) => { + (start, Some(e)) + } + } + }.boxed(); + self.grpc_vector.push_back(getpage_fut); + } + + async fn get_start_time(&mut self) -> Instant { + let (start, err) = self.grpc_vector.next().await.unwrap(); + if let Some(e) = err { + tracing::error!("getpage request failed: {e}"); + } + start + } + + fn len(&self) -> usize { + self.grpc_vector.len() + } +} + +async fn client_proto( args: &Args, + mut protocol: impl Protocol, worker_id: WorkerId, shared_state: Arc, cancel: CancellationToken, @@ -364,18 +590,11 @@ async fn client_libpq( ranges: Vec, weights: rand::distributions::weighted::WeightedIndex, ) { - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) - .await - .unwrap(); + shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; - let mut inflight = VecDeque::new(); while !cancel.is_cancelled() { // Detect if a request took longer than the RPS rate if let Some(period) = &rps_period { @@ -390,37 +609,12 @@ async fn client_libpq( ticks_processed = periods_passed_until_now; } - while inflight.len() < args.queue_depth.get() { + while protocol.len() < args.queue_depth.get() { let start = Instant::now(); - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid: 0, - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - }, - rel: rel_tag, - blkno: block_no, - } - }; - client.getpage_send(req).await.unwrap(); - inflight.push_back(start); + protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await; } - let start = inflight.pop_front().unwrap(); - client.getpage_recv().await.unwrap(); + let start = protocol.get_start_time().await; let end = Instant::now(); shared_state.live_stats.request_done(); ticks_processed += 1; @@ -436,9 +630,11 @@ async fn client_libpq( if let Some(period) = &rps_period { let next_at = client_start + Duration::from_micros( - (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), - ); + (ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(), + ); tokio::time::sleep_until(next_at.into()).await; } } } + +