From a21c1174edefdfb59fbdce9ae5696c446a3cfe0a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 2 Jun 2025 16:50:49 +0200 Subject: [PATCH] pagebench: add gRPC support for `get-page-latest-lsn` (#12077) ## Problem We need gRPC support in Pagebench to benchmark the new gRPC Pageserver implementation. Touches #11728. ## Summary of changes Adds a `Client` trait to make the client transport swappable, and a gRPC client via a `--protocol grpc` parameter. This must also specify the connstring with the gRPC port: ``` pagebench get-page-latest-lsn --protocol grpc --page-service-connstring grpc://localhost:51051 ``` The client is implemented using the raw Tonic-generated gRPC client, to minimize client overhead. --- Cargo.lock | 4 + libs/pageserver_api/src/models.rs | 4 +- libs/pageserver_api/src/reltag.rs | 2 +- pageserver/pagebench/Cargo.toml | 6 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 146 ++++++++++++++++-- 5 files changed, 144 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f7378e95d..9fc233e5ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4236,6 +4236,7 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "futures", @@ -4244,12 +4245,15 @@ dependencies = [ "humantime-serde", "pageserver_api", "pageserver_client", + "pageserver_page_api", "rand 0.8.5", "reqwest", "serde", "serde_json", "tokio", + "tokio-stream", "tokio-util", + "tonic 0.13.1", "tracing", "utils", "workspace_hack", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e7d612bb7a..01487c0f57 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2045,7 +2045,7 @@ pub enum PagestreamProtocolVersion { pub type RequestId = u64; -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct PagestreamRequest { pub reqid: RequestId, pub request_lsn: Lsn, @@ -2064,7 +2064,7 @@ pub struct PagestreamNblocksRequest { pub rel: RelTag, } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct PagestreamGetPageRequest { pub hdr: PagestreamRequest, pub rel: RelTag, diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 473a44dbf9..4509cab2e0 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; // FIXME: should move 'forknum' as last field to keep this consistent with Postgres. // Then we could replace the custom Ord and PartialOrd implementations below with // deriving them. This will require changes in walredoproc.c. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: Oid, diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5b5ed09a2b..ceb1278eab 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true clap.workspace = true futures.workspace = true @@ -15,14 +16,17 @@ hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true rand.workspace = true -reqwest.workspace=true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true tracing.workspace = true tokio.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true +tonic.workspace = true pageserver_client.workspace = true pageserver_api.workspace = true +pageserver_page_api.workspace = true utils = { path = "../../libs/utils/" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 50419ec338..395e9cac41 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use anyhow::Context; +use async_trait::async_trait; use camino::Utf8PathBuf; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; -use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; +use pageserver_api::models::{ + PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest, +}; use pageserver_api::shard::TenantShardId; +use pageserver_page_api::proto; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -22,6 +26,12 @@ use utils::lsn::Lsn; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; +#[derive(clap::ValueEnum, Clone, Debug)] +enum Protocol { + Libpq, + Grpc, +} + /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { @@ -35,6 +45,8 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, + #[clap(long, value_enum, default_value = "libpq")] + protocol: Protocol, /// Each client sends requests at the given rate. /// /// If a request takes too long and we should be issuing a new request already, @@ -303,7 +315,20 @@ async fn main_impl( .unwrap(); Box::pin(async move { - client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await + let client: Box = match args.protocol { + Protocol::Libpq => Box::new( + LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline) + .await + .unwrap(), + ), + + Protocol::Grpc => Box::new( + GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline) + .await + .unwrap(), + ), + }; + run_worker(args, client, ss, cancel, rps_period, ranges, weights).await }) }; @@ -355,23 +380,15 @@ async fn main_impl( anyhow::Ok(()) } -async fn client_libpq( +async fn run_worker( args: &Args, - worker_id: WorkerId, + mut client: Box, shared_state: Arc, cancel: CancellationToken, rps_period: Option, ranges: Vec, weights: rand::distributions::weighted::WeightedIndex, ) { - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) - .await - .unwrap(); - shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; @@ -415,12 +432,12 @@ async fn client_libpq( blkno: block_no, } }; - client.getpage_send(req).await.unwrap(); + client.send_get_page(req).await.unwrap(); inflight.push_back(start); } let start = inflight.pop_front().unwrap(); - client.getpage_recv().await.unwrap(); + client.recv_get_page().await.unwrap(); let end = Instant::now(); shared_state.live_stats.request_done(); ticks_processed += 1; @@ -442,3 +459,104 @@ async fn client_libpq( } } } + +/// A benchmark client, to allow switching out the transport protocol. +/// +/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could +/// return a future that resolves when the response is received, but we don't really need it. +#[async_trait] +trait Client: Send { + /// Sends an asynchronous GetPage request to the pageserver. + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>; + + /// Receives the next GetPage response from the pageserver. + async fn recv_get_page(&mut self) -> anyhow::Result; +} + +/// A libpq-based Pageserver client. +struct LibpqClient { + inner: pageserver_client::page_service::PagestreamClient, +} + +impl LibpqClient { + async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { + let inner = pageserver_client::page_service::Client::new(connstring) + .await? + .pagestream(ttid.tenant_id, ttid.timeline_id) + .await?; + Ok(Self { inner }) + } +} + +#[async_trait] +impl Client for LibpqClient { + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { + self.inner.getpage_send(req).await + } + + async fn recv_get_page(&mut self) -> anyhow::Result { + self.inner.getpage_recv().await + } +} + +/// A gRPC client using the raw, no-frills gRPC client. +struct GrpcClient { + req_tx: tokio::sync::mpsc::Sender, + resp_rx: tonic::Streaming, +} + +impl GrpcClient { + async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { + let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?; + + // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the + // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are + // buffered by Tonic and the OS too. + let (req_tx, req_rx) = tokio::sync::mpsc::channel(1); + let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); + let mut req = tonic::Request::new(req_stream); + let metadata = req.metadata_mut(); + metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?); + metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?); + metadata.insert("neon-shard-id", "0000".try_into()?); + + let resp = client.get_pages(req).await?; + let resp_stream = resp.into_inner(); + + Ok(Self { + req_tx, + resp_rx: resp_stream, + }) + } +} + +#[async_trait] +impl Client for GrpcClient { + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { + let req = proto::GetPageRequest { + request_id: 0, + request_class: proto::GetPageClass::Normal as i32, + read_lsn: Some(proto::ReadLsn { + request_lsn: req.hdr.request_lsn.0, + not_modified_since_lsn: req.hdr.not_modified_since.0, + }), + rel: Some(req.rel.into()), + block_number: vec![req.blkno], + }; + self.req_tx.send(req).await?; + Ok(()) + } + + async fn recv_get_page(&mut self) -> anyhow::Result { + let resp = self.resp_rx.message().await?.unwrap(); + anyhow::ensure!( + resp.status_code == proto::GetPageStatusCode::Ok as i32, + "unexpected status code: {}", + resp.status_code + ); + Ok(PagestreamGetPageResponse { + page: resp.page_image[0].clone(), + req: PagestreamGetPageRequest::default(), // dummy + }) + } +}