From dcdfe80bf015e93b991c0aa86ffbbffbcd18c198 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 10 Jul 2025 19:30:09 +0200
Subject: [PATCH] pagebench: add support for rich gRPC client (#12477)

## Problem

We need to benchmark the rich gRPC client `client_grpc::PageserverClient` against
the basic, no-frills `page_api::Client` to determine how much overhead it adds.

Touches #11735.
Requires #12476.

## Summary of changes

Adds a `pagebench --rich-client` parameter that uses `client_grpc::PageserverClient`
instead of the basic `page_api::Client`. Also adds a compression parameter to the
client.
---
 Cargo.lock                                    |  1 +
 Cargo.toml                                    |  1 +
 pageserver/client_grpc/src/client.rs          | 28 ++++++-
 pageserver/client_grpc/src/pool.rs            |  7 +-
 pageserver/pagebench/Cargo.toml               |  3 +-
 .../pagebench/src/cmd/getpage_latest_lsn.rs   | 84 +++++++++++++++++++
 6 files changed, 120 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1d68b8f862..c528354053 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4294,6 +4294,7 @@ dependencies = [
  "humantime-serde",
  "pageserver_api",
  "pageserver_client",
+ "pageserver_client_grpc",
  "pageserver_page_api",
  "rand 0.8.5",
  "reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 14f2cfcb56..0d521ee4d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -262,6 +262,7 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
 pageserver = { path = "./pageserver" }
 pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
 pageserver_client = { path = "./pageserver/client" }
+pageserver_client_grpc = { path = "./pageserver/client_grpc" }
 pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
 pageserver_page_api = { path = "./pageserver/page_api" }
 postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index ee09c1f13c..e790f4018e 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -6,6 +6,7 @@ use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt as _, StreamExt as _};
+use tonic::codec::CompressionEncoding;
 use tracing::instrument;
 
 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
@@ -62,6 +63,8 @@ pub struct PageserverClient {
     timeline_id: TimelineId,
     /// The JWT auth token for this tenant, if any.
     auth_token: Option<String>,
+    /// The compression to use, if any.
+    compression: Option<CompressionEncoding>,
     /// The shards for this tenant.
     shards: ArcSwap<Shards>,
     /// The retry configuration.
@@ -76,12 +79,20 @@ impl PageserverClient {
         timeline_id: TimelineId,
         shard_spec: ShardSpec,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
     ) -> anyhow::Result<Self> {
-        let shards = Shards::new(tenant_id, timeline_id, shard_spec, auth_token.clone())?;
+        let shards = Shards::new(
+            tenant_id,
+            timeline_id,
+            shard_spec,
+            auth_token.clone(),
+            compression,
+        )?;
         Ok(Self {
             tenant_id,
             timeline_id,
             auth_token,
+            compression,
             shards: ArcSwap::new(Arc::new(shards)),
             retry: Retry,
         })
@@ -119,6 +130,7 @@ impl PageserverClient {
             self.timeline_id,
             shard_spec,
             self.auth_token.clone(),
+            self.compression,
         )?;
         self.shards.store(Arc::new(shards));
         Ok(())
@@ -364,13 +376,21 @@ impl Shards {
         timeline_id: TimelineId,
         shard_spec: ShardSpec,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
     ) -> anyhow::Result<Self> {
         // NB: the shard spec has already been validated when constructed.
         let mut shards = HashMap::with_capacity(shard_spec.urls.len());
         for (shard_id, url) in shard_spec.urls {
             shards.insert(
                 shard_id,
-                Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?,
+                Shard::new(
+                    url,
+                    tenant_id,
+                    timeline_id,
+                    shard_id,
+                    auth_token.clone(),
+                    compression,
+                )?,
             );
         }
 
@@ -422,6 +442,7 @@ impl Shard {
         timeline_id: TimelineId,
         shard_id: ShardIndex,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
     ) -> anyhow::Result<Self> {
         // Common channel pool for unary and stream requests. Bounded by client/stream pools.
         let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
@@ -433,6 +454,7 @@ impl Shard {
             timeline_id,
             shard_id,
             auth_token.clone(),
+            compression,
             Some(MAX_UNARY_CLIENTS),
         );
 
@@ -445,6 +467,7 @@ impl Shard {
                 timeline_id,
                 shard_id,
                 auth_token.clone(),
+                compression,
                 None, // unbounded, limited by stream pool
             ),
             Some(MAX_STREAMS),
@@ -460,6 +483,7 @@ impl Shard {
                 timeline_id,
                 shard_id,
                 auth_token,
+                compression,
                 None, // unbounded, limited by stream pool
             ),
             Some(MAX_BULK_STREAMS),
diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs
index 89b3bd646f..2dde40b5b4 100644
--- a/pageserver/client_grpc/src/pool.rs
+++ b/pageserver/client_grpc/src/pool.rs
@@ -40,6 +40,7 @@ use futures::StreamExt as _;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
 use tokio_util::sync::CancellationToken;
+use tonic::codec::CompressionEncoding;
 use tonic::transport::{Channel, Endpoint};
 use tracing::{error, warn};
 
@@ -242,6 +243,8 @@ pub struct ClientPool {
     shard_id: ShardIndex,
     /// Authentication token, if any.
     auth_token: Option<String>,
+    /// Compression to use.
+    compression: Option<CompressionEncoding>,
     /// Channel pool to acquire channels from.
     channel_pool: Arc<ChannelPool>,
     /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
@@ -281,6 +284,7 @@ impl ClientPool {
         timeline_id: TimelineId,
         shard_id: ShardIndex,
         auth_token: Option<String>,
+        compression: Option<CompressionEncoding>,
         max_clients: Option<NonZero<usize>>,
     ) -> Arc<Self> {
         let pool = Arc::new(Self {
@@ -288,6 +292,7 @@ impl ClientPool {
             timeline_id,
             shard_id,
             auth_token,
+            compression,
             channel_pool,
             idle: Mutex::default(),
             idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
@@ -331,7 +336,7 @@ impl ClientPool {
             self.timeline_id,
             self.shard_id,
             self.auth_token.clone(),
-            None,
+            self.compression,
         )?;
 
         Ok(ClientGuard {
diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml
index f5dfc0db25..4086213830 100644
--- a/pageserver/pagebench/Cargo.toml
+++ b/pageserver/pagebench/Cargo.toml
@@ -27,8 +27,9 @@ tokio-util.workspace = true
 tonic.workspace = true
 url.workspace = true
 
-pageserver_client.workspace = true
 pageserver_api.workspace = true
+pageserver_client.workspace = true
+pageserver_client_grpc.workspace = true
 pageserver_page_api.workspace = true
 utils = { path = "../../libs/utils/" }
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index f14caf548c..42c7e40489 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -10,12 +10,14 @@ use anyhow::Context;
 use async_trait::async_trait;
 use bytes::Bytes;
 use camino::Utf8PathBuf;
+use futures::stream::FuturesUnordered;
 use futures::{Stream, StreamExt as _};
 use pageserver_api::key::Key;
 use pageserver_api::keyspace::KeySpaceAccum;
 use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
 use pageserver_api::reltag::RelTag;
 use pageserver_api::shard::TenantShardId;
+use pageserver_client_grpc::{self as client_grpc, ShardSpec};
 use pageserver_page_api as page_api;
 use rand::prelude::*;
 use tokio::task::JoinSet;
@@ -37,6 +39,10 @@ pub(crate) struct Args {
     /// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
     page_service_connstring: String,
+    /// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
+    /// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
+    #[clap(long)]
+    rich_client: bool,
     #[clap(long)]
     pageserver_jwt: Option<String>,
     #[clap(long, default_value = "1")]
@@ -332,6 +338,7 @@ async fn main_impl(
     let client: Box<dyn Client> = match scheme.as_str() {
         "postgresql" | "postgres" => {
             assert!(!args.compression, "libpq does not support compression");
+            assert!(!args.rich_client, "rich client requires grpc://");
             Box::new(
                 LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
                     .await
@@ -338,7 +345,17 @@ async fn main_impl(
                     .unwrap(),
             )
         }
 
+        "grpc" if args.rich_client => Box::new(
+            RichGrpcClient::new(
+                &args.page_service_connstring,
+                worker_id.timeline,
+                args.compression,
+            )
+            .await
+            .unwrap(),
+        ),
+
         "grpc" => Box::new(
             GrpcClient::new(
                 &args.page_service_connstring,
@@ -680,3 +697,70 @@ impl Client for GrpcClient {
         Ok((resp.request_id, resp.page_images))
     }
 }
+
+/// A rich gRPC Pageserver client.
+struct RichGrpcClient {
+    inner: Arc<client_grpc::PageserverClient>,
+    requests: FuturesUnordered<
+        Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
+    >,
+}
+
+impl RichGrpcClient {
+    async fn new(
+        connstring: &str,
+        ttid: TenantTimelineId,
+        compression: bool,
+    ) -> anyhow::Result<Self> {
+        let inner = Arc::new(client_grpc::PageserverClient::new(
+            ttid.tenant_id,
+            ttid.timeline_id,
+            ShardSpec::new(
+                [(ShardIndex::unsharded(), connstring.to_string())].into(),
+                None,
+            )?,
+            None,
+            compression.then_some(tonic::codec::CompressionEncoding::Zstd),
+        )?);
+        Ok(Self {
+            inner,
+            requests: FuturesUnordered::new(),
+        })
+    }
+}
+
+#[async_trait]
+impl Client for RichGrpcClient {
+    async fn send_get_page(
+        &mut self,
+        req_id: u64,
+        req_lsn: Lsn,
+        mod_lsn: Lsn,
+        rel: RelTag,
+        blks: Vec<u32>,
+    ) -> anyhow::Result<()> {
+        let req = page_api::GetPageRequest {
+            request_id: req_id,
+            request_class: page_api::GetPageClass::Normal,
+            read_lsn: page_api::ReadLsn {
+                request_lsn: req_lsn,
+                not_modified_since_lsn: Some(mod_lsn),
+            },
+            rel,
+            block_numbers: blks,
+        };
+        let inner = self.inner.clone();
+        self.requests.push(Box::pin(async move {
+            inner
+                .get_page(req)
+                .await
+                .map_err(|err| anyhow::anyhow!("{err}"))
+        }));
+        Ok(())
+    }
+
+    async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
+        let resp = self.requests.next().await.unwrap()?;
+        Ok((resp.request_id, resp.page_images))
+    }
+}
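
For reference, the sketch below shows how the rich client is wired up for a single unsharded tenant, mirroring what `RichGrpcClient::new` above does. It is a minimal sketch based only on the signatures visible in this diff: the function name `build_rich_client` and the endpoint value are placeholders, and the `utils::id` / `utils::shard` import paths are assumptions about where `TenantId`, `TimelineId`, and `ShardIndex` live.

```rust
use pageserver_client_grpc::{self as client_grpc, ShardSpec};
use tonic::codec::CompressionEncoding;
use utils::id::{TenantId, TimelineId}; // assumed import paths
use utils::shard::ShardIndex;

/// Builds a rich gRPC client for a single unsharded tenant, the same way
/// `RichGrpcClient::new` does in this patch.
fn build_rich_client(
    endpoint: &str, // placeholder, e.g. a grpc:// Pageserver URL
    tenant_id: TenantId,
    timeline_id: TimelineId,
    auth_token: Option<String>,
    compression: bool,
) -> anyhow::Result<client_grpc::PageserverClient> {
    // One shard index mapped to one URL; the second argument stays None, as in the patch.
    let shard_spec = ShardSpec::new(
        [(ShardIndex::unsharded(), endpoint.to_string())].into(),
        None,
    )?;
    client_grpc::PageserverClient::new(
        tenant_id,
        timeline_id,
        shard_spec,
        auth_token,
        // Zstd is what pagebench enables when --compression is set alongside --rich-client.
        compression.then_some(CompressionEncoding::Zstd),
    )
}
```

Requests then go through `PageserverClient::get_page`, which is the code path the benchmark compares against the basic `page_api::Client`.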