From 48be1da6efc8cfdb38425ac51e8ba8c97b307e27 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 30 Jun 2025 21:45:55 +0200 Subject: [PATCH] Add initial client pool --- pageserver/client_grpc/src/pool.rs | 125 ++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 2 deletions(-) diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 0009038891..38d387bc42 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -1,12 +1,17 @@ use std::collections::{BTreeMap, VecDeque}; +use std::future::Future; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::future::Future; -use std::pin::Pin; use tokio::sync::{Semaphore, SemaphorePermit}; use tonic::transport::{Channel, Endpoint}; +use pageserver_page_api as page_api; +use utils::id::{TenantId, TimelineId}; +use utils::shard::ShardIndex; + /// Constructs new pool items. /// TODO: use a proper error type. type Maker = Box Pin>>> + Send + Sync>; @@ -201,3 +206,119 @@ impl Drop for ChannelGuard<'_> { entry.clients -= 1; } } + +/// A pool of gRPC clients. +pub struct ClientPool<'a> { + /// Tenant ID. + tenant_id: TenantId, + /// Timeline ID. + timeline_id: TimelineId, + /// Shard ID. + shard_id: ShardIndex, + /// Authentication token, if any. + auth_token: Option, + /// Channel pool. + channels: ChannelPool, + /// Limits the max number of concurrent clients. + limiter: Semaphore, + /// Idle clients in the pool. + idle: Arc>>>, + /// Unique client ID generator. + next_client_id: AtomicUsize, +} + +type ClientID = usize; +type ClientKey = (ChannelID, ClientID); +struct ClientEntry<'a> { + client: page_api::Client, + channel_guard: ChannelGuard<'a>, +} + +impl<'a> ClientPool<'a> { + const CLIENT_LIMIT: usize = 100; // TODO: make this configurable + + /// Creates a new client pool for the given Pageserver and tenant shard. + pub fn new( + url: String, + tenant_id: TenantId, + timeline_id: TimelineId, + shard_id: ShardIndex, + auth_token: Option, + ) -> anyhow::Result { + Ok(Self { + tenant_id, + timeline_id, + shard_id, + auth_token, + channels: ChannelPool::new(url)?, + idle: Arc::default(), + limiter: Semaphore::new(Self::CLIENT_LIMIT), + next_client_id: AtomicUsize::default(), + }) + } + + /// Gets a client from the pool, or creates a new one if necessary. The client is returned to + /// the pool when the guard is dropped. + pub async fn get(&'a self) -> anyhow::Result> { + let permit = self.limiter.acquire().await.expect("never closed"); + let mut idle = self.idle.lock().unwrap(); + + // Fast path: acquire an idle client from the pool. + if let Some(((_, id), entry)) = idle.pop_first() { + return Ok(ClientGuard { + pool: self, + id, + client: Some(entry.client), + channel_guard: Some(entry.channel_guard), + permit, + }); + } + + // Slow path: construct a new client. + let mut channel_guard = self.channels.get()?; // never blocks (lazy connection) + let id = self.next_client_id.fetch_add(1, Ordering::Relaxed); + + let client = page_api::Client::new( + channel_guard.take(), + self.tenant_id, + self.timeline_id, + self.shard_id, + self.auth_token.clone(), + None, + )?; + + Ok(ClientGuard { + pool: self, + id, + client: Some(client), + channel_guard: Some(channel_guard), + permit, + }) + } +} + +pub struct ClientGuard<'a> { + pool: &'a ClientPool<'a>, + id: ClientID, + client: Option, + channel_guard: Option>, + permit: SemaphorePermit<'a>, +} + +// Returns the client to the pool. +impl Drop for ClientGuard<'_> { + fn drop(&mut self) { + let mut idle = self.pool.idle.lock().unwrap(); + let client = self.client.take().expect("dropped once"); + let channel_guard = self.channel_guard.take().expect("dropped once"); + let channel_id = channel_guard.id; + let entry = ClientEntry { + client, + channel_guard, + }; + idle.insert((channel_id, self.id), entry); + + // The permit will be returned by its drop handler. Tag it here for visibility. + _ = self.permit; + } +}