diff --git a/Cargo.lock b/Cargo.lock index 9e8e789a84..20d49547bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4595,6 +4595,7 @@ dependencies = [ name = "pageserver_client_grpc" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "bytes", "chrono", diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index fdd838c098..ae4ed72052 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +anyhow.workspace = true bytes.workspace = true futures.workspace = true http.workspace = true diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 7276a27215..ea0d85b92e 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -4,6 +4,7 @@ //! - Send requests to correct shards //! use std::collections::HashMap; +use std::fmt::Debug; use std::sync::Arc; use std::sync::RwLock; use std::time::Duration; @@ -12,17 +13,16 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use thiserror::Error; use tonic::metadata::AsciiMetadataValue; +use tonic::transport::Channel; use pageserver_page_api::proto; -use pageserver_page_api::*; - use pageserver_page_api::proto::PageServiceClient; +use pageserver_page_api::*; use utils::shard::ShardIndex; -use std::fmt::Debug; pub mod client_cache; +pub mod pool; pub mod request_tracker; -use tonic::transport::Channel; use metrics::{IntCounterVec, core::Collector}; diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs new file mode 100644 index 0000000000..30c30ca300 --- /dev/null +++ b/pageserver/client_grpc/src/pool.rs @@ -0,0 +1,98 @@ +use std::collections::VecDeque; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex}; + +use std::future::Future; +use std::pin::Pin; +use tokio::sync::{Semaphore, SemaphorePermit}; + +/// Constructs new pool items. +/// TODO: use a proper error type. +type Maker = Box Pin>>> + Send + Sync>; + +/// A resource pool. This is used to manage gRPC channels, clients, and stream. +/// +/// An item is only handed out to a single user at a time. New items will be created up to the pool +/// limit, if specified. +pub struct Pool { + /// Creates new pool items. + maker: Maker, + /// Idle items in the pool. Returned items are pushed to the front of the queue, so that the + /// oldest idle items are kept at the back. + /// + /// TODO: reap idle items after some time. + /// TODO: consider prewarming items. + idle: Arc>>, + /// Limits the max number of items managed by the pool. + limiter: Semaphore, +} + +impl Pool { + /// Create a new pool with the specified limit. + pub fn new(maker: Maker, limit: Option) -> Self { + Self { + maker, + idle: Default::default(), + limiter: Semaphore::new(limit.unwrap_or(Semaphore::MAX_PERMITS)), + } + } + + /// Gets an item from the pool, or creates a new one if necessary. Blocks if the pool is at its + /// limit. The item is returned to the pool when the guard is dropped. + pub async fn get(&mut self) -> anyhow::Result> { + let permit = self.limiter.acquire().await.expect("never closed"); + + // Acquire an idle item from the pool, or create a new one. + let item = self.idle.lock().unwrap().pop_front(); + let item = match item { + Some(item) => item, + // TODO: if an item is returned while we're waiting, use the returned item instead. + None => (self.maker)().await?, + }; + + Ok(PoolGuard { + pool: self, + permit, + item: Some(item), + }) + } +} + +/// A guard for a pooled item. +pub struct PoolGuard<'a, T: PooledItem> { + pool: &'a Pool, + permit: SemaphorePermit<'a>, + item: Option, // only None during drop +} + +impl Deref for PoolGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.item.as_ref().expect("not dropped") + } +} + +impl DerefMut for PoolGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.item.as_mut().expect("not dropped") + } +} + +impl Drop for PoolGuard<'_, T> { + fn drop(&mut self) { + // Return the item to the pool. + self.pool + .idle + .lock() + .unwrap() + .push_front(self.item.take().expect("only dropped once")); + // The permit will be returned by its drop handler. Tag it here for visibility. + _ = self.permit; + } +} + +/// A pooled item. +/// +/// TODO: do we even need this? +pub trait PooledItem {}