diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index bc84e14333..3955ef579f 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -18,6 +18,7 @@ tokio-util = { version = "0.7", features = ["compat"] } hyper-util = "0.1.9" hyper = "1.6.0" metrics.workspace = true +priority-queue = "2.3.1" pageserver_page_api.workspace = true diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index c3046f1c85..bb8befa1ce 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -1,11 +1,15 @@ use std::{ - collections::HashMap, + collections::{BinaryHeap, HashMap}, sync::{ Arc, atomic::{AtomicUsize, Ordering}, }, time::{Duration, Instant}, + io::{self, Error, ErrorKind}, }; + +use priority_queue::PriorityQueue; + use tokio::{ sync::{Mutex, mpsc, watch, Semaphore, OwnedSemaphorePermit}, time::sleep, @@ -15,7 +19,6 @@ use tokio::{ use tonic::transport::{Channel, Endpoint}; use uuid; -use std::io::{self, Error, ErrorKind}; use std::{ pin::Pin, @@ -47,63 +50,22 @@ use tokio::net::TcpStream; use tower::service_fn; use uuid; -use metrics; -use metrics::proto::MetricFamily; -use metrics::{Encoder, TextEncoder}; +use metrics::{ + {Encoder, TextEncoder}, + proto::MetricFamily, +}; // use info use tracing::info; use tokio_util::sync::CancellationToken; -/// A pooled gRPC client with capacity tracking and error handling. 
-pub struct ConnectionPool { - inner: Mutex, - // Config options that apply to each connection - endpoint: String, - max_consumers: usize, - error_threshold: usize, - connect_timeout: Duration, - connect_backoff: Duration, - // add max_delay - // The maximum time a connection can be idle before being removed - max_delay_ms: u64, - drop_rate: f64, - hang_rate: f64, - // The maximum duration a connection can be idle before being removed - max_idle_duration: Duration, - channel_semaphore: Arc, +// +// The "TokioTcp" type is a flaky TCP stream wrapper for testing purposes, used +// to simulate network errors and delays. +// - shutdown_token: CancellationToken, - aggregate_metrics: Option>, -} - -struct Inner { - entries: HashMap, - - // This is updated when a connection is dropped, or we fail - // to create a new connection. - last_connect_failure: Option, - waiters: usize, - in_progress: usize, -} - -struct ConnectionEntry { - channel: Channel, - active_consumers: usize, - consecutive_successes: usize, - consecutive_errors: usize, - last_used: Instant, -} - -/// A client borrowed from the pool. -pub struct PooledClient { - pub channel: Channel, - pool: Arc, - id: uuid::Uuid, - permit: OwnedSemaphorePermit, -} /// Wraps a `TcpStream`, buffers incoming data, and injects a random delay per fresh read/write. pub struct TokioTcp { tcp: TcpStream, @@ -231,6 +193,56 @@ impl AsyncWrite for TokioTcp { } } +/// A pooled gRPC client with capacity tracking and error handling. 
+pub struct ConnectionPool { + inner: Mutex, + + // Config options that apply to each connection + endpoint: String, + max_consumers: usize, + error_threshold: usize, + connect_timeout: Duration, + connect_backoff: Duration, + + // Parameters for testing + max_delay_ms: u64, + drop_rate: f64, + hang_rate: f64, + + // The maximum duration a connection can be idle before being removed + max_idle_duration: Duration, + channel_semaphore: Arc, + + shutdown_token: CancellationToken, + aggregate_metrics: Option>, +} + +struct Inner { + entries: HashMap, + pq: PriorityQueue, + // This is updated when a connection is dropped, or we fail + // to create a new connection. + last_connect_failure: Option, + waiters: usize, + in_progress: usize, +} + +struct ConnectionEntry { + channel: Channel, + active_consumers: usize, + consecutive_errors: usize, + last_used: Instant, +} + +/// A client borrowed from the pool. +pub struct PooledClient { + pub channel: Channel, + pool: Arc, + id: uuid::Uuid, + permit: OwnedSemaphorePermit, + is_ok: bool, +} + impl ConnectionPool { pub fn new( endpoint: &String, @@ -249,6 +261,7 @@ impl ConnectionPool { let pool = Arc::new(Self { inner: Mutex::new(Inner { entries: HashMap::new(), + pq: PriorityQueue::new(), last_connect_failure: None, waiters: 0, in_progress: 0, @@ -306,100 +319,153 @@ impl ConnectionPool { /// Sweep and remove idle connections safely, burning their permits. async fn sweep_idle_connections(self: &Arc) { - let mut to_forget = Vec::new(); + let mut ids_to_remove = Vec::new(); let now = Instant::now(); - // Remove idle entries and collect permits to forget + // Remove idle entries from the map and record their ids so the same + // connections can then be dropped from the priority queue. 
{ let mut inner = self.inner.lock().await; - inner.entries.retain(|_, entry| { + inner.entries.retain(|id, entry| { if entry.active_consumers == 0 && now.duration_since(entry.last_used) > self.max_idle_duration { - let semaphore = Arc::clone(&self.channel_semaphore); - if let Ok(permits) = semaphore.try_acquire_many_owned(self.max_consumers as u32) { - to_forget.push(permits); - return false; // remove this entry + // metric + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["connection_swept"]).inc(); + } + None => {} } + ids_to_remove.push(*id); + return false; // remove this entry } true }); - } - - // Permanently consume those permits - for permit in to_forget { - permit.forget(); + // Remove the entries from the priority queue + for id in ids_to_remove { + inner.pq.remove(&id); + } } } - // If we have a permit already, get a connection out of the hash table - async fn get_conn_with_permit(self: Arc, permit: OwnedSemaphorePermit) -> PooledClient { + // If we have a permit already, get a connection out of the heap + async fn get_conn_with_permit(self: Arc, permit: OwnedSemaphorePermit) + -> Option { let mut inner = self.inner.lock().await; - // TODO: Use a heap, although the number of connections is small - if let Some((&id, entry)) = inner - .entries - .iter_mut() - .filter(|(_, e)| e.active_consumers < self.max_consumers) - .filter(|(_, e)| e.consecutive_errors < self.error_threshold) - .max_by_key(|(_, e)| e.active_consumers) - { + + // Pop the highest-active-consumers connection. There are no connections + // in the heap that have more than max_consumers active consumers. 
+ if let Some((id, cons)) = inner.pq.pop() { + let entry = inner.entries.get_mut(&id) + .expect("pq and entries got out of sync"); + + let mut active_consumers = entry.active_consumers; entry.active_consumers += 1; - let client = PooledClient { + entry.last_used = Instant::now(); + + let client = PooledClient { channel: entry.channel.clone(), pool: Arc::clone(&self), id, - permit, + permit: permit, + is_ok: true, }; - return client; + + // re‐insert with updated priority + active_consumers += 1; + if active_consumers < self.max_consumers { + inner.pq.push(id, active_consumers as usize); + } + return Some(client); } else { - panic!("Corrupt state: no available connections with permit acquired."); + // If there is no connection to take, it is because permits for a connection + // need to drain. This can happen if a connection is removed because it has + // too many errors. It is taken out of the heap/hash table in this case, but + // we can't remove its permits until now. + // + // Just forget the permit and retry. + permit.forget(); + return None; } } pub async fn get_client(self: Arc) -> Result { + // The pool is shutting down. Don't accept new connections. if self.shutdown_token.is_cancelled() { return Err(tonic::Status::unavailable("Pool is shutting down")); } - // Try to get the semaphore. If it fails, we are out of connections, so - // request that a new connection be created. - let mut semaphore = Arc::clone(&self.channel_semaphore); - match semaphore.try_acquire_owned() { - Ok(permit_) => { - let pool_conn = self.get_conn_with_permit(permit_).await; - return Ok(pool_conn); - } - Err(_) => { + // A loop is necessary because when a connection is draining, we have to return + // a permit and retry. 
+ loop { + let self_clone = Arc::clone(&self); + let mut semaphore = Arc::clone(&self_clone.channel_semaphore); - match self.aggregate_metrics { - Some(ref metrics) => { - metrics.retry_counters.with_label_values(&["sema_acquire_failed"]).inc(); - } - None => {} - } - - { - let mut inner = self.inner.lock().await; - inner.waiters += 1; - if inner.waiters > (inner.in_progress * self.max_consumers) { - let self_clone = Arc::clone(&self); - tokio::task::spawn(async move { - self_clone.create_connection().await; - }); - inner.in_progress += 1; + match semaphore.try_acquire_owned() { + Ok(permit_) => { + // We got a permit, so check the heap for a connection + // we can use. + let pool_conn = self_clone.get_conn_with_permit(permit_).await; + match pool_conn { + Some(pool_conn_) => { + return Ok(pool_conn_); + } + None => { + // No connection available. Forget the permit and retry. + continue; + } } } - // Wait for a connection to become available, either because it - // was created or because a connection was returned to the pool. - semaphore = Arc::clone(&self.channel_semaphore); - let conn_permit = semaphore.acquire_owned().await.unwrap(); - { - let mut inner = self.inner.lock().await; - inner.waiters -= 1; + Err(_) => { + + match self_clone.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["sema_acquire_failed"]).inc(); + } + None => {} + } + + { + // + // This is going to generate enough connections to handle a burst, + // but it may generate up to twice the number of connections needed + // in the worst case. Extra connections will go idle and be cleaned + // up. 
+ // + let mut inner = self_clone.inner.lock().await; + inner.waiters += 1; + if inner.waiters >= (inner.in_progress * self_clone.max_consumers) { + semaphore = Arc::clone(&self_clone.channel_semaphore); + let self_clone_spawn = Arc::clone(&self_clone); + tokio::task::spawn(async move { + self_clone_spawn.create_connection().await; + }); + inner.in_progress += 1; + } + } + // Wait for a connection to become available, either because it + // was created or because a connection was returned to the pool + // by another consumer. + semaphore = Arc::clone(&self_clone.channel_semaphore); + let conn_permit = semaphore.acquire_owned().await.unwrap(); + { + let mut inner = self_clone.inner.lock().await; + inner.waiters -= 1; + } + // We got a permit, check the heap for a connection. + let pool_conn = self_clone.get_conn_with_permit(conn_permit).await; + match pool_conn { + Some(pool_conn_) => { + return Ok(pool_conn_); + } + None => { + // No connection was found, forget the permit and retry. + continue; + } + } } - let pool_conn = self.get_conn_with_permit(conn_permit).await; - return Ok(pool_conn); } } } @@ -446,11 +512,19 @@ impl ConnectionPool { } }); - let mut backoff_delay = self.connect_backoff; + // Generate a random backoff to add some jitter so that connections + // don't all retry at the same time. + let mut backoff_delay = Duration::from_millis( + rand::thread_rng().gen_range(0..=self.connect_backoff.as_millis() as u64)); + loop { + if self.shutdown_token.is_cancelled() { return; } + // Back off. // Loop because failure can occur while we are sleeping, so wait - // until the failure stopped for at least one backoff period. + // until the failure stopped for at least one backoff period. Backoff + // period includes some jitter, so that if multiple connections are + // failing, they don't all retry at the same time. loop { if let Some(delay) = { let inner = self.inner.lock().await; @@ -472,7 +546,6 @@ impl ConnectionPool { // on this connection. 
(Requests made later on this channel will time out // with the same timeout.) // - match self.aggregate_metrics { Some(ref metrics) => { metrics.retry_counters.with_label_values(&["connection_attempt"]).inc(); @@ -490,6 +563,7 @@ impl ConnectionPool { .await; match attempt { + // Connection succeeded Ok(Ok(channel)) => { { match self.aggregate_metrics { @@ -505,17 +579,17 @@ impl ConnectionPool { ConnectionEntry { channel: channel.clone(), active_consumers: 0, - consecutive_successes: 0, consecutive_errors: 0, last_used: Instant::now(), }, ); - self.channel_semaphore.add_permits(self.max_consumers); - // decrement in progress connections + inner.pq.push(id, 0); inner.in_progress -= 1; + self.channel_semaphore.add_permits(self.max_consumers); return; }; } + // Connection failed, back off and retry Ok(Err(_)) | Err(_) => { match self.aggregate_metrics { Some(ref metrics) => { @@ -529,10 +603,11 @@ impl ConnectionPool { let jitter = rand::thread_rng().gen_range(0..=backoff_delay.as_millis() as u64); backoff_delay = Duration::from_millis(backoff_delay.as_millis() as u64 + jitter); - // Do not delay longer than one minute + // Do not backoff longer than one minute if backoff_delay > Duration::from_secs(60) { backoff_delay = Duration::from_secs(60); } + // continue the loop to retry } } } @@ -540,39 +615,60 @@ impl ConnectionPool { /// Return client to the pool, indicating success or error. 
- pub async fn return_client(&self, id: uuid::Uuid, success: bool) { + pub async fn return_client(&self, id: uuid::Uuid, success: bool, permit: OwnedSemaphorePermit) { let mut inner = self.inner.lock().await; - let mut new_failure = false; if let Some(entry) = inner.entries.get_mut(&id) { entry.last_used = Instant::now(); if entry.active_consumers <= 0 { panic!("A consumer completed when active_consumers was zero!") } entry.active_consumers = entry.active_consumers - 1; - if entry.consecutive_errors < self.error_threshold { - if success { - entry.consecutive_successes += 1; + if success { + if entry.consecutive_errors < self.error_threshold { entry.consecutive_errors = 0; - } else { - entry.consecutive_errors += 1; - entry.consecutive_successes = 0; - if entry.consecutive_errors == self.error_threshold { - new_failure = true; + } + } else { + entry.consecutive_errors += 1; + if entry.consecutive_errors == self.error_threshold { + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["connection_dropped"]).inc(); + } + None => {} } } } + // // Too many errors on this connection. If there are no active users, // remove it. Otherwise just wait for active_consumers to go to zero. // This connection will not be selected for new consumers. // - if entry.consecutive_errors == self.error_threshold { - let remove = entry.active_consumers; - if new_failure { - inner.last_connect_failure = Some(Instant::now()); + let active_consumers = entry.active_consumers; + if entry.consecutive_errors >= self.error_threshold { + // too many errors, remove the connection permanently. Once it drains, + // it will be dropped. 
+ if inner.pq.get_priority(&id).is_some() { + inner.pq.remove(&id); } - if remove == 0 { - inner.entries.remove(&id); + + inner.last_connect_failure = Some(Instant::now()); + + // The connection has been removed; its permits will be + // drained because if we look for a connection and it's not there + // we just forget the permit. However, this process can be a little + // bit faster if we just forget permits as the connections are returned. + permit.forget(); + } else { + // update its priority in the queue + if inner.pq.get_priority(&id).is_some() { + inner.pq.change_priority(&id, active_consumers); + } else { + // This connection is not in the heap, but it has space + // for more consumers. Put it back in the heap. + if active_consumers < self.max_consumers { + inner.pq.push(id, active_consumers); + } } } } @@ -586,6 +682,6 @@ impl PooledClient { } pub async fn finish(self, result: Result<(), tonic::Status>) { - self.pool.return_client(self.id, result.is_ok()).await; + self.pool.return_client(self.id, result.is_ok(), self.permit).await; } }