From 15d079cd41c579555b7a42b7d36429ca63c7d8a2 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 20 Jun 2025 10:31:40 +0200 Subject: [PATCH] pagebench: improve `getpage-latest-lsn` gRPC support (#12293) This improves `pagebench getpage-latest-lsn` gRPC support by: * Using `page_api::Client`. * Removing `--protocol`, and using the `page-server-connstring` scheme instead. * Adding `--compression` to enable zstd compression. --- Cargo.lock | 3 + pageserver/page_api/Cargo.toml | 2 + pageserver/page_api/src/model.rs | 2 +- pageserver/pagebench/Cargo.toml | 1 + pageserver/pagebench/src/cmd/basebackup.rs | 10 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 115 ++++++++++-------- 6 files changed, 79 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ab26b02fa..8cc51350ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4255,6 +4255,7 @@ dependencies = [ "tokio-util", "tonic 0.13.1", "tracing", + "url", "utils", "workspace_hack", ] @@ -4472,6 +4473,8 @@ dependencies = [ "pageserver_api", "postgres_ffi", "prost 0.13.5", + "strum", + "strum_macros", "thiserror 1.0.69", "tokio", "tonic 0.13.1", diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index 8b13b9e1db..c5283c2b09 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -11,6 +11,8 @@ futures.workspace = true pageserver_api.workspace = true postgres_ffi.workspace = true prost.workspace = true +strum.workspace = true +strum_macros.workspace = true thiserror.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index ef7f89473f..6efa742799 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -459,7 +459,7 @@ impl GetPageResponse { /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// (potentially shared by many backends), and a gRPC status response would terminate the stream so /// we send GetPageResponse messages with these codes instead. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)] pub enum GetPageStatusCode { /// Unknown status. For forwards compatibility: used when an older client version receives a new /// status code from a newer server version. diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5e4af88e69..f5dfc0db25 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -25,6 +25,7 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tonic.workspace = true +url.workspace = true pageserver_client.workspace = true pageserver_api.workspace = true diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 8015db528d..e028174c1d 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -13,7 +13,6 @@ use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use pageserver_client::page_service::BasebackupRequest; use pageserver_page_api as page_api; use rand::prelude::*; -use reqwest::Url; use tokio::io::AsyncRead; use tokio::sync::Barrier; use tokio::task::JoinSet; @@ -21,6 +20,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt use tokio_util::io::StreamReader; use tonic::async_trait; use tracing::{info, instrument}; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; use utils::shard::ShardIndex; @@ -156,12 +156,16 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); - let connurl = Url::parse(&args.page_service_connstring)?; + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => return Err(anyhow!("invalid connstring: {err}")), + }; for &tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); - let client: Box = match connurl.scheme() { + let client: Box = match scheme.as_str() { "postgresql" | "postgres" => Box::new( LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, ), diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 3a68a77279..a297819e9b 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -10,33 +10,31 @@ use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; use camino::Utf8PathBuf; +use futures::{Stream, StreamExt as _}; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; -use pageserver_page_api::proto; +use pageserver_page_api as page_api; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; -#[derive(clap::ValueEnum, Clone, Debug)] -enum Protocol { - Libpq, - Grpc, -} - /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, + /// Pageserver connection string. Supports postgresql:// and grpc:// protocols. #[clap(long, default_value = "postgres://postgres@localhost:64000")] page_service_connstring: String, #[clap(long)] @@ -45,8 +43,9 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, - #[clap(long, value_enum, default_value = "libpq")] - protocol: Protocol, + /// If true, enable compression (only for gRPC). + #[clap(long)] + compression: bool, /// Each client sends requests at the given rate. /// /// If a request takes too long and we should be issuing a new request already, @@ -325,18 +324,32 @@ async fn main_impl( .unwrap(); Box::pin(async move { - let client: Box = match args.protocol { - Protocol::Libpq => Box::new( - LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => panic!("invalid connstring: {err}"), + }; + let client: Box = match scheme.as_str() { + "postgresql" | "postgres" => { + assert!(!args.compression, "libpq does not support compression"); + Box::new( + LibpqClient::new(&args.page_service_connstring, worker_id.timeline) + .await + .unwrap(), + ) + } + + "grpc" => Box::new( + GrpcClient::new( + &args.page_service_connstring, + worker_id.timeline, + args.compression, + ) + .await + .unwrap(), ), - Protocol::Grpc => Box::new( - GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), - ), + scheme => panic!("unsupported scheme {scheme}"), }; run_worker(args, client, ss, cancel, rps_period, ranges, weights).await }) @@ -543,8 +556,8 @@ struct LibpqClient { } impl LibpqClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let inner = pageserver_client::page_service::Client::new(connstring) + async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result { + let inner = pageserver_client::page_service::Client::new(connstring.to_string()) .await? .pagestream(ttid.tenant_id, ttid.timeline_id) .await?; @@ -600,34 +613,36 @@ impl Client for LibpqClient { } } -/// A gRPC client using the raw, no-frills gRPC client. +/// A gRPC Pageserver client. struct GrpcClient { - req_tx: tokio::sync::mpsc::Sender, - resp_rx: tonic::Streaming, + req_tx: tokio::sync::mpsc::Sender, + resp_rx: Pin> + Send>>, } impl GrpcClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?; + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + let mut client = page_api::Client::new( + connstring.to_string(), + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + compression.then_some(tonic::codec::CompressionEncoding::Zstd), + ) + .await?; // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are // buffered by Tonic and the OS too. let (req_tx, req_rx) = tokio::sync::mpsc::channel(1); let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); - let mut req = tonic::Request::new(req_stream); - let metadata = req.metadata_mut(); - metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?); - metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?); - metadata.insert("neon-shard-id", "0000".try_into()?); + let resp_rx = Box::pin(client.get_pages(req_stream).await?); - let resp = client.get_pages(req).await?; - let resp_stream = resp.into_inner(); - - Ok(Self { - req_tx, - resp_rx: resp_stream, - }) + Ok(Self { req_tx, resp_rx }) } } @@ -641,27 +656,27 @@ impl Client for GrpcClient { rel: RelTag, blks: Vec, ) -> anyhow::Result<()> { - let req = proto::GetPageRequest { + let req = page_api::GetPageRequest { request_id: req_id, - request_class: proto::GetPageClass::Normal as i32, - read_lsn: Some(proto::ReadLsn { - request_lsn: req_lsn.0, - not_modified_since_lsn: mod_lsn.0, - }), - rel: Some(rel.into()), - block_number: blks, + request_class: page_api::GetPageClass::Normal, + read_lsn: page_api::ReadLsn { + request_lsn: req_lsn, + not_modified_since_lsn: Some(mod_lsn), + }, + rel, + block_numbers: blks, }; self.req_tx.send(req).await?; Ok(()) } async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { - let resp = self.resp_rx.message().await?.unwrap(); + let resp = self.resp_rx.next().await.unwrap().unwrap(); anyhow::ensure!( - resp.status_code == proto::GetPageStatusCode::Ok as i32, + resp.status_code == page_api::GetPageStatusCode::Ok, "unexpected status code: {}", - resp.status_code + resp.status_code, ); - Ok((resp.request_id, resp.page_image)) + Ok((resp.request_id, resp.page_images)) } }