mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
Add GetPageClass::is_bulk
This commit is contained in:
@@ -3,7 +3,7 @@ use std::num::NonZero;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use futures::stream::{FuturesUnordered, TryChunksError};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt as _, StreamExt as _};
|
||||
use tracing::instrument;
|
||||
|
||||
@@ -19,6 +19,7 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
|
||||
/// when full.
|
||||
///
|
||||
/// TODO: tune all of these constants, and consider making them configurable.
|
||||
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
|
||||
/// with only streams.
|
||||
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
|
||||
@@ -30,15 +31,15 @@ const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
|
||||
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
|
||||
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
|
||||
|
||||
/// Max per-stream queue depth.
|
||||
/// Max number of pipelined requests per stream.
|
||||
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
|
||||
|
||||
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because
|
||||
/// these are more throughput-oriented, we have a smaller limit but higher queue depth.
|
||||
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
|
||||
/// are more throughput-oriented, we have a smaller limit but higher queue depth.
|
||||
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
|
||||
|
||||
/// Max per-stream queue depth for bulk streams. These are more throughput-oriented and thus get
|
||||
/// a larger queue depth.
|
||||
/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
|
||||
/// get a larger queue depth.
|
||||
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
|
||||
|
||||
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
|
||||
@@ -165,18 +166,14 @@ impl PageserverClient {
|
||||
shard_id: ShardIndex,
|
||||
req: page_api::GetPageRequest,
|
||||
) -> tonic::Result<page_api::GetPageResponse> {
|
||||
// Determine whether this is a bulk request, which uses a different stream pool.
|
||||
let is_bulk = match req.request_class {
|
||||
page_api::GetPageClass::Unknown => false,
|
||||
page_api::GetPageClass::Normal => false,
|
||||
page_api::GetPageClass::Prefetch => true,
|
||||
page_api::GetPageClass::Background => true,
|
||||
};
|
||||
|
||||
let resp = self
|
||||
.retry
|
||||
.with(async || {
|
||||
let stream = self.shards.get(shard_id)?.stream(is_bulk).await;
|
||||
let stream = self
|
||||
.shards
|
||||
.get(shard_id)?
|
||||
.stream(req.request_class.is_bulk())
|
||||
.await;
|
||||
let resp = stream.send(req.clone()).await?;
|
||||
|
||||
// Convert per-request errors into a tonic::Status.
|
||||
@@ -314,8 +311,6 @@ impl Shards {
|
||||
/// * Bulk channel pool: unbounded.
|
||||
/// * Bulk client pool: unbounded.
|
||||
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
|
||||
///
|
||||
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
|
||||
struct Shard {
|
||||
/// Unary gRPC client pool.
|
||||
client_pool: Arc<ClientPool>,
|
||||
@@ -397,7 +392,8 @@ impl Shard {
|
||||
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
|
||||
}
|
||||
|
||||
/// Returns a pooled stream for this shard. If true, uses the bulk pool (e.g. for prefetches).
|
||||
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
|
||||
/// pool (e.g. for prefetches).
|
||||
async fn stream(&self, bulk: bool) -> StreamGuard {
|
||||
match bulk {
|
||||
false => self.stream_pool.get().await,
|
||||
|
||||
@@ -399,6 +399,19 @@ pub enum GetPageClass {
|
||||
Background,
|
||||
}
|
||||
|
||||
impl GetPageClass {
|
||||
/// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
|
||||
/// latency-sensitive).
|
||||
pub fn is_bulk(&self) -> bool {
|
||||
match self {
|
||||
Self::Unknown => false,
|
||||
Self::Normal => false,
|
||||
Self::Prefetch => true,
|
||||
Self::Background => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<proto::GetPageClass> for GetPageClass {
|
||||
fn from(pb: proto::GetPageClass) -> Self {
|
||||
match pb {
|
||||
|
||||
Reference in New Issue
Block a user