Add initial client pool

This commit is contained in:
Erik Grinaker
2025-06-30 21:45:55 +02:00
parent d2efc80e40
commit 48be1da6ef

View File

@@ -1,12 +1,17 @@
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::future::Future;
use std::pin::Pin;
use tokio::sync::{Semaphore, SemaphorePermit};
use tonic::transport::{Channel, Endpoint};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Constructs new pool items.
/// TODO: use a proper error type.
type Maker<T> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<T>>>> + Send + Sync>;
@@ -201,3 +206,119 @@ impl Drop for ChannelGuard<'_> {
entry.clients -= 1;
}
}
/// A pool of gRPC clients.
pub struct ClientPool<'a> {
/// Tenant ID.
tenant_id: TenantId,
/// Timeline ID.
timeline_id: TimelineId,
/// Shard ID.
shard_id: ShardIndex,
/// Authentication token, if any.
auth_token: Option<String>,
/// Channel pool.
channels: ChannelPool,
/// Limits the max number of concurrent clients.
limiter: Semaphore,
/// Idle clients in the pool.
idle: Arc<Mutex<BTreeMap<ClientKey, ClientEntry<'a>>>>,
/// Unique client ID generator.
next_client_id: AtomicUsize,
}
type ClientID = usize;
type ClientKey = (ChannelID, ClientID);
struct ClientEntry<'a> {
client: page_api::Client,
channel_guard: ChannelGuard<'a>,
}
impl<'a> ClientPool<'a> {
const CLIENT_LIMIT: usize = 100; // TODO: make this configurable
/// Creates a new client pool for the given Pageserver and tenant shard.
pub fn new(
url: String,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
Ok(Self {
tenant_id,
timeline_id,
shard_id,
auth_token,
channels: ChannelPool::new(url)?,
idle: Arc::default(),
limiter: Semaphore::new(Self::CLIENT_LIMIT),
next_client_id: AtomicUsize::default(),
})
}
/// Gets a client from the pool, or creates a new one if necessary. The client is returned to
/// the pool when the guard is dropped.
pub async fn get(&'a self) -> anyhow::Result<ClientGuard<'a>> {
let permit = self.limiter.acquire().await.expect("never closed");
let mut idle = self.idle.lock().unwrap();
// Fast path: acquire an idle client from the pool.
if let Some(((_, id), entry)) = idle.pop_first() {
return Ok(ClientGuard {
pool: self,
id,
client: Some(entry.client),
channel_guard: Some(entry.channel_guard),
permit,
});
}
// Slow path: construct a new client.
let mut channel_guard = self.channels.get()?; // never blocks (lazy connection)
let id = self.next_client_id.fetch_add(1, Ordering::Relaxed);
let client = page_api::Client::new(
channel_guard.take(),
self.tenant_id,
self.timeline_id,
self.shard_id,
self.auth_token.clone(),
None,
)?;
Ok(ClientGuard {
pool: self,
id,
client: Some(client),
channel_guard: Some(channel_guard),
permit,
})
}
}
pub struct ClientGuard<'a> {
pool: &'a ClientPool<'a>,
id: ClientID,
client: Option<page_api::Client>,
channel_guard: Option<ChannelGuard<'a>>,
permit: SemaphorePermit<'a>,
}
// Returns the client to the pool.
impl Drop for ClientGuard<'_> {
fn drop(&mut self) {
let mut idle = self.pool.idle.lock().unwrap();
let client = self.client.take().expect("dropped once");
let channel_guard = self.channel_guard.take().expect("dropped once");
let channel_id = channel_guard.id;
let entry = ClientEntry {
client,
channel_guard,
};
idle.insert((channel_id, self.id), entry);
// The permit will be returned by its drop handler. Tag it here for visibility.
_ = self.permit;
}
}