From cb698a3951a154affb8be1fa9e29f629fb664db6 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 4 Jul 2025 21:52:25 +0200 Subject: [PATCH] Add dedicated client pools for bulk requests --- pageserver/client_grpc/src/client.rs | 116 +++++++++++++++++----- pageserver/client_grpc/src/pool.rs | 140 ++++++++++++++------------- pageserver/page_api/src/model.rs | 2 +- 3 files changed, 162 insertions(+), 96 deletions(-) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index c21ce2e47d..87e80a0dd6 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; +use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; -use futures::stream::FuturesUnordered; +use futures::stream::{FuturesUnordered, TryChunksError}; use futures::{FutureExt as _, StreamExt as _}; use tracing::instrument; @@ -15,6 +16,31 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber}; +/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up +/// when full. +/// +/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels +/// with only streams. +const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); + +/// Max number of concurrent unary request clients per shard. +const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage +/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); + +/// Max per-stream queue depth. +const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); + +/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because +/// these are more throughput-oriented, we have a smaller limit but higher queue depth. +const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); + +/// Max per-stream queue depth for bulk streams. These are more throughput-oriented and thus get +/// a larger queue depth. +const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); + /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: /// @@ -87,6 +113,7 @@ impl PageserverClient { /// errors. All responses will have `GetPageStatusCode::Ok`. #[instrument(skip_all, fields( req_id = %req.request_id, + class = %req.request_class, rel = %req.rel, blkno = %req.block_numbers[0], blks = %req.block_numbers.len(), @@ -138,10 +165,18 @@ impl PageserverClient { shard_id: ShardIndex, req: page_api::GetPageRequest, ) -> tonic::Result { + // Determine whether this is a bulk request, which uses a different stream pool. + let is_bulk = match req.request_class { + page_api::GetPageClass::Unknown => false, + page_api::GetPageClass::Normal => false, + page_api::GetPageClass::Prefetch => true, + page_api::GetPageClass::Background => true, + }; + let resp = self .retry .with(async || { - let stream = self.shards.get(shard_id)?.stream().await; + let stream = self.shards.get(shard_id)?.stream(is_bulk).await; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -270,17 +305,24 @@ impl Shards { } } -/// A single shard. +/// A single shard. Uses dedicated resource pools with the following structure: +/// +/// * Channel pool: unbounded. +/// * Unary client pool: MAX_UNARY_CLIENTS. +/// * Stream client pool: unbounded. +/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. +/// * Bulk channel pool: unbounded. +/// * Bulk client pool: unbounded. +/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. /// /// TODO: consider separate pools for normal and bulk traffic, with different settings. struct Shard { - /// Dedicated channel pool for this shard. Shared by all clients/streams in this shard. - _channel_pool: Arc, - /// Unary gRPC client pool for this shard. Uses the shared channel pool. + /// Unary gRPC client pool. client_pool: Arc, - /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel - /// pool with unary clients. + /// GetPage stream pool. stream_pool: Arc, + /// GetPage stream pool for bulk requests, e.g. prefetches. + bulk_stream_pool: Arc, } impl Shard { @@ -297,34 +339,53 @@ impl Shard { return Err(anyhow!("invalid shard URL {url}: must use gRPC")); } - // Use a common channel pool for all clients, to multiplex unary and stream requests across - // the same TCP connections. The channel pool is unbounded (but client pools are bounded). - let channel_pool = ChannelPool::new(url)?; + // Common channel pool for unary and stream requests. Bounded by client/stream pools. + let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - // Dedicated client pool for unary requests. + // Client pool for unary requests. let client_pool = ClientPool::new( channel_pool.clone(), tenant_id, timeline_id, shard_id, auth_token.clone(), + Some(MAX_UNARY_CLIENTS), ); - // Stream pool with dedicated client pool. If this shared a client pool with unary requests, - // long-lived streams could fill up the client pool and starve out unary requests. It shares - // the same underlying channel pool with unary clients though, which is unbounded. - let stream_pool = StreamPool::new(ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token, - )); + // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, + // but shares a channel pool with it (as it's unbounded). + let stream_pool = StreamPool::new( + ClientPool::new( + channel_pool.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token.clone(), + None, // unbounded, limited by stream pool + ), + Some(MAX_STREAMS), + MAX_STREAM_QUEUE_DEPTH, + ); + + // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools + // to avoid head-of-line blocking of latency-sensitive requests. + let bulk_stream_pool = StreamPool::new( + ClientPool::new( + ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + tenant_id, + timeline_id, + shard_id, + auth_token, + None, // unbounded, limited by stream pool + ), + Some(MAX_BULK_STREAMS), + MAX_BULK_STREAM_QUEUE_DEPTH, + ); Ok(Self { - _channel_pool: channel_pool, client_pool, stream_pool, + bulk_stream_pool, }) } @@ -336,8 +397,11 @@ impl Shard { .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } - /// Returns a pooled stream for this shard. - async fn stream(&self) -> StreamGuard { - self.stream_pool.get().await + /// Returns a pooled stream for this shard. If true, uses the bulk pool (e.g. for prefetches). + async fn stream(&self, bulk: bool) -> StreamGuard { + match bulk { + false => self.stream_pool.get().await, + true => self.bulk_stream_pool.get().await, + } } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 78db97ef9c..be99b29c18 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -31,6 +31,7 @@ //! TODO: observability. use std::collections::{BTreeMap, HashMap}; +use std::num::NonZero; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; @@ -47,21 +48,10 @@ use utils::shard::ShardIndex; // TODO: tune these constants, and make them configurable. -/// Max number of concurrent clients per channel. -/// -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const CLIENTS_PER_CHANNEL: usize = 16; - -/// Maximum number of concurrent clients per `ClientPool`. -const CLIENT_LIMIT: usize = 64; - -/// Max number of pipelined requests per gRPC GetPage stream. -const STREAM_QUEUE_DEPTH: usize = 2; - /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2 -/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of -/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients. +/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this. +/// The pool does not limit the number of channels, and instead relies on `ClientPool` or +/// `StreamPool` to limit the number of concurrent clients. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -71,6 +61,8 @@ const STREAM_QUEUE_DEPTH: usize = 2; pub struct ChannelPool { /// Pageserver endpoint to connect to. endpoint: Endpoint, + /// Max number of clients per channel. Beyond this, a new channel will be created. + max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, /// Channel ID generator. @@ -88,13 +80,14 @@ struct ChannelEntry { impl ChannelPool { /// Creates a new channel pool for the given Pageserver endpoint. - pub fn new(endpoint: E) -> anyhow::Result> + pub fn new(endpoint: E, max_clients_per_channel: NonZero) -> anyhow::Result> where E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { Ok(Arc::new(Self { endpoint: endpoint.try_into()?, + max_clients_per_channel, channels: Mutex::default(), next_channel_id: AtomicUsize::default(), })) @@ -122,8 +115,11 @@ impl ChannelPool { // with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { - assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); - if entry.clients < CLIENTS_PER_CHANNEL { + assert!( + entry.clients <= self.max_clients_per_channel.get(), + "channel overflow" + ); + if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; return ChannelGuard { pool: Arc::downgrade(self), @@ -183,7 +179,7 @@ impl Drop for ChannelGuard { /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner /// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total -/// number of concurrent clients to `CLIENT_LIMIT` via semaphore. +/// number of concurrent clients to `max_clients` via semaphore. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -199,8 +195,8 @@ pub struct ClientPool { auth_token: Option, /// Channel pool to acquire channels from. channel_pool: Arc, - /// Limits the max number of concurrent clients for this pool. - limiter: Arc, + /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded. + limiter: Option>, /// Idle pooled clients. Acquired clients are removed from here and returned on drop. /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in @@ -223,13 +219,15 @@ struct ClientEntry { impl ClientPool { /// Creates a new client pool for the given tenant shard. Channels are acquired from the given - /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. + /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to + /// `max_clients` concurrent clients, or unbounded if None. pub fn new( channel_pool: Arc, tenant_id: TenantId, timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + max_clients: Option>, ) -> Arc { Arc::new(Self { tenant_id, @@ -238,25 +236,24 @@ impl ClientPool { auth_token, channel_pool, idle: Mutex::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT)), + limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))), next_client_id: AtomicUsize::default(), }) } /// Gets a client from the pool, or creates a new one if necessary. Connections are established - /// lazily and does not block, but this call can block if the pool is at `CLIENT_LIMIT`. The - /// client is returned to the pool when the guard is dropped. + /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client + /// is returned to the pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but recall /// that these establish a new gRPC stream per request so they're already expensive. GetPage /// requests use the `StreamPool` instead. pub async fn get(self: &Arc) -> anyhow::Result { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } // Fast path: acquire an idle client from the pool. if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() { @@ -298,9 +295,9 @@ impl ClientPool { pub struct ClientGuard { pool: Weak, id: ClientID, - client: Option, // Some until dropped - channel_guard: Option, // Some until dropped - permit: OwnedSemaphorePermit, + client: Option, // Some until dropped + channel_guard: Option, // Some until dropped + permit: Option, // None if pool is unbounded } impl Deref for ClientGuard { @@ -343,16 +340,21 @@ impl Drop for ClientGuard { /// TODO: reap idle streams. /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { - /// The client pool to acquire clients from. + /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, /// All pooled streams. /// /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit). - /// Each stream has an associated Tokio task that processes requests and responses. + /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each + /// stream has an associated Tokio task that processes requests and responses. streams: Arc>>, - /// Limits the max number of concurrent requests (not streams). - limiter: Arc, + /// The max number of concurrent streams, or None if unbounded. + max_streams: Option>, + /// The max number of concurrent requests per stream. + max_queue_depth: NonZero, + /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. + /// None if the pool is unbounded. + limiter: Option>, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -371,16 +373,27 @@ struct StreamEntry { } impl StreamPool { - /// Creates a new stream pool, using the given client pool. + /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` + /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. /// - /// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may - /// fill up the client pool and starve out unary requests. Client pools can share the same - /// `ChannelPool` though, since the channel pool is unbounded. - pub fn new(client_pool: Arc) -> Arc { + /// The client pool must be unbounded. The stream pool will enforce its own limits, and because + /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. + /// The stream pool should generally have its own dedicated client pool (but it can share a + /// channel pool with others since these are always unbounded). + pub fn new( + client_pool: Arc, + max_streams: Option>, + max_queue_depth: NonZero, + ) -> Arc { + assert!(client_pool.limiter.is_none(), "bounded client pool"); Arc::new(Self { client_pool, streams: Arc::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)), + limiter: max_streams.map(|max_streams| { + Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) + }), + max_streams, + max_queue_depth, next_stream_id: AtomicUsize::default(), }) } @@ -403,18 +416,17 @@ impl StreamPool { /// /// For now, we just do something simple and functional, but very inefficient (linear scan). pub async fn get(&self) -> StreamGuard { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } let mut streams = self.streams.lock().unwrap(); // Look for a pooled stream with available capacity. for entry in streams.values() { assert!( - entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, + entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(), "stream queue overflow" ); if entry @@ -422,7 +434,7 @@ impl StreamPool { .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { // Increment the queue depth via compare-and-swap. // TODO: review ordering. - (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) + (queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1) }) .is_ok() { @@ -437,25 +449,18 @@ impl StreamPool { // No available stream, spin up a new one. We install the stream entry in the pool first and // return the guard, while spinning up the stream task async. This allows other callers to // join onto this stream and also create additional streams concurrently if this fills up. - // - // NB: we have to be careful not to overshoot here. The semaphore limit is CLIENT_LIMIT * - // STREAM_QUEUE_DEPTH, but if we were to miss a concurrent queue depth allocation we'd try - // to spin up more streams than CLIENT_LIMIT and block on the client pool ~forever. Because - // we only acquire queue depth under lock. let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); let entry = StreamEntry { sender: req_tx.clone(), queue_depth: queue_depth.clone(), }; streams.insert(id, entry); - // NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT * - // STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more - // streams than CLIENT_LIMIT and block on the client pool ~forever. This should not be - // possible because we only acquire queue depth under lock. - assert!(streams.len() <= CLIENT_LIMIT, "stream overflow"); + if let Some(max_streams) = self.max_streams { + assert!(streams.len() <= max_streams.get(), "stream overflow"); + }; let client_pool = self.client_pool.clone(); let streams = self.streams.clone(); @@ -490,19 +495,16 @@ impl StreamPool { // Acquire a client from the pool and create a stream. let mut client = client_pool.get().await?; - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let (req_tx, req_rx) = mpsc::channel(1); let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); let mut resp_stream = client.get_pages(req_stream).await?; // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. - let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH); + let mut callers = HashMap::new(); // Process requests and responses. loop { - // NB: this can trip if the server doesn't respond to a request, so only debug_assert. - debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow"); - tokio::select! { // Receive requests from callers and send them to the stream. req = caller_rx.recv() => { @@ -551,7 +553,7 @@ impl StreamPool { pub struct StreamGuard { sender: RequestSender, queue_depth: Arc, - permit: OwnedSemaphorePermit, + permit: Option, // None if pool is unbounded } impl StreamGuard { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 84eb636a2a..fc00b32d2e 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -386,7 +386,7 @@ impl From for proto::GetPageRequest { pub type RequestID = u64; /// A GetPage request class. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed.