Add dedicated client pools for bulk requests

This commit is contained in:
Erik Grinaker
2025-07-04 21:52:25 +02:00
parent f6cc5cbd0c
commit cb698a3951
3 changed files with 162 additions and 96 deletions

View File

@@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;
use anyhow::anyhow;
use futures::stream::FuturesUnordered;
use futures::stream::{FuturesUnordered, TryChunksError};
use futures::{FutureExt as _, StreamExt as _};
use tracing::instrument;
@@ -15,6 +16,31 @@ use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full. Passed to every `ChannelPool` created for a shard, so the limit applies to each
/// channel pool independently.
///
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent unary request clients per shard. Used as the bound of the unary
/// `ClientPool`.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH` (currently 64 * 2 = 128).
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max per-stream queue depth, i.e. the number of concurrent requests on a single GetPage stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Bulk traffic
/// is throughput-oriented, so it gets fewer streams than normal traffic but a deeper per-stream
/// queue. The max number of concurrent bulk requests is given by
/// `MAX_BULK_STREAMS * MAX_BULK_STREAM_QUEUE_DEPTH` (currently 16 * 4 = 64).
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// Max per-stream queue depth for bulk streams. Larger than `MAX_STREAM_QUEUE_DEPTH`, see
/// `MAX_BULK_STREAMS`.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
@@ -87,6 +113,7 @@ impl PageserverClient {
/// errors. All responses will have `GetPageStatusCode::Ok`.
#[instrument(skip_all, fields(
req_id = %req.request_id,
class = %req.request_class,
rel = %req.rel,
blkno = %req.block_numbers[0],
blks = %req.block_numbers.len(),
@@ -138,10 +165,18 @@ impl PageserverClient {
shard_id: ShardIndex,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// Determine whether this is a bulk request, which uses a different stream pool.
let is_bulk = match req.request_class {
page_api::GetPageClass::Unknown => false,
page_api::GetPageClass::Normal => false,
page_api::GetPageClass::Prefetch => true,
page_api::GetPageClass::Background => true,
};
let resp = self
.retry
.with(async || {
let stream = self.shards.get(shard_id)?.stream().await;
let stream = self.shards.get(shard_id)?.stream(is_bulk).await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
@@ -270,17 +305,24 @@ impl Shards {
}
}
/// A single shard.
/// A single shard. Uses dedicated resource pools with the following structure:
///
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
///
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
struct Shard {
/// Dedicated channel pool for this shard. Shared by all clients/streams in this shard.
_channel_pool: Arc<ChannelPool>,
/// Unary gRPC client pool for this shard. Uses the shared channel pool.
/// Unary gRPC client pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel
/// pool with unary clients.
/// GetPage stream pool.
stream_pool: Arc<StreamPool>,
/// GetPage stream pool for bulk requests, e.g. prefetches.
bulk_stream_pool: Arc<StreamPool>,
}
impl Shard {
@@ -297,34 +339,53 @@ impl Shard {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
// Use a common channel pool for all clients, to multiplex unary and stream requests across
// the same TCP connections. The channel pool is unbounded (but client pools are bounded).
let channel_pool = ChannelPool::new(url)?;
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
// Dedicated client pool for unary requests.
// Client pool for unary requests.
let client_pool = ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
Some(MAX_UNARY_CLIENTS),
);
// Stream pool with dedicated client pool. If this shared a client pool with unary requests,
// long-lived streams could fill up the client pool and starve out unary requests. It shares
// the same underlying channel pool with unary clients though, which is unbounded.
let stream_pool = StreamPool::new(ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token,
));
// GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
// but shares the channel pool with them (the channel pool itself is unbounded).
let stream_pool = StreamPool::new(
ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
MAX_STREAM_QUEUE_DEPTH,
);
// Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
// to avoid head-of-line blocking of latency-sensitive requests.
let bulk_stream_pool = StreamPool::new(
ClientPool::new(
ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token,
None, // unbounded, limited by stream pool
),
Some(MAX_BULK_STREAMS),
MAX_BULK_STREAM_QUEUE_DEPTH,
);
Ok(Self {
_channel_pool: channel_pool,
client_pool,
stream_pool,
bulk_stream_pool,
})
}
@@ -336,8 +397,11 @@ impl Shard {
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard.
async fn stream(&self) -> StreamGuard {
self.stream_pool.get().await
/// Returns a pooled GetPage stream for this shard. When `bulk` is true the stream is taken from
/// the bulk stream pool (e.g. for prefetches), otherwise from the regular stream pool.
async fn stream(&self, bulk: bool) -> StreamGuard {
    // Pick the pool first, so there is a single acquisition point below.
    let pool = if bulk {
        &self.bulk_stream_pool
    } else {
        &self.stream_pool
    };
    pool.get().await
}
}

View File

@@ -31,6 +31,7 @@
//! TODO: observability.
use std::collections::{BTreeMap, HashMap};
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
@@ -47,21 +48,10 @@ use utils::shard::ShardIndex;
// TODO: tune these constants, and make them configurable.
/// Max number of concurrent clients per channel.
///
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const CLIENTS_PER_CHANNEL: usize = 16;
/// Maximum number of concurrent clients per `ClientPool`.
const CLIENT_LIMIT: usize = 64;
/// Max number of pipelined requests per gRPC GetPage stream.
const STREAM_QUEUE_DEPTH: usize = 2;
/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of
/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients.
/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this.
/// The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
@@ -71,6 +61,8 @@ const STREAM_QUEUE_DEPTH: usize = 2;
pub struct ChannelPool {
/// Pageserver endpoint to connect to.
endpoint: Endpoint,
/// Max number of clients per channel. Beyond this, a new channel will be created.
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Channel ID generator.
@@ -88,13 +80,14 @@ struct ChannelEntry {
impl ChannelPool {
/// Creates a new channel pool for the given Pageserver endpoint.
pub fn new<E>(endpoint: E) -> anyhow::Result<Arc<Self>>
pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
Ok(Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
next_channel_id: AtomicUsize::default(),
}))
@@ -122,8 +115,11 @@ impl ChannelPool {
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow");
if entry.clients < CLIENTS_PER_CHANNEL {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
return ChannelGuard {
pool: Arc::downgrade(self),
@@ -183,7 +179,7 @@ impl Drop for ChannelGuard {
/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to a single caller at a time. The pool limits the total
/// number of concurrent clients to `CLIENT_LIMIT` via semaphore.
/// number of concurrent clients to `max_clients` via semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
@@ -199,8 +195,8 @@ pub struct ClientPool {
auth_token: Option<String>,
/// Channel pool to acquire channels from.
channel_pool: Arc<ChannelPool>,
/// Limits the max number of concurrent clients for this pool.
limiter: Arc<Semaphore>,
/// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Idle pooled clients. Acquired clients are removed from here and returned on drop.
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
@@ -223,13 +219,15 @@ struct ClientEntry {
impl ClientPool {
/// Creates a new client pool for the given tenant shard. Channels are acquired from the given
/// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard.
/// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
/// `max_clients` concurrent clients, or unbounded if None.
pub fn new(
channel_pool: Arc<ChannelPool>,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
max_clients: Option<NonZero<usize>>,
) -> Arc<Self> {
Arc::new(Self {
tenant_id,
@@ -238,25 +236,24 @@ impl ClientPool {
auth_token,
channel_pool,
idle: Mutex::default(),
limiter: Arc::new(Semaphore::new(CLIENT_LIMIT)),
limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
next_client_id: AtomicUsize::default(),
})
}
/// Gets a client from the pool, or creates a new one if necessary. Connections are established
/// lazily and does not block, but this call can block if the pool is at `CLIENT_LIMIT`. The
/// client is returned to the pool when the guard is dropped.
/// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
/// is returned to the pool when the guard is dropped.
///
/// This is moderately performance-sensitive. It is called for every unary request, but recall
/// that these establish a new gRPC stream per request so they're already expensive. GetPage
/// requests use the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
let permit = self
.limiter
.clone()
.acquire_owned()
.await
.expect("never closed");
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
// Fast path: acquire an idle client from the pool.
if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
@@ -298,9 +295,9 @@ impl ClientPool {
pub struct ClientGuard {
pool: Weak<ClientPool>,
id: ClientID,
client: Option<page_api::Client>, // Some until dropped
channel_guard: Option<ChannelGuard>, // Some until dropped
permit: OwnedSemaphorePermit,
client: Option<page_api::Client>, // Some until dropped
channel_guard: Option<ChannelGuard>, // Some until dropped
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl Deref for ClientGuard {
@@ -343,16 +340,21 @@ impl Drop for ClientGuard {
/// TODO: reap idle streams.
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from.
/// The client pool to acquire clients from. Must be unbounded.
client_pool: Arc<ClientPool>,
/// All pooled streams.
///
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit).
/// Each stream has an associated Tokio task that processes requests and responses.
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
/// stream has an associated Tokio task that processes requests and responses.
streams: Arc<Mutex<HashMap<StreamID, StreamEntry>>>,
/// Limits the max number of concurrent requests (not streams).
limiter: Arc<Semaphore>,
/// The max number of concurrent streams, or None if unbounded.
max_streams: Option<NonZero<usize>>,
/// The max number of concurrent requests per stream.
max_queue_depth: NonZero<usize>,
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
/// None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
@@ -371,16 +373,27 @@ struct StreamEntry {
}
impl StreamPool {
/// Creates a new stream pool, using the given client pool.
/// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
/// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
///
/// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may
/// fill up the client pool and starve out unary requests. Client pools can share the same
/// `ChannelPool` though, since the channel pool is unbounded.
pub fn new(client_pool: Arc<ClientPool>) -> Arc<Self> {
/// The client pool must be unbounded. The stream pool will enforce its own limits, and because
/// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
/// The stream pool should generally have its own dedicated client pool (but it can share a
/// channel pool with others since these are always unbounded).
pub fn new(
client_pool: Arc<ClientPool>,
max_streams: Option<NonZero<usize>>,
max_queue_depth: NonZero<usize>,
) -> Arc<Self> {
assert!(client_pool.limiter.is_none(), "bounded client pool");
Arc::new(Self {
client_pool,
streams: Arc::default(),
limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)),
limiter: max_streams.map(|max_streams| {
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
}),
max_streams,
max_queue_depth,
next_stream_id: AtomicUsize::default(),
})
}
@@ -403,18 +416,17 @@ impl StreamPool {
///
/// For now, we just do something simple and functional, but very inefficient (linear scan).
pub async fn get(&self) -> StreamGuard {
let permit = self
.limiter
.clone()
.acquire_owned()
.await
.expect("never closed");
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
let mut streams = self.streams.lock().unwrap();
// Look for a pooled stream with available capacity.
for entry in streams.values() {
assert!(
entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH,
entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(),
"stream queue overflow"
);
if entry
@@ -422,7 +434,7 @@ impl StreamPool {
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| {
// Increment the queue depth via compare-and-swap.
// TODO: review ordering.
(queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1)
(queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1)
})
.is_ok()
{
@@ -437,25 +449,18 @@ impl StreamPool {
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
//
// NB: we have to be careful not to overshoot here. The semaphore limit is CLIENT_LIMIT *
// STREAM_QUEUE_DEPTH, but if we were to miss a concurrent queue depth allocation we'd try
// to spin up more streams than CLIENT_LIMIT and block on the client pool ~forever. Because
// we only acquire queue depth under lock.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller
let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH);
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: queue_depth.clone(),
};
streams.insert(id, entry);
// NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT *
// STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more
// streams than CLIENT_LIMIT and block on the client pool ~forever. This should not be
// possible because we only acquire queue depth under lock.
assert!(streams.len() <= CLIENT_LIMIT, "stream overflow");
if let Some(max_streams) = self.max_streams {
assert!(streams.len() <= max_streams.get(), "stream overflow");
};
let client_pool = self.client_pool.clone();
let streams = self.streams.clone();
@@ -490,19 +495,16 @@ impl StreamPool {
// Acquire a client from the pool and create a stream.
let mut client = client_pool.get().await?;
let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH);
let (req_tx, req_rx) = mpsc::channel(1);
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH);
let mut callers = HashMap::new();
// Process requests and responses.
loop {
// NB: this can trip if the server doesn't respond to a request, so only debug_assert.
debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow");
tokio::select! {
// Receive requests from callers and send them to the stream.
req = caller_rx.recv() => {
@@ -551,7 +553,7 @@ impl StreamPool {
pub struct StreamGuard {
sender: RequestSender,
queue_depth: Arc<AtomicUsize>,
permit: OwnedSemaphorePermit,
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl StreamGuard {

View File

@@ -386,7 +386,7 @@ impl From<GetPageRequest> for proto::GetPageRequest {
pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, strum_macros::Display)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.