diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 87e80a0dd6..63852868c3 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -3,7 +3,7 @@ use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; -use futures::stream::{FuturesUnordered, TryChunksError}; +use futures::stream::FuturesUnordered; use futures::{FutureExt as _, StreamExt as _}; use tracing::instrument; @@ -19,6 +19,7 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber}; /// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up /// when full. /// +/// TODO: tune all of these constants, and consider making them configurable. /// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels /// with only streams. const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); @@ -30,15 +31,15 @@ const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); /// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); -/// Max per-stream queue depth. +/// Max number of pipelined requests per stream. const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); -/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because -/// these are more throughput-oriented, we have a smaller limit but higher queue depth. +/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these +/// are more throughput-oriented, we have a smaller limit but higher queue depth. const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); -/// Max per-stream queue depth for bulk streams. These are more throughput-oriented and thus get -/// a larger queue depth. +/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus +/// get a larger queue depth. const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the @@ -165,18 +166,14 @@ impl PageserverClient { shard_id: ShardIndex, req: page_api::GetPageRequest, ) -> tonic::Result { - // Determine whether this is a bulk request, which uses a different stream pool. - let is_bulk = match req.request_class { - page_api::GetPageClass::Unknown => false, - page_api::GetPageClass::Normal => false, - page_api::GetPageClass::Prefetch => true, - page_api::GetPageClass::Background => true, - }; - let resp = self .retry .with(async || { - let stream = self.shards.get(shard_id)?.stream(is_bulk).await; + let stream = self + .shards + .get(shard_id)? + .stream(req.request_class.is_bulk()) + .await; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -314,8 +311,6 @@ impl Shards { /// * Bulk channel pool: unbounded. /// * Bulk client pool: unbounded. /// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. -/// -/// TODO: consider separate pools for normal and bulk traffic, with different settings. struct Shard { /// Unary gRPC client pool. client_pool: Arc, @@ -397,7 +392,8 @@ impl Shard { .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } - /// Returns a pooled stream for this shard. If true, uses the bulk pool (e.g. for prefetches). + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream + /// pool (e.g. for prefetches). async fn stream(&self, bulk: bool) -> StreamGuard { match bulk { false => self.stream_pool.get().await, diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index fc00b32d2e..af78212b68 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -399,6 +399,19 @@ pub enum GetPageClass { Background, } +impl GetPageClass { + /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than + /// latency-sensitive). + pub fn is_bulk(&self) -> bool { + match self { + Self::Unknown => false, + Self::Normal => false, + Self::Prefetch => true, + Self::Background => true, + } + } +} + impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb {