diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 4b606d6939..3a9edc7092 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -24,20 +24,23 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
 /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
 /// when full.
 ///
+/// Normal requests are small, and we don't pipeline them, so we can afford a large number of
+/// streams per connection.
+///
 /// TODO: tune all of these constants, and consider making them configurable.
-/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
-/// with only streams.
-const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
+const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(64).unwrap();
 
-/// Max number of concurrent unary request clients per shard.
-const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
+/// Max number of concurrent bulk GetPage streams per channel (i.e. TCP connection). These use a
+/// dedicated channel pool with a lower client limit, to avoid TCP-level head-of-line blocking and
+/// transmission delays. This also concentrates large window sizes on a smaller set of
+/// streams/connections, presumably reducing memory use.
+const MAX_BULK_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
 
-/// Max number of concurrent GetPage streams per shard.
-const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
-
-/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
-/// are more throughput-oriented, we have a smaller limit.
-const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
+/// The batch size threshold at which a GetPage request will use the bulk stream pool.
+///
+/// The gRPC initial window size is 64 KB. Each page is 8 KB, so let's avoid increasing the window
+/// size for the normal stream pool, and route requests for >= 5 pages (>32 KB) to the bulk pool.
+const BULK_THRESHOLD_BATCH_SIZE: usize = 5;
 
 /// The overall request call timeout, including retries and pool acquisition.
 /// TODO: should we retry forever? Should the caller decide?
@@ -62,10 +65,19 @@ const SLOW_THRESHOLD: Duration = Duration::from_secs(3);
 /// * Sharded tenants across multiple Pageservers.
 /// * Pooling of connections, clients, and streams for efficient resource use.
 /// * Concurrent use by many callers.
-/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
+/// * Internal handling of GetPage bidirectional streams.
 /// * Automatic retries.
 /// * Observability.
 ///
+/// The client has dedicated connection/client/stream pools per shard, for resource reuse. These
+/// pools are unbounded: we allow scaling out as many concurrent streams as needed to serve all
+/// concurrent callers, which mostly eliminates head-of-line blocking. Idle streams are fairly
+/// cheap: the server task currently uses 26 KB of memory, so we can comfortably fit 100,000
+/// concurrent idle streams (2.5 GB memory). The worst case degenerates to the old libpq case with
+/// one stream per backend, but without the TCP connection overhead. In the common case we expect
+/// significantly lower stream counts due to stream sharing, driven e.g. by idle backends, LFC hits,
+/// read coalescing, sharding (backends typically only talk to one shard at a time), etc.
+///
 /// TODO: this client does not support base backups or LSN leases, as these are only used by
 /// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
 pub struct PageserverClient {
@@ -264,7 +276,7 @@ impl PageserverClient {
         req: page_api::GetPageRequest,
         shard: &Shard,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let mut stream = shard.stream(req.request_class.is_bulk()).await?;
+        let mut stream = shard.stream(Self::is_bulk(&req)).await?;
         let resp = stream.send(req.clone()).await?;
 
         // Convert per-request errors into a tonic::Status.
@@ -365,6 +377,11 @@ impl PageserverClient {
             ))
         })?
     }
+
+    /// Returns true if the request is considered a bulk request and should use the bulk pool.
+    fn is_bulk(req: &page_api::GetPageRequest) -> bool {
+        req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
+    }
 }
 
 /// Shard specification for a PageserverClient.
@@ -492,15 +509,23 @@ impl Shards {
     }
 }
 
-/// A single shard. Uses dedicated resource pools with the following structure:
+/// A single shard. Has dedicated resource pools with the following structure:
 ///
-/// * Channel pool: unbounded.
-/// * Unary client pool: MAX_UNARY_CLIENTS.
-/// * Stream client pool: unbounded.
-/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
-/// * Bulk channel pool: unbounded.
+/// * Channel pool: MAX_CLIENTS_PER_CHANNEL.
+/// * Client pool: unbounded.
+/// * Stream pool: unbounded.
+/// * Bulk channel pool: MAX_BULK_CLIENTS_PER_CHANNEL.
 /// * Bulk client pool: unbounded.
-/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
+/// * Bulk stream pool: unbounded.
+///
+/// We use a separate bulk channel pool with a lower concurrency limit for large batch requests.
+/// This avoids TCP-level head-of-line blocking, and also concentrates large window sizes on a
+/// smaller set of streams/connections, which presumably reduces memory use. Neither of these pools
+/// is bounded, nor do they pipeline requests, so the latency characteristics should be mostly
+/// similar (except for TCP transmission time).
+///
+/// TODO: since we never use bounded pools, we could consider removing the pool limiters. However,
+/// the code is fairly trivial, so we may as well keep them around for now in case we need them.
 struct Shard {
     /// The shard ID.
     id: ShardIndex,
@@ -508,7 +533,7 @@ struct Shard {
     client_pool: Arc<ClientPool>,
     /// GetPage stream pool.
     stream_pool: Arc<StreamPool>,
-    /// GetPage stream pool for bulk requests, e.g. prefetches.
+    /// GetPage stream pool for bulk requests.
     bulk_stream_pool: Arc<StreamPool>,
 }
 
@@ -522,48 +547,30 @@ impl Shard {
         auth_token: Option<String>,
         compression: Option<CompressionEncoding>,
     ) -> anyhow::Result<Self> {
-        // Common channel pool for unary and stream requests. Bounded by client/stream pools.
-        let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
-
-        // Client pool for unary requests.
+        // Shard pools for unary requests and non-bulk GetPage requests.
         let client_pool = ClientPool::new(
-            channel_pool.clone(),
+            ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?,
             tenant_id,
             timeline_id,
             shard_id,
             auth_token.clone(),
             compression,
-            Some(MAX_UNARY_CLIENTS),
+            None, // unbounded
         );
+        let stream_pool = StreamPool::new(client_pool.clone(), None); // unbounded
 
-        // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
-        // but shares a channel pool with it (as it's unbounded).
-        let stream_pool = StreamPool::new(
-            ClientPool::new(
-                channel_pool.clone(),
-                tenant_id,
-                timeline_id,
-                shard_id,
-                auth_token.clone(),
-                compression,
-                None, // unbounded, limited by stream pool
-            ),
-            Some(MAX_STREAMS),
-        );
-
-        // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
-        // to avoid head-of-line blocking of latency-sensitive requests.
+        // Bulk GetPage stream pool for large batches (prefetches, sequential scans, vacuum, etc.).
         let bulk_stream_pool = StreamPool::new(
             ClientPool::new(
-                ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
+                ChannelPool::new(url, MAX_BULK_CLIENTS_PER_CHANNEL)?,
                 tenant_id,
                 timeline_id,
                 shard_id,
                 auth_token,
                 compression,
-                None, // unbounded, limited by stream pool
+                None, // unbounded
             ),
-            Some(MAX_BULK_STREAMS),
+            None, // unbounded
         );
 
         Ok(Self {
@@ -585,8 +592,7 @@ impl Shard {
             .await
     }
 
-    /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
-    /// pool (e.g. for prefetches).
+    /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk pool.
     #[instrument(skip_all, fields(bulk))]
     async fn stream(&self, bulk: bool) -> tonic::Result<StreamGuard> {
         let pool = match bulk {
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 76355ae546..a3286ecf15 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -444,19 +444,6 @@ pub enum GetPageClass {
     Background,
 }
 
-impl GetPageClass {
-    /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
-    /// latency-sensitive).
-    pub fn is_bulk(&self) -> bool {
-        match self {
-            Self::Unknown => false,
-            Self::Normal => false,
-            Self::Prefetch => true,
-            Self::Background => true,
-        }
-    }
-}
-
 impl From<proto::GetPageClass> for GetPageClass {
     fn from(pb: proto::GetPageClass) -> Self {
         match pb {
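
Reviewer note: a minimal standalone sketch of the new routing rule, to make the threshold arithmetic concrete. The `GetPageRequest` struct below is a simplified stand-in for `page_api::GetPageRequest` (illustration only, not the real type); the constant and predicate mirror the `BULK_THRESHOLD_BATCH_SIZE` and `is_bulk` added in client.rs. The point is that routing is now purely size-based rather than driven by `GetPageClass`, which this PR removes.

```rust
/// Mirrors BULK_THRESHOLD_BATCH_SIZE in client.rs: >= 5 pages (> 32 KB) uses the bulk pool.
const BULK_THRESHOLD_BATCH_SIZE: usize = 5;

/// Simplified stand-in for page_api::GetPageRequest (only the field is_bulk needs).
struct GetPageRequest {
    block_numbers: Vec<u32>,
}

/// Same predicate as PageserverClient::is_bulk: the batch size decides the pool,
/// not the request class (Prefetch/Background).
fn is_bulk(req: &GetPageRequest) -> bool {
    req.block_numbers.len() >= BULK_THRESHOLD_BATCH_SIZE
}

fn main() {
    // 4 pages * 8 KB = 32 KB: fits in half the 64 KB initial window, normal pool.
    let small = GetPageRequest { block_numbers: (0..4).collect() };
    // 5 pages * 8 KB = 40 KB > 32 KB: routed to the bulk pool.
    let large = GetPageRequest { block_numbers: (0..5).collect() };
    assert!(!is_bulk(&small));
    assert!(is_bulk(&large));
}
```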