From fecb707b19f6f14942e9cbc624890a0e371bb931 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 11:41:58 +0200 Subject: [PATCH] pagebench: add `idle-streams` (#12583) ## Problem For the communicator scheduling policy, we need to understand the server-side cost of idle gRPC streams. Touches #11735. ## Summary of changes Add an `idle-streams` benchmark to `pagebench` which opens a large number of idle gRPC GetPage streams. --- pageserver/pagebench/src/cmd/idle_streams.rs | 127 +++++++++++++++++++ pageserver/pagebench/src/main.rs | 3 + 2 files changed, 130 insertions(+) create mode 100644 pageserver/pagebench/src/cmd/idle_streams.rs diff --git a/pageserver/pagebench/src/cmd/idle_streams.rs b/pageserver/pagebench/src/cmd/idle_streams.rs new file mode 100644 index 0000000000..73bc9f3f46 --- /dev/null +++ b/pageserver/pagebench/src/cmd/idle_streams.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use futures::StreamExt; +use tonic::transport::Endpoint; +use tracing::info; + +use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag}; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; +use utils::shard::ShardIndex; + +/// Starts a large number of idle gRPC GetPage streams. +#[derive(clap::Parser)] +pub(crate) struct Args { + /// The Pageserver to connect to. Must use grpc://. + #[clap(long, default_value = "grpc://localhost:51051")] + server: String, + /// The Pageserver HTTP API. + #[clap(long, default_value = "http://localhost:9898")] + http_server: String, + /// The number of streams to open. + #[clap(long, default_value = "100000")] + count: usize, + /// Number of streams per connection. + #[clap(long, default_value = "100")] + per_connection: usize, + /// Send a single GetPage request on each stream. + #[clap(long, default_value_t = false)] + send_request: bool, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + rt.block_on(main_impl(args)) +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + // Discover a tenant and timeline to use. + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + reqwest::Client::new(), + args.http_server.clone(), + None, + )); + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: Some(1), + targets: None, + }, + ) + .await?; + let ttid = timelines + .first() + .ok_or_else(|| anyhow!("no timelines found"))?; + + // Set up the initial client. + let endpoint = Endpoint::from_shared(args.server.clone())?; + + let connect = async || { + pageserver_page_api::Client::new( + endpoint.connect().await?, + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + None, + ) + }; + + let mut client = connect().await?; + let mut streams = Vec::with_capacity(args.count); + + // Create streams. + for i in 0..args.count { + if i % 100 == 0 { + info!("opened {}/{} streams", i, args.count); + } + if i % args.per_connection == 0 && i > 0 { + client = connect().await?; + } + + let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); + let mut resp_stream = client.get_pages(req_stream).await?; + + // Send request if specified. + if args.send_request { + req_tx.send(GetPageRequest { + request_id: 1.into(), + request_class: GetPageClass::Normal, + read_lsn: ReadLsn { + request_lsn: Lsn::MAX, + not_modified_since_lsn: Some(Lsn(1)), + }, + rel: RelTag { + spcnode: 1664, // pg_global + dbnode: 0, // shared database + relnode: 1262, // pg_authid + forknum: 0, // init + }, + block_numbers: vec![0], + })?; + let resp = resp_stream + .next() + .await + .transpose()? + .ok_or_else(|| anyhow!("no response"))?; + if resp.status_code != GetPageStatusCode::Ok { + return Err(anyhow!("{} response", resp.status_code)); + } + } + + // Hold onto streams to avoid closing them. + streams.push((req_tx, resp_stream)); + } + + info!("opened {} streams, sleeping", args.count); + + // Block forever, to hold the idle streams open for inspection. + futures::future::pending::<()>().await; + + Ok(()) +} diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 5527557450..6498203de3 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -17,6 +17,7 @@ mod cmd { pub(super) mod aux_files; pub(super) mod basebackup; pub(super) mod getpage_latest_lsn; + pub(super) mod idle_streams; pub(super) mod ondemand_download_churn; pub(super) mod trigger_initial_size_calculation; } @@ -29,6 +30,7 @@ enum Args { TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), OndemandDownloadChurn(cmd::ondemand_download_churn::Args), AuxFiles(cmd::aux_files::Args), + IdleStreams(cmd::idle_streams::Args), } fn main() { @@ -49,6 +51,7 @@ fn main() { } Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), Args::AuxFiles(args) => cmd::aux_files::main(args), + Args::IdleStreams(args) => cmd::idle_streams::main(args), } .unwrap() }