From a5fe67f3616b55135fa3a58c2db89bf30a9eb955 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Sun, 13 Jul 2025 19:27:39 +0200 Subject: [PATCH 01/11] proxy: cancel maintain_cancel_key task immediately (#12586) ## Problem When a connection terminates its maintain_cancel_key task keeps running until the CANCEL_KEY_REFRESH sleep finishes and then it triggers another cancel key TTL refresh before exiting. ## Summary of changes * Check for cancellation while sleeping and interrupt sleep. * If cancelled, break the loop, don't send a refresh cmd. --- proxy/src/cancellation.rs | 10 ++++++++-- proxy/src/util.rs | 14 +++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 4ea4c4ea54..03be9dd4cf 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -28,6 +28,7 @@ use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError}; +use crate::util::run_until; type IpSubnetKey = IpNet; @@ -498,8 +499,13 @@ impl Session { "registered cancellation key" ); - // wait before continuing. - tokio::time::sleep(CANCEL_KEY_REFRESH).await; + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } // retry immediately. Err(BatchQueueError::Result(error)) => { diff --git a/proxy/src/util.rs b/proxy/src/util.rs index 7fc2d9fbdb..0291216d94 100644 --- a/proxy/src/util.rs +++ b/proxy/src/util.rs @@ -7,8 +7,16 @@ pub async fn run_until_cancelled( f: F, cancellation_token: &CancellationToken, ) -> Option { - match select(pin!(f), pin!(cancellation_token.cancelled())).await { - Either::Left((f, _)) => Some(f), - Either::Right(((), _)) => None, + run_until(f, cancellation_token.cancelled()).await.ok() +} + +/// Runs the future `f` unless interrupted by future `condition`. +pub async fn run_until( + f: F1, + condition: F2, +) -> Result { + match select(pin!(f), pin!(condition)).await { + Either::Left((f1, _)) => Ok(f1), + Either::Right((f2, _)) => Err(f2), } } From 296c9190b2f6e12c571a2b71f070b1c5597738e8 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Mon, 14 Jul 2025 00:49:23 +0200 Subject: [PATCH 02/11] proxy: Use EXPIRE command to refresh cancel entries (#12580) ## Problem When refreshing cancellation data we resend the entire value again just to reset the TTL, which causes unnecessary load in proxy, on network and possibly on redis side. ## Summary of changes * Switch from using SET with full value to using EXPIRE to reset TTL. * Add a tiny delay between retries to prevent busy loop. * Shorten CancelKeyOp variants: drop redundant suffix. * Retry SET when EXPIRE failed. --- proxy/src/cancellation.rs | 130 +++++++++++++++++++++++++++----------- proxy/src/metrics.rs | 1 + 2 files changed, 95 insertions(+), 36 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 03be9dd4cf..77062d3bb4 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -32,20 +32,24 @@ use crate::util::run_until; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600); -const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570); +const CANCEL_KEY_TTL: Duration = Duration::from_secs(600); +const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570); // Message types for sending through mpsc channel pub enum CancelKeyOp { - StoreCancelKey { + Store { key: CancelKeyData, value: Box, - expire: std::time::Duration, + expire: Duration, }, - GetCancelData { + Refresh { + key: CancelKeyData, + expire: Duration, + }, + Get { key: CancelKeyData, }, - GetCancelDataOld { + GetOld { key: CancelKeyData, }, } @@ -108,7 +112,7 @@ impl Pipeline { impl CancelKeyOp { fn register(&self, pipe: &mut Pipeline) { match self { - CancelKeyOp::StoreCancelKey { key, value, expire } => { + CancelKeyOp::Store { key, value, expire } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::set_options( &key, @@ -116,11 +120,15 @@ impl CancelKeyOp { SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())), )); } - CancelKeyOp::GetCancelDataOld { key } => { + CancelKeyOp::Refresh { key, expire } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64)); + } + CancelKeyOp::GetOld { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::hget(key, "data")); } - CancelKeyOp::GetCancelData { key } => { + CancelKeyOp::Get { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::get(key)); } @@ -264,7 +272,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::Get); - let op = CancelKeyOp::GetCancelData { key }; + let op = CancelKeyOp::Get { key }; let result = timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -289,7 +297,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::HGet); - let op = CancelKeyOp::GetCancelDataOld { key }; + let op = CancelKeyOp::GetOld { key }; timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -474,45 +482,95 @@ impl Session { let mut cancel = pin!(cancel); + enum State { + Set, + Refresh, + } + let mut state = State::Set; + loop { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::Set); - let op = CancelKeyOp::StoreCancelKey { - key: self.key, - value: closure_json.clone(), - expire: CANCEL_KEY_TTL, + let guard_op = match state { + State::Set => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Set); + let op = CancelKeyOp::Store { + key: self.key, + value: closure_json.clone(), + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "registering cancellation key" + ); + (guard, op) + } + + State::Refresh => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Expire); + let op = CancelKeyOp::Refresh { + key: self.key, + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshing cancellation key" + ); + (guard, op) + } }; - tracing::debug!( - src=%self.key, - dest=?cancel_closure.cancel_token, - "registering cancellation key" - ); - - match tx.call((guard, op), cancel.as_mut()).await { - Ok(_) => { + match tx.call(guard_op, cancel.as_mut()).await { + // SET returns OK + Ok(Value::Okay) => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, "registered cancellation key" ); - - // wait before continuing. break immediately if cancelled. - if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) - .await - .is_err() - { - break; - } + state = State::Refresh; } + + // EXPIRE returns 1 + Ok(Value::Int(1)) => { + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshed cancellation key" + ); + } + + Ok(_) => { + // Any other response likely means the key expired. + tracing::warn!(src=%self.key, "refreshing cancellation key failed"); + // Re-enter the SET loop to repush full data. + state = State::Set; + } + // retry immediately. Err(BatchQueueError::Result(error)) => { - tracing::warn!(?error, "error registering cancellation key"); + tracing::warn!(?error, "error refreshing cancellation key"); + // Small delay to prevent busy loop with high cpu and logging. + tokio::time::sleep(Duration::from_millis(10)).await; + continue; } + Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } + + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } if let Err(err) = cancel_closure diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 8439082498..bf4d5a11eb 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -376,6 +376,7 @@ pub enum Waiting { pub enum RedisMsgKind { Set, Get, + Expire, HGet, } From fecb707b19f6f14942e9cbc624890a0e371bb931 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 11:41:58 +0200 Subject: [PATCH 03/11] pagebench: add `idle-streams` (#12583) ## Problem For the communicator scheduling policy, we need to understand the server-side cost of idle gRPC streams. Touches #11735. ## Summary of changes Add an `idle-streams` benchmark to `pagebench` which opens a large number of idle gRPC GetPage streams. --- pageserver/pagebench/src/cmd/idle_streams.rs | 127 +++++++++++++++++++ pageserver/pagebench/src/main.rs | 3 + 2 files changed, 130 insertions(+) create mode 100644 pageserver/pagebench/src/cmd/idle_streams.rs diff --git a/pageserver/pagebench/src/cmd/idle_streams.rs b/pageserver/pagebench/src/cmd/idle_streams.rs new file mode 100644 index 0000000000..73bc9f3f46 --- /dev/null +++ b/pageserver/pagebench/src/cmd/idle_streams.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use futures::StreamExt; +use tonic::transport::Endpoint; +use tracing::info; + +use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag}; +use utils::id::TenantTimelineId; +use utils::lsn::Lsn; +use utils::shard::ShardIndex; + +/// Starts a large number of idle gRPC GetPage streams. +#[derive(clap::Parser)] +pub(crate) struct Args { + /// The Pageserver to connect to. Must use grpc://. + #[clap(long, default_value = "grpc://localhost:51051")] + server: String, + /// The Pageserver HTTP API. + #[clap(long, default_value = "http://localhost:9898")] + http_server: String, + /// The number of streams to open. + #[clap(long, default_value = "100000")] + count: usize, + /// Number of streams per connection. + #[clap(long, default_value = "100")] + per_connection: usize, + /// Send a single GetPage request on each stream. + #[clap(long, default_value_t = false)] + send_request: bool, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + rt.block_on(main_impl(args)) +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + // Discover a tenant and timeline to use. + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + reqwest::Client::new(), + args.http_server.clone(), + None, + )); + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: Some(1), + targets: None, + }, + ) + .await?; + let ttid = timelines + .first() + .ok_or_else(|| anyhow!("no timelines found"))?; + + // Set up the initial client. + let endpoint = Endpoint::from_shared(args.server.clone())?; + + let connect = async || { + pageserver_page_api::Client::new( + endpoint.connect().await?, + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + None, + ) + }; + + let mut client = connect().await?; + let mut streams = Vec::with_capacity(args.count); + + // Create streams. + for i in 0..args.count { + if i % 100 == 0 { + info!("opened {}/{} streams", i, args.count); + } + if i % args.per_connection == 0 && i > 0 { + client = connect().await?; + } + + let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); + let mut resp_stream = client.get_pages(req_stream).await?; + + // Send request if specified. + if args.send_request { + req_tx.send(GetPageRequest { + request_id: 1.into(), + request_class: GetPageClass::Normal, + read_lsn: ReadLsn { + request_lsn: Lsn::MAX, + not_modified_since_lsn: Some(Lsn(1)), + }, + rel: RelTag { + spcnode: 1664, // pg_global + dbnode: 0, // shared database + relnode: 1262, // pg_authid + forknum: 0, // init + }, + block_numbers: vec![0], + })?; + let resp = resp_stream + .next() + .await + .transpose()? + .ok_or_else(|| anyhow!("no response"))?; + if resp.status_code != GetPageStatusCode::Ok { + return Err(anyhow!("{} response", resp.status_code)); + } + } + + // Hold onto streams to avoid closing them. + streams.push((req_tx, resp_stream)); + } + + info!("opened {} streams, sleeping", args.count); + + // Block forever, to hold the idle streams open for inspection. + futures::future::pending::<()>().await; + + Ok(()) +} diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 5527557450..6498203de3 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -17,6 +17,7 @@ mod cmd { pub(super) mod aux_files; pub(super) mod basebackup; pub(super) mod getpage_latest_lsn; + pub(super) mod idle_streams; pub(super) mod ondemand_download_churn; pub(super) mod trigger_initial_size_calculation; } @@ -29,6 +30,7 @@ enum Args { TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), OndemandDownloadChurn(cmd::ondemand_download_churn::Args), AuxFiles(cmd::aux_files::Args), + IdleStreams(cmd::idle_streams::Args), } fn main() { @@ -49,6 +51,7 @@ fn main() { } Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), Args::AuxFiles(args) => cmd::aux_files::main(args), + Args::IdleStreams(args) => cmd::idle_streams::main(args), } .unwrap() } From d14d8271b815b57adeab6707b84ee26909f647f7 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 12:43:10 +0200 Subject: [PATCH 04/11] pageserver/client_grpc: improve retry logic (#12579) ## Problem gRPC client retries currently include pool acquisition under the per-attempt timeout. If pool acquisition is slow (e.g. full pool), this will cause spurious timeout warnings, and the caller will lose its place in the pool queue. Touches #11735. ## Summary of changes Makes several improvements to retries and related logic: * Don't include pool acquisition time under request timeouts. * Move attempt timeouts out of `Retry` and into the closure. * Make `Retry` configurable, move constants into main module. * Don't backoff on the first retry, and reduce initial/max backoffs to 5ms and 5s respectively. * Add `with_retries` and `with_timeout` helpers. * Add slow logging for pool acquisition, and a `warn_slow` counterpart to `log_slow`. * Add debug logging for requests and responses at the client boundary. --- libs/utils/src/logging.rs | 56 +++++++--- pageserver/client_grpc/src/client.rs | 160 +++++++++++++++++++-------- pageserver/client_grpc/src/retry.rs | 72 ++++++------ 3 files changed, 189 insertions(+), 99 deletions(-) diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 5828a400a0..d67c0f123b 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::pin::Pin; use std::str::FromStr; use std::time::Duration; @@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; use tokio::time::Instant; -use tracing::info; +use tracing::{info, warn}; /// Logs a critical error, similarly to `tracing::error!`. This will: /// @@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString { /// /// TODO: consider upgrading this to a warning, but currently it fires too often. #[inline] -pub async fn log_slow(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O -where - F: Future, -{ +pub async fn log_slow( + name: &str, + threshold: Duration, + f: Pin<&mut impl Future>, +) -> O { monitor_slow_future( threshold, threshold, // period = threshold @@ -394,16 +396,42 @@ where if !is_slow { return; } + let elapsed = elapsed_total.as_secs_f64(); if ready { - info!( - "slow {name} completed after {:.3}s", - elapsed_total.as_secs_f64() - ); + info!("slow {name} completed after {elapsed:.3}s"); } else { - info!( - "slow {name} still running after {:.3}s", - elapsed_total.as_secs_f64() - ); + info!("slow {name} still running after {elapsed:.3}s"); + } + }, + ) + .await +} + +/// Logs a periodic warning if a future is slow to complete. +#[inline] +pub async fn warn_slow( + name: &str, + threshold: Duration, + f: Pin<&mut impl Future>, +) -> O { + monitor_slow_future( + threshold, + threshold, // period = threshold + f, + |MonitorSlowFutureCallback { + ready, + is_slow, + elapsed_total, + elapsed_since_last_callback: _, + }| { + if !is_slow { + return; + } + let elapsed = elapsed_total.as_secs_f64(); + if ready { + warn!("slow {name} completed after {elapsed:.3}s"); + } else { + warn!("slow {name} still running after {elapsed:.3}s"); } }, ) @@ -416,7 +444,7 @@ where pub async fn monitor_slow_future( threshold: Duration, period: Duration, - mut fut: std::pin::Pin<&mut F>, + mut fut: Pin<&mut F>, mut cb: impl FnMut(MonitorSlowFutureCallback), ) -> O where diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 7049fbdb96..7732585f7c 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,13 +1,16 @@ use std::collections::HashMap; use std::num::NonZero; +use std::pin::pin; use std::sync::Arc; +use std::time::{Duration, Instant}; use anyhow::anyhow; use arc_swap::ArcSwap; use futures::stream::FuturesUnordered; use futures::{FutureExt as _, StreamExt as _}; use tonic::codec::CompressionEncoding; -use tracing::instrument; +use tracing::{debug, instrument}; +use utils::logging::warn_slow; use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; use crate::retry::Retry; @@ -44,6 +47,23 @@ const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); /// get a larger queue depth. const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); +/// The overall request call timeout, including retries and pool acquisition. +/// TODO: should we retry forever? Should the caller decide? +const CALL_TIMEOUT: Duration = Duration::from_secs(60); + +/// The per-request (retry attempt) timeout, including any lazy connection establishment. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +/// The initial request retry backoff duration. The first retry does not back off. +/// TODO: use a different backoff for ResourceExhausted (rate limiting)? Needs server support. +const BASE_BACKOFF: Duration = Duration::from_millis(5); + +/// The maximum request retry backoff duration. +const MAX_BACKOFF: Duration = Duration::from_secs(5); + +/// Threshold and interval for warning about slow operation. +const SLOW_THRESHOLD: Duration = Duration::from_secs(3); + /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: /// @@ -67,8 +87,6 @@ pub struct PageserverClient { compression: Option, /// The shards for this tenant. shards: ArcSwap, - /// The retry configuration. - retry: Retry, } impl PageserverClient { @@ -94,7 +112,6 @@ impl PageserverClient { auth_token, compression, shards: ArcSwap::new(Arc::new(shards)), - retry: Retry, }) } @@ -142,13 +159,15 @@ impl PageserverClient { &self, req: page_api::CheckRelExistsRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.check_rel_exists(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.check_rel_exists(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Returns the total size of a database, as # of bytes. @@ -157,13 +176,15 @@ impl PageserverClient { &self, req: page_api::GetDbSizeRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_db_size(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_db_size(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the @@ -193,6 +214,8 @@ impl PageserverClient { return Err(tonic::Status::invalid_argument("request attempt must be 0")); } + debug!("sending request: {req:?}"); + // The shards may change while we're fetching pages. We execute the request using a stable // view of the shards (especially important for requests that span shards), but retry the // top-level (pre-split) request to pick up shard changes. This can lead to unnecessary @@ -201,13 +224,16 @@ impl PageserverClient { // // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this // once we figure out how to handle these. - self.retry - .with(async |attempt| { - let mut req = req.clone(); - req.request_id.attempt = attempt as u32; - Self::get_page_with_shards(req, &self.shards.load_full()).await - }) - .await + let resp = Self::with_retries(CALL_TIMEOUT, async |attempt| { + let mut req = req.clone(); + req.request_id.attempt = attempt as u32; + let shards = self.shards.load_full(); + Self::with_timeout(REQUEST_TIMEOUT, Self::get_page_with_shards(req, &shards)).await + }) + .await?; + + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches pages using the given shards. This uses a stable view of the shards, regardless of @@ -290,13 +316,15 @@ impl PageserverClient { &self, req: page_api::GetRelSizeRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // Relation metadata is only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_rel_size(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // Relation metadata is only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_rel_size(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) } /// Fetches an SLRU segment. @@ -305,13 +333,45 @@ impl PageserverClient { &self, req: page_api::GetSlruSegmentRequest, ) -> tonic::Result { - self.retry - .with(async |_| { - // SLRU segments are only available on shard 0. - let mut client = self.shards.load_full().get_zero().client().await?; - client.get_slru_segment(req).await - }) - .await + debug!("sending request: {req:?}"); + let resp = Self::with_retries(CALL_TIMEOUT, async |_| { + // SLRU segments are only available on shard 0. + let mut client = self.shards.load_full().get_zero().client().await?; + Self::with_timeout(REQUEST_TIMEOUT, client.get_slru_segment(req)).await + }) + .await?; + debug!("received response: {resp:?}"); + Ok(resp) + } + + /// Runs the given async closure with retries up to the given timeout. Only certain gRPC status + /// codes are retried, see [`Retry::should_retry`]. Returns `DeadlineExceeded` on timeout. + async fn with_retries(timeout: Duration, f: F) -> tonic::Result + where + F: FnMut(usize) -> O, // pass attempt number, starting at 0 + O: Future>, + { + Retry { + timeout: Some(timeout), + base_backoff: BASE_BACKOFF, + max_backoff: MAX_BACKOFF, + } + .with(f) + .await + } + + /// Runs the given future with a timeout. Returns `DeadlineExceeded` on timeout. + async fn with_timeout( + timeout: Duration, + f: impl Future>, + ) -> tonic::Result { + let started = Instant::now(); + tokio::time::timeout(timeout, f).await.map_err(|_| { + tonic::Status::deadline_exceeded(format!( + "request timed out after {:.3}s", + started.elapsed().as_secs_f64() + )) + })? } } @@ -525,19 +585,25 @@ impl Shard { } /// Returns a pooled client for this shard. + #[instrument(skip_all)] async fn client(&self) -> tonic::Result { - self.client_pool - .get() - .await - .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) + warn_slow( + "client pool acquisition", + SLOW_THRESHOLD, + pin!(self.client_pool.get()), + ) + .await + .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream /// pool (e.g. for prefetches). + #[instrument(skip_all, fields(bulk))] async fn stream(&self, bulk: bool) -> StreamGuard { - match bulk { - false => self.stream_pool.get().await, - true => self.bulk_stream_pool.get().await, - } + let pool = match bulk { + false => &self.stream_pool, + true => &self.bulk_stream_pool, + }; + warn_slow("stream pool acquisition", SLOW_THRESHOLD, pin!(pool.get())).await } } diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs index a1e0b8636f..8a138711e8 100644 --- a/pageserver/client_grpc/src/retry.rs +++ b/pageserver/client_grpc/src/retry.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use futures::future::pending; use tokio::time::Instant; use tracing::{error, info, warn}; @@ -8,60 +9,54 @@ use utils::backoff::exponential_backoff_duration; /// A retry handler for Pageserver gRPC requests. /// /// This is used instead of backoff::retry for better control and observability. -pub struct Retry; +pub struct Retry { + /// Timeout across all retry attempts. If None, retries forever. + pub timeout: Option, + /// The initial backoff duration. The first retry does not use a backoff. + pub base_backoff: Duration, + /// The maximum backoff duration. + pub max_backoff: Duration, +} impl Retry { - /// The per-request timeout. - // TODO: tune these, and/or make them configurable. Should we retry forever? - const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); - /// The total timeout across all attempts - const TOTAL_TIMEOUT: Duration = Duration::from_secs(60); - /// The initial backoff duration. - const BASE_BACKOFF: Duration = Duration::from_millis(10); - /// The maximum backoff duration. - const MAX_BACKOFF: Duration = Duration::from_secs(10); - /// If true, log successful requests. For debugging. - const LOG_SUCCESS: bool = false; - - /// Runs the given async closure with timeouts and retries (exponential backoff), passing the - /// attempt number starting at 0. Logs errors, using the current tracing span for context. + /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors, + /// using the current tracing span for context. /// - /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default - /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`]. + /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. pub async fn with(&self, mut f: F) -> tonic::Result where - F: FnMut(usize) -> O, // takes attempt number, starting at 0 + F: FnMut(usize) -> O, // pass attempt number, starting at 0 O: Future>, { let started = Instant::now(); - let deadline = started + Self::TOTAL_TIMEOUT; + let deadline = self.timeout.map(|timeout| started + timeout); let mut last_error = None; let mut retries = 0; loop { - // Set up a future to wait for the backoff (if any) and run the request with a timeout. + // Set up a future to wait for the backoff, if any, and run the closure. let backoff_and_try = async { // NB: sleep() always sleeps 1ms, even when given a 0 argument. See: // https://github.com/tokio-rs/tokio/issues/6866 - if let Some(backoff) = Self::backoff_duration(retries) { + if let Some(backoff) = self.backoff_duration(retries) { tokio::time::sleep(backoff).await; } - let request_started = Instant::now(); - tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries)) - .await - .map_err(|_| { - tonic::Status::deadline_exceeded(format!( - "request timed out after {:.3}s", - request_started.elapsed().as_secs_f64() - )) - })? + f(retries).await }; - // Wait for the backoff and request, or bail out if the total timeout is exceeded. + // Set up a future for the timeout, if any. + let timeout = async { + match deadline { + Some(deadline) => tokio::time::sleep_until(deadline).await, + None => pending().await, + } + }; + + // Wait for the backoff and request, or bail out if the timeout is exceeded. let result = tokio::select! { result = backoff_and_try => result, - _ = tokio::time::sleep_until(deadline) => { + _ = timeout => { let last_error = last_error.unwrap_or_else(|| { tonic::Status::deadline_exceeded(format!( "request timed out after {:.3}s", @@ -79,7 +74,7 @@ impl Retry { match result { // Success, return the result. Ok(result) => { - if retries > 0 || Self::LOG_SUCCESS { + if retries > 0 { info!( "request succeeded after {retries} retries in {:.3}s", started.elapsed().as_secs_f64(), @@ -112,12 +107,13 @@ impl Retry { } } - /// Returns the backoff duration for the given retry attempt, or None for no backoff. - fn backoff_duration(retry: usize) -> Option { + /// Returns the backoff duration for the given retry attempt, or None for no backoff. The first + /// attempt and first retry never backs off, so this returns None for 0 and 1 retries. + fn backoff_duration(&self, retries: usize) -> Option { let backoff = exponential_backoff_duration( - retry as u32, - Self::BASE_BACKOFF.as_secs_f64(), - Self::MAX_BACKOFF.as_secs_f64(), + (retries as u32).saturating_sub(1), // first retry does not back off + self.base_backoff.as_secs_f64(), + self.max_backoff.as_secs_f64(), ); (!backoff.is_zero()).then_some(backoff) } From f18cc808f09adcc5fd570cdb2a5bddd2c77a0da9 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 12:47:26 +0200 Subject: [PATCH 05/11] pageserver/client_grpc: reap idle channels immediately (#12587) ## Problem It can take 3x the idle timeout to reap a channel. We have to wait for the idle timeout to trigger first for the stream, then the client, then the channel. Touches #11735. ## Summary of changes Reap empty channels immediately, and rely indirectly on the channel/stream timeouts. This can still lead to 2x the idle timeout for streams (first stream then client), but that's okay -- if the stream closes abruptly (e.g. due to timeout or error) we want to keep the client around in the pool for a while. --- pageserver/client_grpc/src/pool.rs | 66 +++++++++--------------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 906872e091..4a29252cd9 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -9,19 +9,20 @@ //! //! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients //! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a -//! per-channel client limit. Channels may be closed when they are no longer used by any clients. +//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on +//! client/stream idle timeouts. //! //! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared) //! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a -//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed -//! from the pool after some time, to free up the channel. +//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed +//! from the pool after a while to free up resources. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the //! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it //! returns a guard that can be used to send a single request, to properly enforce queue depth and //! route responses. Internally, the pool will reuse or spin up a suitable stream for the request, //! possibly pipelining multiple requests from multiple callers on the same stream (up to some -//! queue depth). Idle streams may be removed from the pool after a while to free up the client. +//! queue depth). Idle streams are removed from the pool after a while to free up resources. //! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. @@ -48,14 +49,12 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Reap channels/clients/streams that have been idle for this long. +/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when +/// empty, and indirectly rely on the client/stream idle timeouts. /// -/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to -/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle. -/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we -/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to -/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty -/// channels, and/or stream pool clients. +/// A stream's client will be reaped after 2x the idle threshold (first stream the client), but +/// that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation), we want to +/// keep its client around in the pool for a while. const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) { false => Duration::from_secs(180), true => Duration::from_secs(1), // exercise reaping in tests @@ -83,8 +82,6 @@ pub struct ChannelPool { max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, - /// Reaps idle channels. - idle_reaper: Reaper, /// Channel ID generator. next_channel_id: AtomicUsize, } @@ -96,9 +93,6 @@ struct ChannelEntry { channel: Channel, /// Number of clients using this channel. clients: usize, - /// The channel has been idle (no clients) since this time. None if channel is in use. - /// INVARIANT: Some if clients == 0, otherwise None. - idle_since: Option, } impl ChannelPool { @@ -108,15 +102,12 @@ impl ChannelPool { E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { - let pool = Arc::new(Self { + Ok(Arc::new(Self { endpoint: endpoint.try_into()?, max_clients_per_channel, channels: Mutex::default(), - idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_channel_id: AtomicUsize::default(), - }); - pool.idle_reaper.spawn(&pool); - Ok(pool) + })) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. @@ -137,22 +128,17 @@ impl ChannelPool { let mut channels = self.channels.lock().unwrap(); // Try to find an existing channel with available capacity. We check entries in BTreeMap - // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients - // with lower-ordered channel IDs first. This will cluster clients in lower-ordered + // order, to fill up the lower-ordered channels first. The client/stream pools also prefer + // clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { assert!( entry.clients <= self.max_clients_per_channel.get(), "channel overflow" ); - assert_eq!( - entry.idle_since.is_some(), - entry.clients == 0, - "incorrect channel idle state" - ); + assert_ne!(entry.clients, 0, "empty channel not reaped"); if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; - entry.idle_since = None; return ChannelGuard { pool: Arc::downgrade(self), id, @@ -169,7 +155,6 @@ impl ChannelPool { let entry = ChannelEntry { channel: channel.clone(), clients: 1, // account for the guard below - idle_since: None, }; channels.insert(id, entry); @@ -181,20 +166,6 @@ impl ChannelPool { } } -impl Reapable for ChannelPool { - /// Reaps channels that have been idle since before the cutoff. - fn reap_idle(&self, cutoff: Instant) { - self.channels.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.clients, 0, "empty channel not marked idle"); - return true; - }; - assert_eq!(entry.clients, 0, "idle channel has clients"); - idle_since >= cutoff - }) - } -} - /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, /// since the gRPC client requires an owned `Channel`. pub struct ChannelGuard { @@ -211,7 +182,7 @@ impl ChannelGuard { } } -/// Returns the channel to the pool. +/// Returns the channel to the pool. The channel is closed when empty. impl Drop for ChannelGuard { fn drop(&mut self) { let Some(pool) = self.pool.upgrade() else { @@ -220,11 +191,12 @@ impl Drop for ChannelGuard { let mut channels = pool.channels.lock().unwrap(); let entry = channels.get_mut(&self.id).expect("unknown channel"); - assert!(entry.idle_since.is_none(), "active channel marked idle"); assert!(entry.clients > 0, "channel underflow"); entry.clients -= 1; + + // Reap empty channels immediately. if entry.clients == 0 { - entry.idle_since = Some(Instant::now()); // mark channel as idle + channels.remove(&self.id); } } } From 30b877074cda2580c677ec9527b83ab975dee181 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 13:44:53 +0200 Subject: [PATCH 06/11] pagebench: add CPU profiling support (#12478) ## Problem The new communicator gRPC client has significantly worse Pagebench performance than a basic gRPC client. We need to find out why. ## Summary of changes Add a `pagebench --profile` flag which takes a client CPU profile of the benchmark and writes a flamegraph to `profile.svg`. --- Cargo.lock | 1 + pageserver/pagebench/Cargo.toml | 1 + pageserver/pagebench/src/main.rs | 59 +++++++++++++++++++++++++------- 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14b460005a..bea8d3a7fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4296,6 +4296,7 @@ dependencies = [ "pageserver_client", "pageserver_client_grpc", "pageserver_page_api", + "pprof", "rand 0.8.5", "reqwest", "serde", diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 4086213830..609fef2b4f 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -16,6 +16,7 @@ futures.workspace = true hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true +pprof.workspace = true rand.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 6498203de3..ceca58e032 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -1,4 +1,7 @@ +use std::fs::File; + use clap::Parser; +use tracing::info; use utils::logging; /// Re-usable pieces of code that aren't CLI-specific. @@ -24,7 +27,18 @@ mod cmd { /// Component-level performance test for pageserver. #[derive(clap::Parser)] -enum Args { +struct Args { + /// Takes a client CPU profile into profile.svg. The benchmark must exit cleanly before it's + /// written, e.g. via --runtime. + #[arg(long)] + profile: bool, + + #[command(subcommand)] + subcommand: Subcommand, +} + +#[derive(clap::Subcommand)] +enum Subcommand { Basebackup(cmd::basebackup::Args), GetPageLatestLsn(cmd::getpage_latest_lsn::Args), TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), @@ -33,25 +47,46 @@ enum Args { IdleStreams(cmd::idle_streams::Args), } -fn main() { +fn main() -> anyhow::Result<()> { logging::init( logging::LogFormat::Plain, logging::TracingErrorLayerEnablement::Disabled, logging::Output::Stderr, - ) - .unwrap(); + )?; logging::replace_panic_hook_with_tracing_panic_hook().forget(); let args = Args::parse(); - match args { - Args::Basebackup(args) => cmd::basebackup::main(args), - Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args), - Args::TriggerInitialSizeCalculation(args) => { + + // Start a CPU profile if requested. + let mut profiler = None; + if args.profile { + profiler = Some( + pprof::ProfilerGuardBuilder::default() + .frequency(1000) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build()?, + ); + } + + match args.subcommand { + Subcommand::Basebackup(args) => cmd::basebackup::main(args), + Subcommand::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args), + Subcommand::TriggerInitialSizeCalculation(args) => { cmd::trigger_initial_size_calculation::main(args) } - Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), - Args::AuxFiles(args) => cmd::aux_files::main(args), - Args::IdleStreams(args) => cmd::idle_streams::main(args), + Subcommand::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), + Subcommand::AuxFiles(args) => cmd::aux_files::main(args), + Subcommand::IdleStreams(args) => cmd::idle_streams::main(args), + }?; + + // Generate a CPU flamegraph if requested. + if let Some(profiler) = profiler { + let report = profiler.report().build()?; + drop(profiler); // stop profiling + let file = File::create("profile.svg")?; + report.flamegraph(file)?; + info!("wrote CPU profile flamegraph to profile.svg") } - .unwrap() + + Ok(()) } From 42ab34dc362b1b54dc96c43202b43d5ece558aa7 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 14:11:33 +0200 Subject: [PATCH 07/11] pageserver/client_grpc: don't pipeline GetPage requests (#12584) ## Problem The communicator gRPC client currently attempts to pipeline GetPage requests from multiple callers onto the same gRPC stream. This has a number of issues: * Head-of-line blocking: the request may block on e.g. layer download or LSN wait, delaying the next request. * Cancellation: we can't easily cancel in-progress requests (e.g. due to timeout or backend termination), so it may keep blocking the next request (even its own retry). * Complex stream scheduling: picking a stream becomes harder/slower, and additional Tokio tasks and synchronization is needed for stream management. Touches #11735. Requires #12579. ## Summary of changes This patch removes pipelining of gRPC stream requests, and instead prefers to scale out the number of streams to achieve the same throughput. Stream scheduling has been rewritten, and mostly follows the same pattern as the client pool with exclusive acquisition by a single caller. [Benchmarks](https://github.com/neondatabase/neon/pull/12583) show that the cost of an idle server-side GetPage worker task is about 26 KB (2.5 GB for 100,000), so we can afford to scale out. This has a number of advantages: * It (mostly) eliminates head-of-line blocking (except at the TCP level). * Cancellation becomes trivial, by closing the stream. * Stream scheduling becomes significantly simpler and cheaper. * Individual callers can still use client-side batching for pipelining. --- Cargo.lock | 1 + Cargo.toml | 2 +- pageserver/client_grpc/src/client.rs | 19 +- pageserver/client_grpc/src/pool.rs | 397 +++++++++++---------------- pageserver/page_api/src/model.rs | 7 +- workspace_hack/Cargo.toml | 2 +- 6 files changed, 165 insertions(+), 263 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bea8d3a7fd..2f36790d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7564,6 +7564,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..df2064a4a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.g tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] } toml = "0.8" diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 7732585f7c..4b606d6939 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -32,21 +32,13 @@ const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); /// Max number of concurrent unary request clients per shard. const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); -/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage -/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +/// Max number of concurrent GetPage streams per shard. const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); -/// Max number of pipelined requests per stream. -const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); - /// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these -/// are more throughput-oriented, we have a smaller limit but higher queue depth. +/// are more throughput-oriented, we have a smaller limit. const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); -/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus -/// get a larger queue depth. -const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); - /// The overall request call timeout, including retries and pool acquisition. /// TODO: should we retry forever? Should the caller decide? const CALL_TIMEOUT: Duration = Duration::from_secs(60); @@ -272,7 +264,7 @@ impl PageserverClient { req: page_api::GetPageRequest, shard: &Shard, ) -> tonic::Result { - let stream = shard.stream(req.request_class.is_bulk()).await; + let mut stream = shard.stream(req.request_class.is_bulk()).await?; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -557,7 +549,6 @@ impl Shard { None, // unbounded, limited by stream pool ), Some(MAX_STREAMS), - MAX_STREAM_QUEUE_DEPTH, ); // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools @@ -573,7 +564,6 @@ impl Shard { None, // unbounded, limited by stream pool ), Some(MAX_BULK_STREAMS), - MAX_BULK_STREAM_QUEUE_DEPTH, ); Ok(Self { @@ -593,13 +583,12 @@ impl Shard { pin!(self.client_pool.get()), ) .await - .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream /// pool (e.g. for prefetches). #[instrument(skip_all, fields(bulk))] - async fn stream(&self, bulk: bool) -> StreamGuard { + async fn stream(&self, bulk: bool) -> tonic::Result { let pool = match bulk { false => &self.stream_pool, true => &self.bulk_stream_pool, diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 4a29252cd9..98a649b4c8 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -18,11 +18,27 @@ //! from the pool after a while to free up resources. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the -//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it -//! returns a guard that can be used to send a single request, to properly enforce queue depth and -//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request, -//! possibly pipelining multiple requests from multiple callers on the same stream (up to some -//! queue depth). Idle streams are removed from the pool after a while to free up resources. +//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a +//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after +//! a while to free up resources. +//! +//! The stream only supports sending a single, synchronous request at a time, and does not support +//! pipelining multiple requests from different callers onto the same stream -- instead, we scale +//! out concurrent streams to improve throughput. There are many reasons for this design choice: +//! +//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by +//! a single server task, which may block e.g. on layer downloads, LSN waits, etc. +//! +//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away +//! (e.g. because of a timeout), the request would still be processed by the server and block +//! requests behind it in the stream. It might even block its own timeout retry. +//! +//! * Stream scheduling becomes significantly simpler and cheaper. +//! +//! * Individual callers can still use client-side batching for pipelining. +//! +//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB +//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out. //! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. @@ -30,20 +46,20 @@ //! TODO: error handling (including custom error types). //! TODO: observability. -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::num::NonZero; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::{Duration, Instant}; -use futures::StreamExt as _; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use futures::{Stream, StreamExt as _}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; +use tokio_stream::wrappers::WatchStream; use tokio_util::sync::CancellationToken; use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint}; -use tracing::{error, warn}; use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; @@ -225,8 +241,7 @@ pub struct ClientPool { /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from - /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle - /// clients are reaped. + /// lower-ordered channels. This allows us to free up and reap higher-ordered channels. idle: Mutex>, /// Reaps idle clients. idle_reaper: Reaper, @@ -282,7 +297,7 @@ impl ClientPool { /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. - pub async fn get(self: &Arc) -> anyhow::Result { + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { @@ -300,7 +315,7 @@ impl ClientPool { }); } - // Slow path: construct a new client. + // Construct a new client. let mut channel_guard = self.channel_pool.get(); let client = page_api::Client::new( channel_guard.take(), @@ -309,7 +324,8 @@ impl ClientPool { self.shard_id, self.auth_token.clone(), self.compression, - )?; + ) + .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?; Ok(ClientGuard { pool: Arc::downgrade(self), @@ -379,287 +395,187 @@ impl Drop for ClientGuard { /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream /// acquires a client from the inner `ClientPool` for the stream's lifetime. /// -/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send -/// a single request and await the response. Internally, requests are multiplexed across streams and -/// channels. This allows proper queue depth enforcement and response routing. +/// Individual streams only send a single request at a time, and do not pipeline multiple callers +/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily +/// to eliminate head-of-line blocking. See the module documentation for more details. /// /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, - /// All pooled streams. + /// Idle pooled streams. Acquired streams are removed from here and returned on drop. /// - /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each - /// stream has an associated Tokio task that processes requests and responses. - streams: Mutex>, - /// The max number of concurrent streams, or None if unbounded. - max_streams: Option>, - /// The max number of concurrent requests per stream. - max_queue_depth: NonZero, - /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. - /// None if the pool is unbounded. + /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is + /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer + /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap + /// higher-ordered channels. + idle: Mutex>, + /// Limits the max number of concurrent streams. None if the pool is unbounded. limiter: Option>, /// Reaps idle streams. idle_reaper: Reaper, - /// Stream ID generator. - next_stream_id: AtomicUsize, } -type StreamID = usize; -type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>; -type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>; -type ResponseSender = oneshot::Sender>; +/// The stream ID. Reuses the inner client ID. +type StreamID = ClientID; +/// A pooled stream. struct StreamEntry { - /// Sends caller requests to the stream task. The stream task exits when this is dropped. - sender: RequestSender, - /// Number of in-flight requests on this stream. - queue_depth: usize, - /// The time when this stream went idle (queue_depth == 0). - /// INVARIANT: Some if queue_depth == 0, otherwise None. - idle_since: Option, + /// The bidirectional stream. + stream: BiStream, + /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`. + idle_since: Instant, +} + +/// A bidirectional GetPage stream and its client. Can send requests and receive responses. +struct BiStream { + /// The owning client. Holds onto the channel slot while the stream is alive. + client: ClientGuard, + /// Stream for sending requests. Uses a watch channel, so it can only send a single request at a + /// time, and the caller must await the response before sending another request. This is + /// enforced by `StreamGuard::send`. + sender: watch::Sender, + /// Stream for receiving responses. + receiver: Pin> + Send>>, } impl StreamPool { - /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` - /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. + /// Creates a new stream pool, using the given client pool. It will use up to `max_streams` + /// concurrent streams. /// /// The client pool must be unbounded. The stream pool will enforce its own limits, and because /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. /// The stream pool should generally have its own dedicated client pool (but it can share a /// channel pool with others since these are always unbounded). - pub fn new( - client_pool: Arc, - max_streams: Option>, - max_queue_depth: NonZero, - ) -> Arc { + pub fn new(client_pool: Arc, max_streams: Option>) -> Arc { assert!(client_pool.limiter.is_none(), "bounded client pool"); let pool = Arc::new(Self { client_pool, - streams: Mutex::default(), - limiter: max_streams.map(|max_streams| { - Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) - }), - max_streams, - max_queue_depth, + idle: Mutex::default(), + limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))), idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), - next_stream_id: AtomicUsize::default(), }); pool.idle_reaper.spawn(&pool); pool } - /// Acquires an available stream from the pool, or spins up a new stream async if all streams - /// are full. Returns a guard that can be used to send a single request on the stream and await - /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity - /// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). + /// Acquires an available stream from the pool, or spins up a new stream if all streams are + /// full. Returns a guard that can be used to send requests and await the responses. Blocks if + /// the pool is full. /// /// This is very performance-sensitive, as it is on the GetPage hot path. /// - /// TODO: this must do something more sophisticated for performance. We want: - /// - /// * Cheap, concurrent access in the common case where we can use a pooled stream. - /// * Quick acquisition of pooled streams with available capacity. - /// * Prefer streams that belong to lower-numbered channels, to reap idle channels. - /// * Prefer filling up existing streams' queue depth before spinning up new streams. - /// * Don't hold a lock while spinning up new streams. - /// * Allow concurrent clients to join onto streams while they're spun up. - /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. - /// - /// For now, we just do something simple but inefficient (linear scan under mutex). - pub async fn get(self: &Arc) -> StreamGuard { + /// TODO: is a `Mutex` performant enough? Will it become too contended? We can't + /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first + /// to free up higher-ordered channels. + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { permit = Some(limiter.acquire_owned().await.expect("never closed")); } - let mut streams = self.streams.lock().unwrap(); - // Look for a pooled stream with available capacity. - for (&id, entry) in streams.iter_mut() { - assert!( - entry.queue_depth <= self.max_queue_depth.get(), - "stream queue overflow" - ); - assert_eq!( - entry.idle_since.is_some(), - entry.queue_depth == 0, - "incorrect stream idle state" - ); - if entry.queue_depth < self.max_queue_depth.get() { - entry.queue_depth += 1; - entry.idle_since = None; - return StreamGuard { - pool: Arc::downgrade(self), - id, - sender: entry.sender.clone(), - permit, - }; - } + // Fast path: acquire an idle stream from the pool. + if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() { + return Ok(StreamGuard { + pool: Arc::downgrade(self), + stream: Some(entry.stream), + can_reuse: true, + permit, + }); } - // No available stream, spin up a new one. We install the stream entry in the pool first and - // return the guard, while spinning up the stream task async. This allows other callers to - // join onto this stream and also create additional streams concurrently if this fills up. - let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); - let entry = StreamEntry { - sender: req_tx.clone(), - queue_depth: 1, // reserve quota for this caller - idle_since: None, - }; - streams.insert(id, entry); + // Spin up a new stream. Uses a watch channel to send a single request at a time, since + // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead. + let mut client = self.client_pool.get().await?; - if let Some(max_streams) = self.max_streams { - assert!(streams.len() <= max_streams.get(), "stream overflow"); - }; + let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default()); + let req_stream = WatchStream::from_changes(req_rx); + let resp_stream = client.get_pages(req_stream).await?; - let client_pool = self.client_pool.clone(); - let pool = Arc::downgrade(self); - - tokio::spawn(async move { - if let Err(err) = Self::run_stream(client_pool, req_rx).await { - error!("stream failed: {err}"); - } - // Remove stream from pool on exit. Weak reference to avoid holding the pool alive. - if let Some(pool) = pool.upgrade() { - let entry = pool.streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); - } - }); - - StreamGuard { + Ok(StreamGuard { pool: Arc::downgrade(self), - id, - sender: req_tx, + stream: Some(BiStream { + client, + sender: req_tx, + receiver: Box::pin(resp_stream), + }), + can_reuse: true, permit, - } - } - - /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a - /// bidirectional GetPage stream, then forwards requests and responses between callers and the - /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be - /// atomic with pool stream acquisition. - /// - /// The task exits when the request channel is closed, or on a stream error. The caller is - /// responsible for removing the stream from the pool on exit. - async fn run_stream( - client_pool: Arc, - mut caller_rx: RequestReceiver, - ) -> anyhow::Result<()> { - // Acquire a client from the pool and create a stream. - let mut client = client_pool.get().await?; - - // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could - // theoretically deadlock if both the client and server block on sends (since we're not - // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and - // low queue depths, but it was seen to happen with the libpq protocol so better safe than - // sorry. It should never buffer more than the queue depth anyway, but using an unbounded - // channel guarantees that it will never block. - let (req_tx, req_rx) = mpsc::unbounded_channel(); - let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); - let mut resp_stream = client.get_pages(req_stream).await?; - - // Track caller response channels by request ID. If the task returns early, these response - // channels will be dropped and the waiting callers will receive an error. - // - // NB: this will leak entries if the server doesn't respond to a request (by request ID). - // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and - // block further use. But we could consider reaping closed channels after some time. - let mut callers = HashMap::new(); - - // Process requests and responses. - loop { - tokio::select! { - // Receive requests from callers and send them to the stream. - req = caller_rx.recv() => { - // Shut down if request channel is closed. - let Some((req, resp_tx)) = req else { - return Ok(()); - }; - - // Store the response channel by request ID. - if callers.contains_key(&req.request_id) { - // Error on request ID duplicates. Ignore callers that went away. - _ = resp_tx.send(Err(tonic::Status::invalid_argument( - format!("duplicate request ID: {}", req.request_id), - ))); - continue; - } - callers.insert(req.request_id, resp_tx); - - // Send the request on the stream. Bail out if the stream is closed. - req_tx.send(req).map_err(|_| { - tonic::Status::unavailable("stream closed") - })?; - } - - // Receive responses from the stream and send them to callers. - resp = resp_stream.next() => { - // Shut down if the stream is closed, and bail out on stream errors. - let Some(resp) = resp.transpose()? else { - return Ok(()) - }; - - // Send the response to the caller. Ignore errors if the caller went away. - let Some(resp_tx) = callers.remove(&resp.request_id) else { - warn!("received response for unknown request ID: {}", resp.request_id); - continue; - }; - _ = resp_tx.send(Ok(resp)); - } - } - } + }) } } impl Reapable for StreamPool { /// Reaps streams that have been idle since before the cutoff. fn reap_idle(&self, cutoff: Instant) { - self.streams.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.queue_depth, 0, "empty stream not marked idle"); - return true; - }; - assert_eq!(entry.queue_depth, 0, "idle stream has requests"); - idle_since >= cutoff - }); + self.idle + .lock() + .unwrap() + .retain(|_, entry| entry.idle_since >= cutoff); } } -/// A pooled stream reference. Can be used to send a single request, to properly enforce queue -/// depth. Queue depth is already reserved and will be returned on drop. +/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still +/// in-flight requests on the stream, or the stream failed. pub struct StreamGuard { pool: Weak, - id: StreamID, - sender: RequestSender, + stream: Option, // Some until dropped + can_reuse: bool, // returned to pool if true permit: Option, // None if pool is unbounded } impl StreamGuard { - /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only - /// valid for a single request (to enforce queue depth). This also drops the guard on return and - /// returns the queue depth quota to the pool. + /// Sends a request on the stream and awaits the response. If the future is dropped before it + /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the + /// request and is not returned to the pool. The same is true if the stream errors, in which + /// case the caller can't send further requests on the stream. /// - /// The `GetPageRequest::request_id` must be unique across in-flight requests. + /// We only support sending a single request at a time, to eliminate head-of-line blocking. See + /// module documentation for details. /// /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status` /// to avoid tearing down the stream for per-request errors. Callers must check this. pub async fn send( - self, + &mut self, req: page_api::GetPageRequest, ) -> tonic::Result { - let (resp_tx, resp_rx) = oneshot::channel(); + let req_id = req.request_id; + let stream = self.stream.as_mut().expect("not dropped"); - self.sender - .send((req, resp_tx)) - .await + // Mark the stream as not reusable while the request is in flight. We can't return the + // stream to the pool until we receive the response, to avoid head-of-line blocking and + // stale responses. Failed streams can't be reused either. + if !self.can_reuse { + return Err(tonic::Status::internal("stream can't be reused")); + } + self.can_reuse = false; + + // Send the request and receive the response. + // + // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests. + stream + .sender + .send(req) .map_err(|_| tonic::Status::unavailable("stream closed"))?; - resp_rx + let resp = stream + .receiver + .next() .await - .map_err(|_| tonic::Status::unavailable("stream closed"))? + .ok_or_else(|| tonic::Status::unavailable("stream closed"))??; + + if resp.request_id != req_id { + return Err(tonic::Status::internal(format!( + "response ID {} does not match request ID {}", + resp.request_id, req_id + ))); + } + + // Success, mark the stream as reusable. + self.can_reuse = true; + + Ok(resp) } } @@ -669,26 +585,21 @@ impl Drop for StreamGuard { return; // pool was dropped }; - // Release the queue depth reservation on drop. This can prematurely decrement it if dropped - // before the response is received, but that's okay. - // - // TODO: actually, it's probably not okay. Queue depth release should be moved into the - // stream task, such that it continues to account for the queue depth slot until the server - // responds. Otherwise, if a slow request times out and keeps blocking the stream, the - // server will keep waiting on it and we can pile on subsequent requests (including the - // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking - // requests on e.g. LSN waits and layer downloads, instead returning early to free up the - // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line - // blocking. TBD. - let mut streams = pool.streams.lock().unwrap(); - let entry = streams.get_mut(&self.id).expect("unknown stream"); - assert!(entry.idle_since.is_none(), "active stream marked idle"); - assert!(entry.queue_depth > 0, "stream queue underflow"); - entry.queue_depth -= 1; - if entry.queue_depth == 0 { - entry.idle_since = Some(Instant::now()); // mark stream as idle + // If the stream isn't reusable, it can't be returned to the pool. + if !self.can_reuse { + return; } + // Place the idle stream back into the pool. + let entry = StreamEntry { + stream: self.stream.take().expect("dropped once"), + idle_since: Instant::now(), + }; + pool.idle + .lock() + .unwrap() + .insert(entry.stream.client.id, entry); + _ = self.permit; // returned on drop, referenced for visibility } } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index a9dd154285..76355ae546 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -49,7 +49,7 @@ impl From for tonic::Status { } /// The LSN a request should read at. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Default)] pub struct ReadLsn { /// The request's read LSN. pub request_lsn: Lsn, @@ -329,7 +329,7 @@ impl From for proto::GetDbSizeResponse { } /// Requests one or more pages. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct GetPageRequest { /// A request ID. Will be included in the response. Should be unique for in-flight requests on /// the stream. @@ -430,12 +430,13 @@ impl From for proto::RequestId { } /// A GetPage request class. -#[derive(Clone, Copy, Debug, strum_macros::Display)] +#[derive(Clone, Copy, Debug, Default, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. Unknown, /// A normal request. This is the default. + #[default] Normal, /// A prefetch request. NB: can only be classified on pg < 18. Prefetch, diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index fc01deb92d..c61598cdf6 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["full", "test-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] } From a203f9829a87fc47deece609b4a35b6239bd7322 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 14:30:28 +0200 Subject: [PATCH 08/11] pageserver: add timeline_id span when freezing layers (#12572) ## Problem We don't log the timeline ID when rolling ephemeral layers during housekeeping. Resolves [LKB-179](https://databricks.atlassian.net/browse/LKB-179) ## Summary of changes Add a span with timeline ID when calling `maybe_freeze_ephemeral_layer` from the housekeeping loop. We don't instrument the function itself, since future callers may not have a span including the tenant_id already, but we don't want to duplicate the tenant_id for these spans. --- pageserver/src/tenant.rs | 8 +++++++- pageserver/src/tenant/timeline.rs | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f75a03a508..1a3016e7f1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3393,7 +3393,13 @@ impl TenantShard { .collect_vec(); for timeline in timelines { - timeline.maybe_freeze_ephemeral_layer().await; + // Include a span with the timeline ID. The parent span already has the tenant ID. + let span = + info_span!("maybe_freeze_ephemeral_layer", timeline_id = %timeline.timeline_id); + timeline + .maybe_freeze_ephemeral_layer() + .instrument(span) + .await; } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fe622713e9..f2833674a9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1893,6 +1893,8 @@ impl Timeline { // an ephemeral layer open forever when idle. It also freezes layers if the global limit on // ephemeral layer bytes has been breached. pub(super) async fn maybe_freeze_ephemeral_layer(&self) { + debug_assert_current_span_has_tenant_and_timeline_id(); + let Ok(mut write_guard) = self.write_lock.try_lock() else { // If the write lock is held, there is an active wal receiver: rolling open layers // is their responsibility while they hold this lock. From eb830fa547f61aaaa582d765b440b156b6a780f2 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 15:22:38 +0200 Subject: [PATCH 09/11] pageserver/client_grpc: use unbounded pools (#12585) ## Problem The communicator gRPC client currently uses bounded client/stream pools. This can artificially constrain clients, especially after we remove pipelining in #12584. [Benchmarks](https://github.com/neondatabase/neon/pull/12583) show that the cost of an idle server-side GetPage worker task is about 26 KB (2.5 GB for 100,000), so we can afford to scale out. In the worst case, we'll degenerate to the current libpq state with one stream per backend, but without the TCP connection overhead. In the common case we expect significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits, read coalescing, sharding (backends typically only talk to one shard at a time), etc. Currently, Pageservers rarely serve more than 4000 backend connections, so we have at least 2 orders of magnitude of headroom. Touches #11735. Requires #12584. ## Summary of changes Remove the pool limits, and restructure the pools. We still keep a separate bulk pool for Getpage batches of >4 pages (>32 KB), with fewer streams per connection. This reduces TCP-level congestion and head-of-line blocking for non-bulk requests, and concentrates larger window sizes on a smaller set of streams/connections, presumably reducing memory usage. Apart from this, bulk requests don't have any latency penalty compared to other requests. --- pageserver/client_grpc/src/client.rs | 104 ++++++++++++++------------- pageserver/page_api/src/model.rs | 13 ---- 2 files changed, 55 insertions(+), 62 deletions(-) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 4b606d6939..3a9edc7092 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -24,20 +24,23 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber}; /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up /// when full. /// +/// Normal requests are small, and we don't pipeline them, so we can afford a large number of +/// streams per connection. +/// /// TODO: tune all of these constants, and consider making them configurable. -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); +const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(64).unwrap(); -/// Max number of concurrent unary request clients per shard. -const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); +/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a +/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and +/// transmission delays. This also concentrates large window sizes on a smaller set of +/// streams/connections, presumably reducing memory use. +const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); -/// Max number of concurrent GetPage streams per shard. -const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); - -/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these -/// are more throughput-oriented, we have a smaller limit. -const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); +/// The batch size threshold at which a GetPage request will use the bulk stream pool. +/// +/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window +/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool. +const BULK_THRESHOLD_BATCH_SIZE: usize = 5; /// The overall request call timeout, including retries and pool acquisition. /// TODO: should we retry forever? Should the caller decide? @@ -62,10 +65,19 @@ const SLOW_THRESHOLD: Duration = Duration::from_secs(3); /// * Sharded tenants across multiple Pageservers. /// * Pooling of connections, clients, and streams for efficient resource use. /// * Concurrent use by many callers. -/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling. +/// * Internal handling of GetPage bidirectional streams. /// * Automatic retries. /// * Observability. /// +/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These +/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all +/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly +/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000 +/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with +/// one stream per backend, but without the TCP connection overhead. In the common case we expect +/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits, +/// read coalescing, sharding (backends typically only talk to one shard at a time), etc. +/// /// TODO: this client does not support base backups or LSN leases, as these are only used by /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards. pub struct PageserverClient { @@ -264,7 +276,7 @@ impl PageserverClient { req: page_api::GetPageRequest, shard: &Shard, ) -> tonic::Result { - let mut stream = shard.stream(req.request_class.is_bulk()).await?; + let mut stream = shard.stream(Self::is_bulk(&req)).await?; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -365,6 +377,11 @@ impl PageserverClient { )) })? } + + /// Returns true if the request is considered a bulk request and should use the bulk pool. + fn is_bulk(req: &page_api::GetPageRequest) -> bool { + req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE + } } /// Shard specification for a PageserverClient. @@ -492,15 +509,23 @@ impl Shards { } } -/// A single shard. Uses dedicated resource pools with the following structure: +/// A single shard. Has dedicated resource pools with the following structure: /// -/// * Channel pool: unbounded. -/// * Unary client pool: MAX_UNARY_CLIENTS. -/// * Stream client pool: unbounded. -/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. -/// * Bulk channel pool: unbounded. +/// * Channel pool: MAX_CLIENTS_PER_CHANNEL. +/// * Client pool: unbounded. +/// * Stream pool: unbounded. +/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL. /// * Bulk client pool: unbounded. -/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. +/// * Bulk stream pool: unbounded. +/// +/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests. +/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a +/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools +/// are bounded, nor do they pipeline requests, so the latency characteristics should be mostly +/// similar (except for TCP transmission time). +/// +/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However, +/// the code is fairly trivial, so we may as well keep them around for now in case we need them. struct Shard { /// The shard ID. id: ShardIndex, @@ -508,7 +533,7 @@ struct Shard { client_pool: Arc, /// GetPage stream pool. stream_pool: Arc, - /// GetPage stream pool for bulk requests, e.g. prefetches. + /// GetPage stream pool for bulk requests. bulk_stream_pool: Arc, } @@ -522,48 +547,30 @@ impl Shard { auth_token: Option, compression: Option, ) -> anyhow::Result { - // Common channel pool for unary and stream requests. Bounded by client/stream pools. - let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - - // Client pool for unary requests. + // Shard pools for unary requests and non-bulk GetPage requests. let client_pool = ClientPool::new( - channel_pool.clone(), + ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?, tenant_id, timeline_id, shard_id, auth_token.clone(), compression, - Some(MAX_UNARY_CLIENTS), + None, // unbounded ); + let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded - // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, - // but shares a channel pool with it (as it's unbounded). - let stream_pool = StreamPool::new( - ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token.clone(), - compression, - None, // unbounded, limited by stream pool - ), - Some(MAX_STREAMS), - ); - - // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools - // to avoid head-of-line blocking of latency-sensitive requests. + // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.). let bulk_stream_pool = StreamPool::new( ClientPool::new( - ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?, tenant_id, timeline_id, shard_id, auth_token, compression, - None, // unbounded, limited by stream pool + None, // unbounded, ), - Some(MAX_BULK_STREAMS), + None, // unbounded ); Ok(Self { @@ -585,8 +592,7 @@ impl Shard { .await } - /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream - /// pool (e.g. for prefetches). + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool. #[instrument(skip_all, fields(bulk))] async fn stream(&self, bulk: bool) -> tonic::Result { let pool = match bulk { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 76355ae546..a3286ecf15 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -444,19 +444,6 @@ pub enum GetPageClass { Background, } -impl GetPageClass { - /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than - /// latency-sensitive). - pub fn is_bulk(&self) -> bool { - match self { - Self::Unknown => false, - Self::Normal => false, - Self::Prefetch => true, - Self::Background => true, - } - } -} - impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { From 4fedcbc0ac94d399808384911b92f8417b74c286 Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Mon, 14 Jul 2025 15:25:25 +0200 Subject: [PATCH 10/11] Leverage the existing mechanism to retry 404 errors instead of implementing new code. (#12567) ## Problem In https://github.com/neondatabase/neon/pull/12513, the new code was implemented to retry 404 errors caused by the replication lag. However, this implemented the new logic, making the script more complicated, while we have an existing one in `neon_api.py`. ## Summary of changes The existing mechanism is used to retry 404 errors. --------- Co-authored-by: Alexey Masterov --- test_runner/fixtures/neon_api.py | 19 +++++++++++++------ test_runner/random_ops/test_random_ops.py | 22 +++------------------- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index 9d85b9a332..e0f16abe77 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -34,7 +34,9 @@ class NeonAPI: self.retries524 = 0 self.retries4xx = 0 - def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response: + def __request( + self, method: str | bytes, endpoint: str, retry404: bool = False, **kwargs: Any + ) -> requests.Response: kwargs["headers"] = kwargs.get("headers", {}) kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" @@ -55,10 +57,12 @@ class NeonAPI: resp.raise_for_status() break elif resp.status_code >= 400: - if resp.status_code == 422: - if resp.json()["message"] == "branch not ready yet": - retry = True - self.retries4xx += 1 + if resp.status_code == 404 and retry404: + retry = True + self.retries4xx += 1 + elif resp.status_code == 422 and resp.json()["message"] == "branch not ready yet": + retry = True + self.retries4xx += 1 elif resp.status_code == 423 and resp.json()["message"] in { "endpoint is in some transitive state, could not suspend", "project already has running conflicting operations, scheduling of new ones is prohibited", @@ -66,7 +70,7 @@ class NeonAPI: retry = True self.retries4xx += 1 elif resp.status_code == 524: - log.info("The request was timed out, trying to get operations") + log.info("The request was timed out") retry = True self.retries524 += 1 if retry: @@ -203,6 +207,9 @@ class NeonAPI: resp = self.__request( "GET", f"/projects/{project_id}/branches/{branch_id}", + # XXX Retry get parent details to work around the issue + # https://databricks.atlassian.net/browse/LKB-279 + retry404=True, headers={ "Accept": "application/json", }, diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index 5c43b06bc5..b106e9b729 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Any import pytest from fixtures.log_helper import log -from requests import HTTPError if TYPE_CHECKING: from pathlib import Path @@ -153,26 +152,11 @@ class NeonBranch: return self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) parent_id: str = res["branch"]["parent_id"] - # XXX Retry get parent details to work around the issue - # https://databricks.atlassian.net/browse/LKB-279 - target_time = datetime.now() + timedelta(seconds=30) - while datetime.now() < target_time: - try: - parent_def = self.neon_api.get_branch_details(self.project_id, parent_id) - except HTTPError as he: - if he.response.status_code == 404: - log.info("Branch not found, waiting...") - time.sleep(1) - else: - raise HTTPError(he) from he - else: - break - else: - raise RuntimeError(f"Branch {parent_id} not found") - # Creates an object for the parent branch # After the reset operation a new parent branch is created - parent = NeonBranch(self.project, parent_def, True) + parent = NeonBranch( + self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True + ) self.project.branches[parent_id] = parent self.parent = parent parent.children[self.id] = self From 2288efae662e41fcd2cf7369e3b4b9dc95d25e95 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Mon, 14 Jul 2025 14:41:31 +0100 Subject: [PATCH 11/11] Performance test for LFC prewarm (#12524) https://github.com/neondatabase/cloud/issues/19011 Measure relative performance for prewarmed and non-prewarmed endpoints. Add test that runs on every commit, and one performance test with a remote cluster. --- .github/actionlint.yml | 1 + .github/workflows/benchmarking.yml | 72 +++++++++ test_runner/fixtures/neon_api.py | 4 + test_runner/performance/test_lfc_prewarm.py | 167 ++++++++++++++++++++ 4 files changed, 244 insertions(+) create mode 100644 test_runner/performance/test_lfc_prewarm.py diff --git a/.github/actionlint.yml b/.github/actionlint.yml index 3142a36fa0..25b2fc702a 100644 --- a/.github/actionlint.yml +++ b/.github/actionlint.yml @@ -31,6 +31,7 @@ config-variables: - NEON_PROD_AWS_ACCOUNT_ID - PGREGRESS_PG16_PROJECT_ID - PGREGRESS_PG17_PROJECT_ID + - PREWARM_PGBENCH_SIZE - REMOTE_STORAGE_AZURE_CONTAINER - REMOTE_STORAGE_AZURE_REGION - SLACK_CICD_CHANNEL_ID diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index 79371ec704..df80bad579 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -219,6 +219,7 @@ jobs: --ignore test_runner/performance/test_cumulative_statistics_persistence.py --ignore test_runner/performance/test_perf_many_relations.py --ignore test_runner/performance/test_perf_oltp_large_tenant.py + --ignore test_runner/performance/test_lfc_prewarm.py env: BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }} VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" @@ -410,6 +411,77 @@ jobs: env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + prewarm-test: + if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} + permissions: + contents: write + statuses: write + id-token: write # aws-actions/configure-aws-credentials + env: + PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }} + POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install + DEFAULT_PG_VERSION: 17 + TEST_OUTPUT: /tmp/test_output + BUILD_TYPE: remote + SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }} + PLATFORM: "neon-staging" + + runs-on: [ self-hosted, us-east-2, x64 ] + container: + image: ghcr.io/neondatabase/build-tools:pinned-bookworm + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2 + with: + aws-region: eu-central-1 + role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + role-duration-seconds: 18000 # 5 hours + + - name: Download Neon artifact + uses: ./.github/actions/download + with: + name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact + path: /tmp/neon/ + prefix: latest + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + + - name: Run prewarm benchmark + uses: ./.github/actions/run-python-test-set + with: + build_type: ${{ env.BUILD_TYPE }} + test_selection: performance/test_lfc_prewarm.py + run_in_parallel: false + save_perf_report: ${{ env.SAVE_PERF_REPORT }} + extra_params: -m remote_cluster --timeout 5400 + pg_version: ${{ env.DEFAULT_PG_VERSION }} + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" + PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}" + NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }} + + - name: Create Allure report + id: create-allure-report + if: ${{ !cancelled() }} + uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} + generate-matrices: if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} # Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday) diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index e0f16abe77..bb618325e0 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -314,6 +314,10 @@ class NeonAPI: if endpoint_type: data["endpoint"]["type"] = endpoint_type if settings: + # otherwise we get 400 "settings must not be nil" + # TODO(myrrc): fix on cplane side + if "pg_settings" not in settings: + settings["pg_settings"] = {} data["endpoint"]["settings"] = settings resp = self.__request( diff --git a/test_runner/performance/test_lfc_prewarm.py b/test_runner/performance/test_lfc_prewarm.py new file mode 100644 index 0000000000..ad2c759a63 --- /dev/null +++ b/test_runner/performance/test_lfc_prewarm.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import os +import timeit +import traceback +from concurrent.futures import ThreadPoolExecutor as Exec +from pathlib import Path +from time import sleep +from typing import TYPE_CHECKING, Any, cast + +import pytest +from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult +from fixtures.log_helper import log +from fixtures.neon_api import NeonAPI, connection_parameters_to_env + +if TYPE_CHECKING: + from fixtures.compare_fixtures import NeonCompare + from fixtures.neon_fixtures import Endpoint, PgBin + from fixtures.pg_version import PgVersion + +from performance.test_perf_pgbench import utc_now_timestamp + +# These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint +# compared to the endpoint which saves its LFC and prewarms using it on startup. + + +def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare): + env = neon_compare.env + env.create_branch("normal") + env.create_branch("prewarmed") + pg_bin = neon_compare.pg_bin + ep_normal: Endpoint = env.endpoints.create_start("normal") + ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True) + + for ep in [ep_normal, ep_prewarmed]: + connstr: str = ep.connstr() + pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"]) + ep.safe_psql("CREATE EXTENSION neon") + client = ep.http_client() + client.offload_lfc() + ep.stop() + ep.start() + client.prewarm_lfc_wait() + + run_start_timestamp = utc_now_timestamp() + t0 = timeit.default_timer() + out = pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr]) + run_duration = timeit.default_timer() - t0 + run_end_timestamp = utc_now_timestamp() + + stdout = Path(f"{out}.stdout").read_text() + res = PgBenchRunResult.parse_from_stdout( + stdout=stdout, + run_duration=run_duration, + run_start_timestamp=run_start_timestamp, + run_end_timestamp=run_end_timestamp, + ) + name: str = cast("str", ep.branch_name) + neon_compare.zenbenchmark.record_pg_bench_result(name, res) + + +@pytest.mark.remote_cluster +@pytest.mark.timeout(30 * 60) +def test_compare_prewarmed_pgbench_perf_benchmark( + pg_bin: PgBin, + neon_api: NeonAPI, + pg_version: PgVersion, + zenbenchmark: NeonBenchmarker, +): + name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}" + project = neon_api.create_project(pg_version, name) + project_id = project["project"]["id"] + neon_api.wait_for_operation_to_finish(project_id) + err = False + try: + benchmark_impl(pg_bin, neon_api, project, zenbenchmark) + except Exception as e: + err = True + log.error(f"Caught exception: {e}") + log.error(traceback.format_exc()) + finally: + assert not err + neon_api.delete_project(project_id) + + +def benchmark_impl( + pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker +): + pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB + offload_secs = 20 + test_duration_min = 5 + pgbench_duration = f"-T{test_duration_min * 60}" + # prewarm API is not publicly exposed. In order to test performance of a + # fully prewarmed endpoint, wait after it restarts + prewarmed_sleep_secs = 30 + + branch_id = project["branch"]["id"] + project_id = project["project"]["id"] + normal_env = connection_parameters_to_env( + project["connection_uris"][0]["connection_parameters"] + ) + normal_id = project["endpoints"][0]["id"] + + prewarmed_branch_id = neon_api.create_branch( + project_id, "prewarmed", parent_id=branch_id, add_endpoint=False + )["branch"]["id"] + neon_api.wait_for_operation_to_finish(project_id) + + ep_prewarmed = neon_api.create_endpoint( + project_id, + prewarmed_branch_id, + endpoint_type="read_write", + settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs}, + ) + neon_api.wait_for_operation_to_finish(project_id) + + prewarmed_env = normal_env.copy() + prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"] + prewarmed_id = ep_prewarmed["endpoint"]["id"] + + def bench(endpoint_name, endpoint_id, env): + pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env) + sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes + neon_api.restart_endpoint(project_id, endpoint_id) + sleep(prewarmed_sleep_secs) + + run_start_timestamp = utc_now_timestamp() + t0 = timeit.default_timer() + out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env) + run_duration = timeit.default_timer() - t0 + run_end_timestamp = utc_now_timestamp() + + stdout = Path(f"{out}.stdout").read_text() + res = PgBenchRunResult.parse_from_stdout( + stdout=stdout, + run_duration=run_duration, + run_start_timestamp=run_start_timestamp, + run_end_timestamp=run_end_timestamp, + ) + zenbenchmark.record_pg_bench_result(endpoint_name, res) + + with Exec(max_workers=2) as exe: + exe.submit(bench, "normal", normal_id, normal_env) + exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env) + + +def test_compare_prewarmed_read_perf(neon_compare: NeonCompare): + env = neon_compare.env + env.create_branch("normal") + env.create_branch("prewarmed") + ep_normal: Endpoint = env.endpoints.create_start("normal") + ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True) + + sql = [ + "CREATE EXTENSION neon", + "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')", + "INSERT INTO foo SELECT FROM generate_series(1,1000000)", + ] + for ep in [ep_normal, ep_prewarmed]: + ep.safe_psql_many(sql) + client = ep.http_client() + client.offload_lfc() + ep.stop() + ep.start() + client.prewarm_lfc_wait() + with neon_compare.record_duration(f"{ep.branch_name}_run_duration"): + ep.safe_psql("SELECT count(*) from foo")