Connection pool: update error accounting, sweep idle connections, add config options.

This commit is contained in:
Elizabeth Murray
2025-05-09 16:27:58 -07:00
parent 12b08c4b82
commit be8ed81532
3 changed files with 175 additions and 57 deletions

View File

@@ -21,6 +21,8 @@ pub struct ConnectionPool {
error_threshold: usize,
connect_timeout: Duration,
connect_backoff: Duration,
// The maximum duration a connection can be idle before being removed
max_idle_duration: Duration,
// This notify is signaled when a connection is released or created.
notify: Notify,
@@ -48,6 +50,7 @@ struct ConnectionEntry {
active_consumers: usize,
consecutive_successes: usize,
consecutive_errors: usize,
last_used: Instant,
}
/// A client borrowed from the pool.
@@ -65,6 +68,7 @@ impl ConnectionPool {
error_threshold: usize,
connect_timeout: Duration,
connect_backoff: Duration,
max_idle_duration: Duration,
) -> Arc<Self> {
let (request_tx, mut request_rx) = mpsc::channel::<mpsc::Sender<PooledClient>>(100);
let (watch_tx, watch_rx) = watch::channel(false);
@@ -78,10 +82,11 @@ impl ConnectionPool {
cc_watch_rx: watch_rx,
endpoint: endpoint.clone(),
max_consumers: max_consumers,
error_threshold,
connect_timeout,
connect_backoff,
request_tx,
error_threshold: error_threshold,
connect_timeout: connect_timeout,
connect_backoff: connect_backoff,
max_idle_duration: max_idle_duration,
request_tx: request_tx,
});
//
@@ -111,9 +116,32 @@ impl ConnectionPool {
}
});
// Background task to sweep idle connections
let sweeper_pool = Arc::clone(&pool);
tokio::spawn(async move {
loop {
sweeper_pool.sweep_idle_connections().await;
sleep(Duration::from_secs(5)).await; // Run every 60 seconds
}
});
pool
}
// Sweep and remove idle connections
async fn sweep_idle_connections(&self) {
let mut inner = self.inner.lock().await;
let now = Instant::now();
inner.entries.retain(|_id, entry| {
if entry.active_consumers == 0 && now.duration_since(entry.last_used) > self.max_idle_duration {
// Remove idle connection
return false;
}
true
});
}
async fn acquire_connection(&self) -> (uuid::Uuid, Channel) {
loop {
// Reuse an existing healthy connection if available
@@ -153,22 +181,20 @@ impl ConnectionPool {
}
loop {
//
// TODO: This would be more accurate if it waited for a timer, and the timer
// was reset when a connection failed. Using timestamps, we may miss new failures
// that occur while we are sleeping.
//
// TODO: Should the backoff be exponential?
//
if let Some(delay) = {
let inner = self.inner.lock().await;
inner.last_connect_failure.and_then(|at| {
(at.elapsed() < self.connect_backoff)
.then(|| self.connect_backoff - at.elapsed())
})
} {
sleep(delay).await;
loop {
if let Some(delay) = {
let inner = self.inner.lock().await;
inner.last_connect_failure.and_then(|at| {
(at.elapsed() < self.connect_backoff)
.then(|| self.connect_backoff - at.elapsed())
})
} {
sleep(delay).await;
} else {
break // No delay, so we can create a connection
}
}
//
// Create a new connection.
//
@@ -197,6 +223,7 @@ impl ConnectionPool {
active_consumers: 0,
consecutive_successes: 0,
consecutive_errors: 0,
last_used: Instant::now(),
},
);
self.notify.notify_one();
@@ -230,6 +257,7 @@ impl ConnectionPool {
let mut inner = self.inner.lock().await;
let mut new_failure = false;
if let Some(entry) = inner.entries.get_mut(&id) {
entry.last_used = Instant::now();
// TODO: This should be a debug_assert
if entry.active_consumers <= 0 {
panic!("A consumer completed when active_consumers was zero!")

View File

@@ -45,6 +45,16 @@ pub struct PageserverClient {
channels: RwLock<HashMap<ShardIndex, Arc<client_cache::ConnectionPool>>>,
auth_interceptor: AuthInterceptor,
client_cache_options: ClientCacheOptions,
}
pub struct ClientCacheOptions {
pub max_consumers: usize,
pub error_threshold: usize,
pub connect_timeout: Duration,
pub connect_backoff: Duration,
pub max_idle_duration: Duration,
}
impl PageserverClient {
@@ -54,6 +64,28 @@ impl PageserverClient {
timeline_id: &str,
auth_token: &Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Self {
let options = ClientCacheOptions {
max_consumers: 10,
error_threshold: 5,
connect_timeout: Duration::from_secs(5),
connect_backoff: Duration::from_secs(1),
max_idle_duration: Duration::from_secs(60),
};
Self::new_with_config(
tenant_id,
timeline_id,
auth_token,
shard_map,
options,
)
}
pub fn new_with_config(
tenant_id: &str,
timeline_id: &str,
auth_token: &Option<String>,
shard_map: HashMap<ShardIndex, String>,
options: ClientCacheOptions,
) -> Self {
Self {
_tenant_id: tenant_id.to_string(),
@@ -62,16 +94,15 @@ impl PageserverClient {
shard_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()),
client_cache_options: options,
}
}
pub async fn process_rel_exists_request(
&self,
request: &RelExistsRequest,
) -> Result<bool, PageserverClientError> {
// Current sharding model assumes that all metadata is present only at shard 0.
let shard = ShardIndex::unsharded();
let pooled_client = self.get_client(shard).await;
let chan = pooled_client.channel();
@@ -79,11 +110,18 @@ impl PageserverClient {
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::RelExistsRequest::from(request);
let response = client.rel_exists(tonic::Request::new(request)).await?;
let response = client.rel_exists(tonic::Request::new(request)).await;
// TODO: check for an error and pass it to "finish"
pooled_client.finish(Ok(())).await;
Ok(response.get_ref().exists)
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await; // Pass error to finish
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await; // Pass success to finish
return Ok(resp.get_ref().exists);
}
}
}
pub async fn process_rel_size_request(
@@ -92,7 +130,6 @@ impl PageserverClient {
) -> Result<u32, PageserverClientError> {
// Current sharding model assumes that all metadata is present only at shard 0.
let shard = ShardIndex::unsharded();
let pooled_client = self.get_client(shard).await;
let chan = pooled_client.channel();
@@ -100,17 +137,23 @@ impl PageserverClient {
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::RelSizeRequest::from(request);
let response = client.rel_size(tonic::Request::new(request)).await?;
let response = client.rel_size(tonic::Request::new(request)).await;
// TODO: check for an error and pass it to "finish"
pooled_client.finish(Ok(())).await;
Ok(response.get_ref().num_blocks)
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await; // Pass error to finish
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await; // Pass success to finish
return Ok(resp.get_ref().num_blocks);
}
}
}
pub async fn get_page(&self, request: &GetPageRequest) -> Result<Bytes, PageserverClientError> {
// FIXME: calculate the shard number correctly
let shard = ShardIndex::unsharded();
let pooled_client = self.get_client(shard).await;
let chan = pooled_client.channel();
@@ -119,17 +162,19 @@ impl PageserverClient {
let request = proto::GetPageRequest::from(request);
let response = client.get_page(tonic::Request::new(request)).await;
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await;
pooled_client.finish(Err(status.clone())).await; // Pass error to finish
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await;
pooled_client.finish(Ok(())).await; // Pass success to finish
let response: GetPageResponse = resp.into_inner().try_into()?;
return Ok(response.page_image);
}
}
}
// TODO: this should use model::GetPageRequest and GetPageResponse
@@ -142,21 +187,26 @@ impl PageserverClient {
> {
// FIXME: calculate the shard number correctly
let shard = ShardIndex::unsharded();
let request = tonic::Request::new(requests);
let pooled_client = self.get_client(shard).await;
let chan = pooled_client.channel();
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
// Check for an error return from get_pages
// Declare response
let response = client.get_pages(request).await;
// TODO: check for an error and pass it to "finish"
pooled_client.finish(Ok(())).await;
return Ok(client.get_pages(tonic::Request::new(requests)).await?);
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await;
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await; // Pass success to finish
return Ok(resp);
}
}
}
/// Process a request to get the size of a database.
pub async fn process_dbsize_request(
&self,
@@ -171,13 +221,19 @@ impl PageserverClient {
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::DbSizeRequest::from(request);
let response = client.db_size(tonic::Request::new(request)).await?;
let response = client.db_size(tonic::Request::new(request)).await;
// TODO: check for an error and pass it to "finish"
pooled_client.finish(Ok(())).await;
Ok(response.get_ref().num_bytes)
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await; // Pass error to finish
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await; // Pass success to finish
return Ok(resp.get_ref().num_bytes);
}
}
}
/// Process a request to get the size of a database.
pub async fn get_base_backup(
&self,
@@ -189,7 +245,6 @@ impl PageserverClient {
> {
// Current sharding model assumes that all metadata is present only at shard 0.
let shard = ShardIndex::unsharded();
let pooled_client = self.get_client(shard).await;
let chan = pooled_client.channel();
@@ -201,13 +256,19 @@ impl PageserverClient {
}
let request = proto::GetBaseBackupRequest::from(request);
let response = client.get_base_backup(tonic::Request::new(request)).await?;
let response = client.get_base_backup(tonic::Request::new(request)).await;
// TODO: check for an error and pass it to "finish"
pooled_client.finish(Ok(())).await;
Ok(response)
match response {
Err(status) => {
pooled_client.finish(Err(status.clone())).await; // Pass error to finish
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
pooled_client.finish(Ok(())).await; // Pass success to finish
return Ok(resp);
}
}
}
/// Get a client for given shard
///
/// Get a client from the pool for this shard, also creating the pool if it doesn't exist.
@@ -225,12 +286,17 @@ impl PageserverClient {
return pooled_client;
}
None => {
let new_pool = client_cache::ConnectionPool::new(
// Create a new pool using client_cache_options
// declare new_pool
let new_pool: Arc<client_cache::ConnectionPool>;
new_pool = client_cache::ConnectionPool::new(
self.shard_map.get(&shard).unwrap(),
5000,
5,
Duration::from_millis(200),
Duration::from_secs(1),
self.client_cache_options.max_consumers,
self.client_cache_options.error_threshold,
self.client_cache_options.connect_timeout,
self.client_cache_options.connect_backoff,
self.client_cache_options.max_idle_duration,
);
let mut write_pool = self.channels.write().unwrap();
write_pool.insert(shard, new_pool.clone());

View File

@@ -73,7 +73,23 @@ pub(crate) struct Args {
#[clap(long, default_value = "1")]
queue_depth: NonZeroUsize,
#[clap(long, default_value = "100")]
pool_max_consumers: NonZeroUsize,
#[clap(long, default_value = "5")]
pool_error_threshold: NonZeroUsize,
#[clap(long, default_value = "5000")]
pool_connect_timeout: NonZeroUsize,
#[clap(long, default_value = "1000")]
pool_connect_backoff: NonZeroUsize,
#[clap(long, default_value = "60000")]
pool_max_idle_duration: NonZeroUsize,
targets: Option<Vec<TenantTimelineId>>,
}
/// State shared by all clients
@@ -463,11 +479,19 @@ async fn client_grpc(
ShardIndex::unsharded(),
args.page_service_connstring.clone(),
)]);
let client = pageserver_client_grpc::PageserverClient::new(
let options = pageserver_client_grpc::ClientCacheOptions {
max_consumers: args.pool_max_consumers.get(),
error_threshold: args.pool_error_threshold.get(),
connect_timeout: Duration::from_millis(args.pool_connect_timeout.get() as u64),
connect_backoff: Duration::from_millis(args.pool_connect_backoff.get() as u64),
max_idle_duration: Duration::from_millis(args.pool_max_idle_duration.get() as u64),
};
let client = pageserver_client_grpc::PageserverClient::new_with_config(
&worker_id.timeline.tenant_id.to_string(),
&worker_id.timeline.timeline_id.to_string(),
&None,
shard_map,
options,
);
let client = Arc::new(client);