diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index c21ce2e47d..63852868c3 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; @@ -15,6 +16,32 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber}; +/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up +/// when full. +/// +/// TODO: tune all of these constants, and consider making them configurable. +/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels +/// with only streams. +const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); + +/// Max number of concurrent unary request clients per shard. +const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage +/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of pipelined requests per stream. +const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); + +/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these +/// are more throughput-oriented, we have a smaller limit but higher queue depth. +const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); + +/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus +/// get a larger queue depth. +const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); + /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: /// @@ -87,6 +114,7 @@ impl PageserverClient { /// errors. All responses will have `GetPageStatusCode::Ok`. #[instrument(skip_all, fields( req_id = %req.request_id, + class = %req.request_class, rel = %req.rel, blkno = %req.block_numbers[0], blks = %req.block_numbers.len(), @@ -141,7 +169,11 @@ impl PageserverClient { let resp = self .retry .with(async || { - let stream = self.shards.get(shard_id)?.stream().await; + let stream = self + .shards + .get(shard_id)? + .stream(req.request_class.is_bulk()) + .await; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -270,17 +302,22 @@ impl Shards { } } -/// A single shard. +/// A single shard. Uses dedicated resource pools with the following structure: /// -/// TODO: consider separate pools for normal and bulk traffic, with different settings. +/// * Channel pool: unbounded. +/// * Unary client pool: MAX_UNARY_CLIENTS. +/// * Stream client pool: unbounded. +/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. +/// * Bulk channel pool: unbounded. +/// * Bulk client pool: unbounded. +/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. struct Shard { - /// Dedicated channel pool for this shard. Shared by all clients/streams in this shard. - _channel_pool: Arc, - /// Unary gRPC client pool for this shard. Uses the shared channel pool. + /// Unary gRPC client pool. client_pool: Arc, - /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel - /// pool with unary clients. + /// GetPage stream pool. stream_pool: Arc, + /// GetPage stream pool for bulk requests, e.g. prefetches. + bulk_stream_pool: Arc, } impl Shard { @@ -297,34 +334,53 @@ impl Shard { return Err(anyhow!("invalid shard URL {url}: must use gRPC")); } - // Use a common channel pool for all clients, to multiplex unary and stream requests across - // the same TCP connections. The channel pool is unbounded (but client pools are bounded). - let channel_pool = ChannelPool::new(url)?; + // Common channel pool for unary and stream requests. Bounded by client/stream pools. + let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - // Dedicated client pool for unary requests. + // Client pool for unary requests. let client_pool = ClientPool::new( channel_pool.clone(), tenant_id, timeline_id, shard_id, auth_token.clone(), + Some(MAX_UNARY_CLIENTS), ); - // Stream pool with dedicated client pool. If this shared a client pool with unary requests, - // long-lived streams could fill up the client pool and starve out unary requests. It shares - // the same underlying channel pool with unary clients though, which is unbounded. - let stream_pool = StreamPool::new(ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token, - )); + // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, + // but shares a channel pool with it (as it's unbounded). + let stream_pool = StreamPool::new( + ClientPool::new( + channel_pool.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token.clone(), + None, // unbounded, limited by stream pool + ), + Some(MAX_STREAMS), + MAX_STREAM_QUEUE_DEPTH, + ); + + // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools + // to avoid head-of-line blocking of latency-sensitive requests. + let bulk_stream_pool = StreamPool::new( + ClientPool::new( + ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + tenant_id, + timeline_id, + shard_id, + auth_token, + None, // unbounded, limited by stream pool + ), + Some(MAX_BULK_STREAMS), + MAX_BULK_STREAM_QUEUE_DEPTH, + ); Ok(Self { - _channel_pool: channel_pool, client_pool, stream_pool, + bulk_stream_pool, }) } @@ -336,8 +392,12 @@ impl Shard { .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } - /// Returns a pooled stream for this shard. - async fn stream(&self) -> StreamGuard { - self.stream_pool.get().await + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream + /// pool (e.g. for prefetches). + async fn stream(&self, bulk: bool) -> StreamGuard { + match bulk { + false => self.stream_pool.get().await, + true => self.bulk_stream_pool.get().await, + } } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 518e4e5b84..5a50004fd1 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -30,6 +30,7 @@ //! TODO: observability. use std::collections::{BTreeMap, HashMap}; +use std::num::NonZero; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; @@ -44,22 +45,10 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Max number of concurrent clients per channel. -/// -/// TODO: tune these constants, and make them configurable. -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const CLIENTS_PER_CHANNEL: usize = 16; - -/// Maximum number of concurrent clients per `ClientPool`. -const CLIENT_LIMIT: usize = 64; - -/// Max number of pipelined requests per gRPC GetPage stream. -const STREAM_QUEUE_DEPTH: usize = 2; - /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2 -/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of -/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients. +/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this. +/// The pool does not limit the number of channels, and instead relies on `ClientPool` or +/// `StreamPool` to limit the number of concurrent clients. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -69,6 +58,8 @@ const STREAM_QUEUE_DEPTH: usize = 2; pub struct ChannelPool { /// Pageserver endpoint to connect to. endpoint: Endpoint, + /// Max number of clients per channel. Beyond this, a new channel will be created. + max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, /// Channel ID generator. @@ -86,13 +77,14 @@ struct ChannelEntry { impl ChannelPool { /// Creates a new channel pool for the given Pageserver endpoint. - pub fn new(endpoint: E) -> anyhow::Result> + pub fn new(endpoint: E, max_clients_per_channel: NonZero) -> anyhow::Result> where E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { Ok(Arc::new(Self { endpoint: endpoint.try_into()?, + max_clients_per_channel, channels: Mutex::default(), next_channel_id: AtomicUsize::default(), })) @@ -120,8 +112,11 @@ impl ChannelPool { // with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { - assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); - if entry.clients < CLIENTS_PER_CHANNEL { + assert!( + entry.clients <= self.max_clients_per_channel.get(), + "channel overflow" + ); + if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; return ChannelGuard { pool: Arc::downgrade(self), @@ -181,7 +176,7 @@ impl Drop for ChannelGuard { /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner /// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total -/// number of concurrent clients to `CLIENT_LIMIT` via semaphore. +/// number of concurrent clients to `max_clients` via semaphore. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -197,8 +192,8 @@ pub struct ClientPool { auth_token: Option, /// Channel pool to acquire channels from. channel_pool: Arc, - /// Limits the max number of concurrent clients for this pool. - limiter: Arc, + /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded. + limiter: Option>, /// Idle pooled clients. Acquired clients are removed from here and returned on drop. /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in @@ -221,13 +216,15 @@ struct ClientEntry { impl ClientPool { /// Creates a new client pool for the given tenant shard. Channels are acquired from the given - /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. + /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to + /// `max_clients` concurrent clients, or unbounded if None. pub fn new( channel_pool: Arc, tenant_id: TenantId, timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + max_clients: Option>, ) -> Arc { Arc::new(Self { tenant_id, @@ -236,25 +233,24 @@ impl ClientPool { auth_token, channel_pool, idle: Mutex::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT)), + limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))), next_client_id: AtomicUsize::default(), }) } /// Gets a client from the pool, or creates a new one if necessary. Connections are established - /// lazily and do not block, but this call can block if the pool is at `CLIENT_LIMIT`. The - /// client is returned to the pool when the guard is dropped. + /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client + /// is returned to the pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. pub async fn get(self: &Arc) -> anyhow::Result { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } // Fast path: acquire an idle client from the pool. if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() { @@ -296,9 +292,9 @@ impl ClientPool { pub struct ClientGuard { pool: Weak, id: ClientID, - client: Option, // Some until dropped - channel_guard: Option, // Some until dropped - permit: OwnedSemaphorePermit, + client: Option, // Some until dropped + channel_guard: Option, // Some until dropped + permit: Option, // None if pool is unbounded } impl Deref for ClientGuard { @@ -341,16 +337,21 @@ impl Drop for ClientGuard { /// TODO: reap idle streams. /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { - /// The client pool to acquire clients from. + /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, /// All pooled streams. /// /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit). - /// Each stream has an associated Tokio task that processes requests and responses. + /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each + /// stream has an associated Tokio task that processes requests and responses. streams: Arc>>, - /// Limits the max number of concurrent requests (not streams). - limiter: Arc, + /// The max number of concurrent streams, or None if unbounded. + max_streams: Option>, + /// The max number of concurrent requests per stream. + max_queue_depth: NonZero, + /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. + /// None if the pool is unbounded. + limiter: Option>, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -369,16 +370,27 @@ struct StreamEntry { } impl StreamPool { - /// Creates a new stream pool, using the given client pool. + /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` + /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. /// - /// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may - /// fill up the client pool and starve out unary requests. Client pools can share the same - /// `ChannelPool` though, since the channel pool is unbounded. - pub fn new(client_pool: Arc) -> Arc { + /// The client pool must be unbounded. The stream pool will enforce its own limits, and because + /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. + /// The stream pool should generally have its own dedicated client pool (but it can share a + /// channel pool with others since these are always unbounded). + pub fn new( + client_pool: Arc, + max_streams: Option>, + max_queue_depth: NonZero, + ) -> Arc { + assert!(client_pool.limiter.is_none(), "bounded client pool"); Arc::new(Self { client_pool, streams: Arc::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)), + limiter: max_streams.map(|max_streams| { + Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) + }), + max_streams, + max_queue_depth, next_stream_id: AtomicUsize::default(), }) } @@ -402,18 +414,17 @@ impl StreamPool { /// /// For now, we just do something simple and functional, but very inefficient (linear scan). pub async fn get(&self) -> StreamGuard { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } let mut streams = self.streams.lock().unwrap(); // Look for a pooled stream with available capacity. for entry in streams.values() { assert!( - entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, + entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(), "stream queue overflow" ); if entry @@ -421,7 +432,7 @@ impl StreamPool { .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { // Increment the queue depth via compare-and-swap. // TODO: review ordering. - (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) + (queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1) }) .is_ok() { @@ -438,18 +449,16 @@ impl StreamPool { // join onto this stream and also create additional streams concurrently if this fills up. let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); let entry = StreamEntry { sender: req_tx.clone(), queue_depth: queue_depth.clone(), }; streams.insert(id, entry); - // NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT * - // STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more - // streams than CLIENT_LIMIT and block on the client pool ~forever. This should not happen - // because we only acquire queue depth under lock and after acquiring a semaphore permit. - assert!(streams.len() <= CLIENT_LIMIT, "stream overflow"); + if let Some(max_streams) = self.max_streams { + assert!(streams.len() <= max_streams.get(), "stream overflow"); + }; let client_pool = self.client_pool.clone(); let streams = self.streams.clone(); @@ -484,19 +493,22 @@ impl StreamPool { // Acquire a client from the pool and create a stream. let mut client = client_pool.get().await?; - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); - let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); + // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could + // theoretically deadlock if both the client and server block on sends (since we're not + // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and + // low queue depths, but it was seen to happen with the libpq protocol so better safe than + // sorry. It should never buffer more than the queue depth anyway, but using an unbounded + // channel guarantees that it will never block. + let (req_tx, req_rx) = mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); let mut resp_stream = client.get_pages(req_stream).await?; // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. - let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH); + let mut callers = HashMap::new(); // Process requests and responses. loop { - // NB: this can trip if the server doesn't respond to a request, so only debug_assert. - debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow"); - tokio::select! { // Receive requests from callers and send them to the stream. req = caller_rx.recv() => { @@ -515,8 +527,8 @@ impl StreamPool { } callers.insert(req.request_id, resp_tx); - // Send the request on the stream. Bail out if the send fails. - req_tx.send(req).await.map_err(|_| { + // Send the request on the stream. Bail out if the stream is closed. + req_tx.send(req).map_err(|_| { tonic::Status::unavailable("stream closed") })?; } @@ -545,7 +557,7 @@ impl StreamPool { pub struct StreamGuard { sender: RequestSender, queue_depth: Arc, - permit: OwnedSemaphorePermit, + permit: Option, // None if pool is unbounded } impl StreamGuard { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index c5b6f06879..d0d3517d41 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -384,7 +384,7 @@ impl From for proto::GetPageRequest { pub type RequestID = u64; /// A GetPage request class. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. @@ -397,6 +397,19 @@ pub enum GetPageClass { Background, } +impl GetPageClass { + /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than + /// latency-sensitive). + pub fn is_bulk(&self) -> bool { + match self { + Self::Unknown => false, + Self::Normal => false, + Self::Prefetch => true, + Self::Background => true, + } + } +} + impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb {