From be8ed815327951f9d8fad20316cb7c02c928abc0 Mon Sep 17 00:00:00 2001 From: Elizabeth Murray Date: Fri, 9 May 2025 16:27:58 -0700 Subject: [PATCH] Connection pool: update error accounting, sweep idle connections, add config options. --- pageserver/client_grpc/src/client_cache.rs | 66 ++++++--- pageserver/client_grpc/src/lib.rs | 140 +++++++++++++----- .../pagebench/src/cmd/getpage_latest_lsn.rs | 26 +++- 3 files changed, 175 insertions(+), 57 deletions(-) diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index a1a4447c6a..a93e7e9e83 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -21,6 +21,8 @@ pub struct ConnectionPool { error_threshold: usize, connect_timeout: Duration, connect_backoff: Duration, + // The maximum duration a connection can be idle before being removed + max_idle_duration: Duration, // This notify is signaled when a connection is released or created. notify: Notify, @@ -48,6 +50,7 @@ struct ConnectionEntry { active_consumers: usize, consecutive_successes: usize, consecutive_errors: usize, + last_used: Instant, } /// A client borrowed from the pool. 
@@ -65,6 +68,7 @@ impl ConnectionPool { error_threshold: usize, connect_timeout: Duration, connect_backoff: Duration, + max_idle_duration: Duration, ) -> Arc { let (request_tx, mut request_rx) = mpsc::channel::>(100); let (watch_tx, watch_rx) = watch::channel(false); @@ -78,10 +82,11 @@ impl ConnectionPool { cc_watch_rx: watch_rx, endpoint: endpoint.clone(), max_consumers: max_consumers, - error_threshold, - connect_timeout, - connect_backoff, - request_tx, + error_threshold: error_threshold, + connect_timeout: connect_timeout, + connect_backoff: connect_backoff, + max_idle_duration: max_idle_duration, + request_tx: request_tx, }); // @@ -111,9 +116,32 @@ impl ConnectionPool { } }); + // Background task to sweep idle connections + let sweeper_pool = Arc::clone(&pool); + tokio::spawn(async move { + loop { + sweeper_pool.sweep_idle_connections().await; + sleep(Duration::from_secs(5)).await; // Run every 5 seconds + } + }); + pool } + // Sweep and remove idle connections + async fn sweep_idle_connections(&self) { + let mut inner = self.inner.lock().await; + let now = Instant::now(); + inner.entries.retain(|_id, entry| { + if entry.active_consumers == 0 && now.duration_since(entry.last_used) > self.max_idle_duration { + // Remove idle connection + return false; + } + true + }); + } + + async fn acquire_connection(&self) -> (uuid::Uuid, Channel) { loop { // Reuse an existing healthy connection if available @@ -153,22 +181,20 @@ impl ConnectionPool { } loop { - // - // TODO: This would be more accurate if it waited for a timer, and the timer - // was reset when a connection failed. Using timestamps, we may miss new failures - // that occur while we are sleeping. - // - // TODO: Should the backoff be exponential? 
- // - if let Some(delay) = { - let inner = self.inner.lock().await; - inner.last_connect_failure.and_then(|at| { - (at.elapsed() < self.connect_backoff) - .then(|| self.connect_backoff - at.elapsed()) - }) - } { - sleep(delay).await; + loop { + if let Some(delay) = { + let inner = self.inner.lock().await; + inner.last_connect_failure.and_then(|at| { + (at.elapsed() < self.connect_backoff) + .then(|| self.connect_backoff - at.elapsed()) + }) + } { + sleep(delay).await; + } else { + break // No delay, so we can create a connection + } } + // // Create a new connection. // @@ -197,6 +223,7 @@ impl ConnectionPool { active_consumers: 0, consecutive_successes: 0, consecutive_errors: 0, + last_used: Instant::now(), }, ); self.notify.notify_one(); @@ -230,6 +257,7 @@ impl ConnectionPool { let mut inner = self.inner.lock().await; let mut new_failure = false; if let Some(entry) = inner.entries.get_mut(&id) { + entry.last_used = Instant::now(); // TODO: This should be a debug_assert if entry.active_consumers <= 0 { panic!("A consumer completed when active_consumers was zero!") diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 48ccf00292..72df0818c8 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -45,6 +45,16 @@ pub struct PageserverClient { channels: RwLock>>, auth_interceptor: AuthInterceptor, + + client_cache_options: ClientCacheOptions, +} + +pub struct ClientCacheOptions { + pub max_consumers: usize, + pub error_threshold: usize, + pub connect_timeout: Duration, + pub connect_backoff: Duration, + pub max_idle_duration: Duration, } impl PageserverClient { @@ -54,6 +64,28 @@ impl PageserverClient { timeline_id: &str, auth_token: &Option, shard_map: HashMap, + ) -> Self { + let options = ClientCacheOptions { + max_consumers: 10, + error_threshold: 5, + connect_timeout: Duration::from_secs(5), + connect_backoff: Duration::from_secs(1), + max_idle_duration: Duration::from_secs(60), + }; + 
Self::new_with_config( + tenant_id, + timeline_id, + auth_token, + shard_map, + options, + ) + } + pub fn new_with_config( + tenant_id: &str, + timeline_id: &str, + auth_token: &Option, + shard_map: HashMap, + options: ClientCacheOptions, ) -> Self { Self { _tenant_id: tenant_id.to_string(), @@ -62,16 +94,15 @@ impl PageserverClient { shard_map, channels: RwLock::new(HashMap::new()), auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()), + client_cache_options: options, } } - pub async fn process_rel_exists_request( &self, request: &RelExistsRequest, ) -> Result { // Current sharding model assumes that all metadata is present only at shard 0. let shard = ShardIndex::unsharded(); - let pooled_client = self.get_client(shard).await; let chan = pooled_client.channel(); @@ -79,11 +110,18 @@ impl PageserverClient { PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); let request = proto::RelExistsRequest::from(request); - let response = client.rel_exists(tonic::Request::new(request)).await?; + let response = client.rel_exists(tonic::Request::new(request)).await; - // TODO: check for an error and pass it to "finish" - pooled_client.finish(Ok(())).await; - Ok(response.get_ref().exists) + match response { + Err(status) => { + pooled_client.finish(Err(status.clone())).await; // Pass error to finish + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + pooled_client.finish(Ok(())).await; // Pass success to finish + return Ok(resp.get_ref().exists); + } + } } pub async fn process_rel_size_request( @@ -92,7 +130,6 @@ impl PageserverClient { ) -> Result { // Current sharding model assumes that all metadata is present only at shard 0. 
let shard = ShardIndex::unsharded(); - let pooled_client = self.get_client(shard).await; let chan = pooled_client.channel(); @@ -100,17 +137,23 @@ impl PageserverClient { PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); let request = proto::RelSizeRequest::from(request); - let response = client.rel_size(tonic::Request::new(request)).await?; + let response = client.rel_size(tonic::Request::new(request)).await; - // TODO: check for an error and pass it to "finish" - pooled_client.finish(Ok(())).await; - Ok(response.get_ref().num_blocks) + match response { + Err(status) => { + pooled_client.finish(Err(status.clone())).await; // Pass error to finish + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + pooled_client.finish(Ok(())).await; // Pass success to finish + return Ok(resp.get_ref().num_blocks); + } + } } pub async fn get_page(&self, request: &GetPageRequest) -> Result { // FIXME: calculate the shard number correctly let shard = ShardIndex::unsharded(); - let pooled_client = self.get_client(shard).await; let chan = pooled_client.channel(); @@ -119,17 +162,19 @@ impl PageserverClient { let request = proto::GetPageRequest::from(request); let response = client.get_page(tonic::Request::new(request)).await; + match response { Err(status) => { - pooled_client.finish(Err(status.clone())).await; + pooled_client.finish(Err(status.clone())).await; // Pass error to finish return Err(PageserverClientError::RequestError(status)); } Ok(resp) => { - pooled_client.finish(Ok(())).await; + pooled_client.finish(Ok(())).await; // Pass success to finish let response: GetPageResponse = resp.into_inner().try_into()?; return Ok(response.page_image); } } + } // TODO: this should use model::GetPageRequest and GetPageResponse @@ -142,21 +187,26 @@ impl PageserverClient { > { // FIXME: calculate the shard number correctly let shard = ShardIndex::unsharded(); - + let request = tonic::Request::new(requests); let pooled_client = 
self.get_client(shard).await; let chan = pooled_client.channel(); let mut client = PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - // Check for an error return from get_pages - // Declare response + let response = client.get_pages(request).await; - // TODO: check for an error and pass it to "finish" - pooled_client.finish(Ok(())).await; - return Ok(client.get_pages(tonic::Request::new(requests)).await?); + match response { + Err(status) => { + pooled_client.finish(Err(status.clone())).await; + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + pooled_client.finish(Ok(())).await; // Pass success to finish + return Ok(resp); + } + } } - /// Process a request to get the size of a database. pub async fn process_dbsize_request( &self, @@ -171,13 +221,19 @@ impl PageserverClient { PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); let request = proto::DbSizeRequest::from(request); - let response = client.db_size(tonic::Request::new(request)).await?; + let response = client.db_size(tonic::Request::new(request)).await; - // TODO: check for an error and pass it to "finish" - pooled_client.finish(Ok(())).await; - Ok(response.get_ref().num_bytes) + match response { + Err(status) => { + pooled_client.finish(Err(status.clone())).await; // Pass error to finish + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + pooled_client.finish(Ok(())).await; // Pass success to finish + return Ok(resp.get_ref().num_bytes); + } + } } - /// Process a request to get the size of a database. pub async fn get_base_backup( &self, @@ -189,7 +245,6 @@ impl PageserverClient { > { // Current sharding model assumes that all metadata is present only at shard 0. 
let shard = ShardIndex::unsharded(); - let pooled_client = self.get_client(shard).await; let chan = pooled_client.channel(); @@ -201,13 +256,19 @@ impl PageserverClient { } let request = proto::GetBaseBackupRequest::from(request); - let response = client.get_base_backup(tonic::Request::new(request)).await?; + let response = client.get_base_backup(tonic::Request::new(request)).await; - // TODO: check for an error and pass it to "finish" - pooled_client.finish(Ok(())).await; - Ok(response) + match response { + Err(status) => { + pooled_client.finish(Err(status.clone())).await; // Pass error to finish + return Err(PageserverClientError::RequestError(status)); + } + Ok(resp) => { + pooled_client.finish(Ok(())).await; // Pass success to finish + return Ok(resp); + } + } } - /// Get a client for given shard /// /// Get a client from the pool for this shard, also creating the pool if it doesn't exist. @@ -225,12 +286,17 @@ impl PageserverClient { return pooled_client; } None => { - let new_pool = client_cache::ConnectionPool::new( + // Create a new pool using client_cache_options + // declare new_pool + + let new_pool: Arc; + new_pool = client_cache::ConnectionPool::new( self.shard_map.get(&shard).unwrap(), - 5000, - 5, - Duration::from_millis(200), - Duration::from_secs(1), + self.client_cache_options.max_consumers, + self.client_cache_options.error_threshold, + self.client_cache_options.connect_timeout, + self.client_cache_options.connect_backoff, + self.client_cache_options.max_idle_duration, ); let mut write_pool = self.channels.write().unwrap(); write_pool.insert(shard, new_pool.clone()); diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 104361a051..3888b095f6 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -73,7 +73,23 @@ pub(crate) struct Args { #[clap(long, default_value = "1")] queue_depth: NonZeroUsize, + 
#[clap(long, default_value = "100")] + pool_max_consumers: NonZeroUsize, + + #[clap(long, default_value = "5")] + pool_error_threshold: NonZeroUsize, + + #[clap(long, default_value = "5000")] + pool_connect_timeout: NonZeroUsize, + + #[clap(long, default_value = "1000")] + pool_connect_backoff: NonZeroUsize, + + #[clap(long, default_value = "60000")] + pool_max_idle_duration: NonZeroUsize, + targets: Option>, + } /// State shared by all clients @@ -463,11 +479,19 @@ async fn client_grpc( ShardIndex::unsharded(), args.page_service_connstring.clone(), )]); - let client = pageserver_client_grpc::PageserverClient::new( + let options = pageserver_client_grpc::ClientCacheOptions { + max_consumers: args.pool_max_consumers.get(), + error_threshold: args.pool_error_threshold.get(), + connect_timeout: Duration::from_millis(args.pool_connect_timeout.get() as u64), + connect_backoff: Duration::from_millis(args.pool_connect_backoff.get() as u64), + max_idle_duration: Duration::from_millis(args.pool_max_idle_duration.get() as u64), + }; + let client = pageserver_client_grpc::PageserverClient::new_with_config( &worker_id.timeline.tenant_id.to_string(), &worker_id.timeline.timeline_id.to_string(), &None, shard_map, + options, ); let client = Arc::new(client);