Compare commits

...

12 Commits

Author SHA1 Message Date
Elizabeth Murray
2ff2eb6a9e Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-22 14:45:44 -07:00
Elizabeth Murray
bdfc6d3ef9 Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 16:32:26 -07:00
Elizabeth Murray
f47e90fd42 Add version strings to new modules. 2025-06-20 16:15:13 -07:00
Elizabeth Murray
9cc79672f3 Fix cargo deny check error, to see if this fixes Cargo.lock CI issue. 2025-06-20 15:00:09 -07:00
Elizabeth Murray
4a9b1ad5cb Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 11:59:58 -07:00
Elizabeth Murray
dc4238896a Regenerate Cargo.lock as it is producing Dockerfile errors. 2025-06-20 11:45:41 -07:00
Elizabeth Murray
e1fa844da4 Clippy updates, add Cargo.lock. 2025-06-20 10:18:17 -07:00
Elizabeth Murray
c8a2612207 Add workspace license and version. 2025-06-20 09:48:10 -07:00
Elizabeth Murray
b6e89a3af8 Run clippy. 2025-06-20 09:42:11 -07:00
Elizabeth Murray
261a9ae093 Run cargo fmt. 2025-06-20 08:28:40 -07:00
Elizabeth Murray
cac4ee8ea3 Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 08:27:57 -07:00
Elizabeth Murray
7636c4085a Add initial skeleton for client cache code, and request tracker. 2025-06-19 14:50:23 -07:00
6 changed files with 192 additions and 2 deletions

41
Cargo.lock generated
View File

@@ -1235,6 +1235,25 @@ dependencies = [
"replace_with",
]
[[package]]
name = "client_cache"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"futures",
"http 1.1.0",
"hyper-util",
"priority-queue",
"rand 0.8.5",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tower 0.5.2",
"uuid",
"workspace_hack",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
@@ -5029,6 +5048,17 @@ dependencies = [
"elliptic-curve 0.13.8",
]
[[package]]
name = "priority-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5676d703dda103cbb035b653a9f11448c0a7216c7926bd35fcb5865475d0c970"
dependencies = [
"autocfg",
"equivalent",
"indexmap 2.9.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.94"
@@ -5647,9 +5677,16 @@ dependencies = [
[[package]]
name = "replace_with"
version = "0.1.7"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884"
[[package]]
name = "request_tracker"
version = "0.1.0"
dependencies = [
"workspace_hack",
]
[[package]]
name = "reqwest"

View File

@@ -8,8 +8,10 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/communicator_pools/client_cache",
"pageserver/pagebench",
"pageserver/page_api",
"pageserver/communicator_pools/request_tracker",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -257,6 +259,8 @@ pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
client_cache = { path = "./pageserver/communicator_pools/client_cache" }
request_tracker = { path = "./pageserver/communicator_pools/request_tracker" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -0,0 +1,21 @@
[package]
name = "client_cache"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
futures.workspace = true
hyper-util.workspace = true
http.workspace = true
priority-queue = "2.3.1"
rand.workspace = true
tonic.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower.workspace = true
uuid.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,105 @@
use async_trait::async_trait;
use priority_queue::PriorityQueue;
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
#[async_trait]
pub trait PooledClientFactory<T>: Send + Sync + 'static {
/// Create a new pooled item.
async fn create(
&self,
connect_timeout: Duration,
) -> Result<Result<T, tonic::Status>, tokio::time::error::Elapsed>;
}
/// A pooled gRPC client with capacity tracking and error handling.
#[allow(dead_code)]
pub struct ClientCache<T> {
inner: Mutex<Inner<T>>,
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
connect_timeout: Duration,
connect_backoff: Duration,
/// The maximum number of consumers that can use a single connection.
max_consumers: usize,
/// The number of consecutive errors before a connection is removed from the pool.
error_threshold: usize,
/// The maximum duration a connection can be idle before being removed.
max_idle_duration: Duration,
max_total_connections: usize,
client_semaphore: Arc<Semaphore>,
}
#[allow(dead_code)]
struct Inner<T> {
entries: HashMap<uuid::Uuid, CacheEntry<T>>,
pq: PriorityQueue<uuid::Uuid, usize>,
// This is updated when a connection is dropped, or we fail
// to create a new connection.
last_connect_failure: Option<Instant>,
waiters: usize,
in_progress: usize,
}
#[allow(dead_code)]
struct CacheEntry<T> {
client: T,
active_consumers: usize,
consecutive_errors: usize,
last_used: Instant,
}
/// A client borrowed from the pool.
#[allow(dead_code)]
pub struct PooledClient<T> {
pub client: T,
pool: Arc<ClientCache<T>>,
is_ok: bool,
id: uuid::Uuid,
permit: OwnedSemaphorePermit,
}
impl<T: Clone + Send + 'static> ClientCache<T> {
pub fn new(
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
connect_timeout: Duration,
connect_backoff: Duration,
max_consumers: usize,
error_threshold: usize,
max_idle_duration: Duration,
max_total_connections: usize,
) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(Inner::<T> {
entries: HashMap::new(),
pq: PriorityQueue::new(),
last_connect_failure: None,
waiters: 0,
in_progress: 0,
}),
fact: Arc::clone(&fact),
connect_timeout,
connect_backoff,
max_consumers,
error_threshold,
max_idle_duration,
max_total_connections,
client_semaphore: Arc::new(Semaphore::new(0)),
})
}
}
impl<T: Clone + Send + 'static> PooledClient<T> {
pub fn client(&self) -> T {
self.client.clone()
}
}

View File

@@ -0,0 +1,8 @@
[package]
name = "request_tracker"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
workspace_hack.workspace = true

View File

@@ -0,0 +1,15 @@
// Temporary placeholder until the request tracker is implemented
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}