Add a generic pool prototype

This commit is contained in:
Erik Grinaker
2025-06-30 14:49:34 +02:00
parent a5b0fc560c
commit 81ac4ef43a
4 changed files with 104 additions and 4 deletions

1
Cargo.lock generated
View File

@@ -4595,6 +4595,7 @@ dependencies = [
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"chrono",

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
http.workspace = true

View File

@@ -4,6 +4,7 @@
//! - Send requests to correct shards
//!
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
@@ -12,17 +13,16 @@ use bytes::Bytes;
use futures::{Stream, StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::Channel;
use pageserver_page_api::proto;
use pageserver_page_api::*;
use pageserver_page_api::proto::PageServiceClient;
use pageserver_page_api::*;
use utils::shard::ShardIndex;
use std::fmt::Debug;
pub mod client_cache;
pub mod pool;
pub mod request_tracker;
use tonic::transport::Channel;
use metrics::{IntCounterVec, core::Collector};

View File

@@ -0,0 +1,98 @@
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::future::Future;
use std::pin::Pin;
use tokio::sync::{Semaphore, SemaphorePermit};
/// Constructs new pool items.
/// TODO: use a proper error type.
type Maker<T> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<T>>>> + Send + Sync>;
/// A resource pool. This is used to manage gRPC channels, clients, and stream.
///
/// An item is only handed out to a single user at a time. New items will be created up to the pool
/// limit, if specified.
pub struct Pool<T: PooledItem> {
/// Creates new pool items.
maker: Maker<T>,
/// Idle items in the pool. Returned items are pushed to the front of the queue, so that the
/// oldest idle items are kept at the back.
///
/// TODO: reap idle items after some time.
/// TODO: consider prewarming items.
idle: Arc<Mutex<VecDeque<T>>>,
/// Limits the max number of items managed by the pool.
limiter: Semaphore,
}
impl<T: PooledItem> Pool<T> {
/// Create a new pool with the specified limit.
pub fn new(maker: Maker<T>, limit: Option<usize>) -> Self {
Self {
maker,
idle: Default::default(),
limiter: Semaphore::new(limit.unwrap_or(Semaphore::MAX_PERMITS)),
}
}
/// Gets an item from the pool, or creates a new one if necessary. Blocks if the pool is at its
/// limit. The item is returned to the pool when the guard is dropped.
pub async fn get(&mut self) -> anyhow::Result<PoolGuard<T>> {
let permit = self.limiter.acquire().await.expect("never closed");
// Acquire an idle item from the pool, or create a new one.
let item = self.idle.lock().unwrap().pop_front();
let item = match item {
Some(item) => item,
// TODO: if an item is returned while we're waiting, use the returned item instead.
None => (self.maker)().await?,
};
Ok(PoolGuard {
pool: self,
permit,
item: Some(item),
})
}
}
/// A guard for a pooled item.
pub struct PoolGuard<'a, T: PooledItem> {
pool: &'a Pool<T>,
permit: SemaphorePermit<'a>,
item: Option<T>, // only None during drop
}
impl<T: PooledItem> Deref for PoolGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.item.as_ref().expect("not dropped")
}
}
impl<T: PooledItem> DerefMut for PoolGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.item.as_mut().expect("not dropped")
}
}
impl<T: PooledItem> Drop for PoolGuard<'_, T> {
fn drop(&mut self) {
// Return the item to the pool.
self.pool
.idle
.lock()
.unwrap()
.push_front(self.item.take().expect("only dropped once"));
// The permit will be returned by its drop handler. Tag it here for visibility.
_ = self.permit;
}
}
/// A pooled item.
///
/// TODO: do we even need this?
pub trait PooledItem {}