From df32cc153cd8dc4103b42bb99d17cdf31d855bad Mon Sep 17 00:00:00 2001 From: Elizabeth Murray Date: Wed, 28 May 2025 06:14:56 -0700 Subject: [PATCH] Add grpc pagebench for communicator grpc. --- Cargo.toml | 4 +- pageserver/page_api/src/lib.rs | 2 +- pageserver/page_api/src/model.rs | 63 +++++ pageserver/pagebench/Cargo.toml | 4 + .../pagebench/src/cmd/getpage_latest_lsn.rs | 251 +++++++++++++++--- 5 files changed, 283 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a040010fb7..4790497d8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "pageserver/compaction", "pageserver/ctl", "pageserver/client", + "pageserver/client_grpc", "pageserver/pagebench", "pageserver/page_api", "proxy", @@ -199,7 +200,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["gzip", "channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } @@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } pageserver_client = { path = "./pageserver/client" } +pageserver_client_grpc = { path = "./pageserver/client_grpc" } pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" } pageserver_page_api = { path = "./pageserver/page_api" } postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index f515f27f3e..1f656deb80 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -18,6 +18,6 @@ pub mod proto { pub use page_service_server::{PageService, PageServiceServer}; } -mod model; +pub mod model; pub use model::*; diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 7ab97a994e..8d2e218247 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -102,6 +102,15 @@ impl TryFrom for proto::ReadLsn { } } +impl From<&ReadLsn> for proto::ReadLsn { + fn from(value: &ReadLsn) -> proto::ReadLsn { + proto::ReadLsn { + request_lsn: value.request_lsn.into(), + not_modified_since_lsn: value.not_modified_since_lsn.unwrap_or_default().0, + } + } +} + // RelTag is defined in pageserver_api::reltag. pub type RelTag = pageserver_api::reltag::RelTag; @@ -132,6 +141,16 @@ impl From for proto::RelTag { } } +impl From<&RelTag> for proto::RelTag { + fn from(value: &RelTag) -> proto::RelTag { + proto::RelTag { + spc_oid: value.spcnode, + db_oid: value.dbnode, + rel_number: value.relnode, + fork_number: value.forknum as u32, + } + } +} /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error. #[derive(Clone, Copy, Debug)] pub struct CheckRelExistsRequest { @@ -153,6 +172,14 @@ impl TryFrom for CheckRelExistsRequest { } } +impl From<&CheckRelExistsRequest> for proto::CheckRelExistsRequest { + fn from(value: &CheckRelExistsRequest) -> proto::CheckRelExistsRequest { + proto::CheckRelExistsRequest { + read_lsn: Some((&value.read_lsn).into()), + rel: Some((&value.rel).into()), + } + } +} pub type CheckRelExistsResponse = bool; impl From for CheckRelExistsResponse { @@ -190,6 +217,15 @@ impl TryFrom for GetBaseBackupRequest { } } +impl From<&GetBaseBackupRequest> for proto::GetBaseBackupRequest { + fn from(value: &GetBaseBackupRequest) -> proto::GetBaseBackupRequest { + proto::GetBaseBackupRequest { + read_lsn: Some((&value.read_lsn).into()), + replica: value.replica, + } + } +} + impl TryFrom for proto::GetBaseBackupRequest { type Error = ProtocolError; @@ -246,6 +282,14 @@ impl TryFrom for GetDbSizeRequest { } } +impl From<&GetDbSizeRequest> for proto::GetDbSizeRequest { + fn from(value: &GetDbSizeRequest) -> proto::GetDbSizeRequest { + proto::GetDbSizeRequest { + read_lsn: Some((&value.read_lsn).into()), + db_oid: value.db_oid, + } + } +} impl TryFrom for proto::GetDbSizeRequest { type Error = ProtocolError; @@ -311,6 +355,17 @@ impl TryFrom for GetPageRequest { } } +impl From<&GetPageRequest> for proto::GetPageRequest { + fn from(request: &GetPageRequest) -> proto::GetPageRequest { + proto::GetPageRequest { + request_id: request.request_id, + request_class: request.request_class.into(), + read_lsn: Some(request.read_lsn.try_into().unwrap()), + rel: Some(request.rel.into()), + block_number: request.block_numbers.clone().into_vec(), + } + } +} impl TryFrom for proto::GetPageRequest { type Error = ProtocolError; @@ -505,6 +560,14 @@ impl TryFrom for GetRelSizeRequest { } } +impl From<&GetRelSizeRequest> for proto::GetRelSizeRequest { + fn from(value: &GetRelSizeRequest) -> proto::GetRelSizeRequest { + proto::GetRelSizeRequest { + read_lsn: Some((&value.read_lsn).into()), + rel: Some((&value.rel).into()), + } + } +} impl TryFrom for proto::GetRelSizeRequest { type Error = ProtocolError; diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5b5ed09a2b..00add30363 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -10,12 +10,14 @@ license.workspace = true anyhow.workspace = true camino.workspace = true clap.workspace = true +thiserror.workspace = true futures.workspace = true hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true rand.workspace = true reqwest.workspace=true +bytes.workspace = true serde.workspace = true serde_json.workspace = true tracing.workspace = true @@ -23,6 +25,8 @@ tokio.workspace = true tokio-util.workspace = true pageserver_client.workspace = true +pageserver_client_grpc.workspace = true pageserver_api.workspace = true +pageserver_page_api.workspace = true utils = { path = "../../libs/utils/" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 50419ec338..c13bf31975 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,4 +1,4 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -11,20 +11,32 @@ use camino::Utf8PathBuf; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; +use pageserver_page_api::model::{GetPageClass}; +use pageserver_client::page_service::PagestreamClient; use pageserver_api::shard::TenantShardId; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; use utils::id::TenantTimelineId; +use utils::id::TenantId; +use utils::id::TimelineId; use utils::lsn::Lsn; + + +use utils::shard::ShardIndex; +use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, StreamExt}; + use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { + + #[clap(long, default_value = "false")] + grpc: bool, #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, #[clap(long, default_value = "postgres://postgres@localhost:64000")] @@ -118,6 +130,24 @@ struct Output { total: request_stats::Output, } + +enum ProtocolType { + Pg(PgProtocol), + Grpc(GrpcProtocol), +} + +struct PgProtocol { + libpq_pagestream: PagestreamClient, + libpq_vector: VecDeque, +} +type GetPageFut = BoxFuture<'static, (Instant, Option)>; + +struct GrpcProtocol { + // mutex + grpc_page_client : Arc, + grpc_vector: FuturesOrdered, +} + tokio_thread_local_stats::declare!(STATS: request_stats::Stats); pub(crate) fn main(args: Args) -> anyhow::Result<()> { @@ -303,7 +333,22 @@ async fn main_impl( .unwrap(); Box::pin(async move { - client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await + let protocol : ProtocolType; + if args.grpc { + let grpc = GrpcProtocol::new( + args.page_service_connstring.clone(), + worker_id.timeline.tenant_id, + worker_id.timeline.timeline_id).await; + protocol = ProtocolType::Grpc(grpc); + } else { + let pg = PgProtocol::new( + args.page_service_connstring.clone(), + worker_id.timeline.tenant_id, + worker_id.timeline.timeline_id).await; + protocol = ProtocolType::Pg(pg); + } + + client_proto(args, protocol, worker_id, ss, cancel, rps_period, ranges, weights).await }) }; @@ -355,8 +400,168 @@ async fn main_impl( anyhow::Ok(()) } -async fn client_libpq( +impl PgProtocol { + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self { + let client = pageserver_client::page_service::Client::new(conn_string) + .await + .unwrap(); + let client = client + .pagestream(tenant_id, timeline_id) + .await + .unwrap(); + Self { + libpq_pagestream: client, + libpq_vector: VecDeque::new(), + } + } +} + +impl GrpcProtocol { + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self { + let shard_map = HashMap::from([( + ShardIndex::unsharded(), + conn_string.clone(), + )]); + let client = pageserver_client_grpc::PageserverClient::new( + &tenant_id.to_string(), + &timeline_id.to_string(), + &None, + shard_map, + ); + Self { + grpc_page_client: Arc::new(client), + grpc_vector: FuturesOrdered::new(), + } + } +} +impl ProtocolType { + + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: rand::distributions::weighted::WeightedIndex, + ) -> () { + match self { + ProtocolType::Grpc(g) => { + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key + .to_rel_block() + .expect("we filter non-rel-block keys out above"); + pageserver_page_api::model::GetPageRequest { + request_id: 0, // TODO + request_class: GetPageClass::Normal, + read_lsn: pageserver_page_api::model::ReadLsn { + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since_lsn: Some(r.timeline_lsn), + }, + rel: pageserver_page_api::model::RelTag { + spcnode: rel_tag.spcnode, + dbnode: rel_tag.dbnode, + relnode: rel_tag.relnode, + forknum: rel_tag.forknum, + }, + block_numbers: vec![block_no].into(), + } + }; + let client_clone = g.grpc_page_client.clone(); + let getpage_fut : GetPageFut = async move { + let result = client_clone.get_page(&req).await; + match result { + Ok(_) => { + (start, None) + } + Err(e) => { + (start, Some(e)) + } + } + }.boxed(); + g.grpc_vector.push_back(getpage_fut); + + } + ProtocolType::Pg(p) => { + + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key + .to_rel_block() + .expect("we filter non-rel-block keys out above"); + PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid: 0, + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since: r.timeline_lsn, + }, + rel: rel_tag, + blkno: block_no, + } + }; + let _ = p.libpq_pagestream.getpage_send(req).await; + p.libpq_vector.push_back(start); + } + } + } + + async fn get_start_time(&mut self) -> Instant { + match self { + ProtocolType::Grpc(g) => { + // Logic to get start time for grpc + let (start, result) = g.grpc_vector.next().await.unwrap(); + match result { + None => { + // Request succeeded + } + Some(e) => { + tracing::error!("getpage request failed: {e}"); + } + } + + start + } + ProtocolType::Pg(p) => { + // Logic to get start time for pgstream + let start = p.libpq_vector.pop_front().unwrap(); + let _ = p.libpq_pagestream.getpage_recv().await; + start + } + } + } + pub fn len(&self) -> usize { + match self { + ProtocolType::Grpc(g) => g.grpc_vector.len(), + ProtocolType::Pg(p) => p.libpq_vector.len(), + } + } +} +async fn client_proto( args: &Args, + mut protocol: ProtocolType, worker_id: WorkerId, shared_state: Arc, cancel: CancellationToken, @@ -364,18 +569,11 @@ async fn client_libpq( ranges: Vec, weights: rand::distributions::weighted::WeightedIndex, ) { - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) - .await - .unwrap(); + shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; - let mut inflight = VecDeque::new(); while !cancel.is_cancelled() { // Detect if a request took longer than the RPS rate if let Some(period) = &rps_period { @@ -390,37 +588,12 @@ async fn client_libpq( ticks_processed = periods_passed_until_now; } - while inflight.len() < args.queue_depth.get() { + while protocol.len() < args.queue_depth.get() { let start = Instant::now(); - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid: 0, - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - }, - rel: rel_tag, - blkno: block_no, - } - }; - client.getpage_send(req).await.unwrap(); - inflight.push_back(start); + protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await; } - let start = inflight.pop_front().unwrap(); - client.getpage_recv().await.unwrap(); + let start = protocol.get_start_time().await; let end = Instant::now(); shared_state.live_stats.request_done(); ticks_processed += 1;