# remote_storage: configurable connection pooling for ABS (#10169)
## Problem

The ABS SDK's default behavior is to do no connection pooling, i.e. open and close a fresh connection for each request. Under high request rates, this can result in an accumulation of TCP connections in TIME_WAIT or CLOSE_WAIT state, and in extreme cases exhaustion of client ports.

Related: https://github.com/neondatabase/cloud/issues/20971

## Summary of changes

- Add a configurable `conn_pool_size` parameter for Azure storage, defaulting to zero (the current behavior).
- Construct a custom reqwest client using this connection pool size.
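For context, `conn_pool_size` maps directly onto `reqwest`'s per-host idle connection pool. Below is a minimal, standalone sketch of what the two settings mean for the underlying HTTP client; the endpoint and the pool size of 8 are illustrative placeholders, not part of this change.

```rust
// Illustrative only: shows the reqwest knob the new `conn_pool_size` setting controls.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // conn_pool_size = 0: no idle connections are kept, so every request
    // opens (and later tears down) a fresh TCP connection -- the old behavior.
    let _unpooled = reqwest::ClientBuilder::new()
        .pool_max_idle_per_host(0)
        .build()?;

    // conn_pool_size = 8: up to 8 idle connections per host are kept around
    // and reused, avoiding a pile-up of TIME_WAIT sockets under load.
    let pooled = reqwest::ClientBuilder::new()
        .pool_max_idle_per_host(8)
        .build()?;

    for i in 0..3 {
        // With a non-zero pool size, these sequential requests reuse one
        // connection instead of opening and closing three of them.
        // (Placeholder endpoint, purely illustrative.)
        let status = pooled.get("https://example.com/").send().await?.status();
        println!("request {i}: {status}");
    }
    Ok(())
}
```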
Cargo.lock (generated), 1 line changed:

```diff
@@ -5064,6 +5064,7 @@ dependencies = [
  "once_cell",
  "pin-project-lite",
  "rand 0.8.5",
+ "reqwest",
  "scopeguard",
  "serde",
  "serde_json",
```
The crate's Cargo.toml gains the `reqwest` dependency:

```diff
@@ -18,6 +18,7 @@ camino = { workspace = true, features = ["serde1"] }
 humantime-serde.workspace = true
 hyper = { workspace = true, features = ["client"] }
 futures.workspace = true
+reqwest.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
```
New imports in the Azure backend:

```diff
@@ -8,6 +8,7 @@ use std::io;
 use std::num::NonZeroU32;
 use std::pin::Pin;
 use std::str::FromStr;
+use std::sync::Arc;
 use std::time::Duration;
 use std::time::SystemTime;
 
@@ -15,6 +16,8 @@ use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
 use anyhow::Context;
 use anyhow::Result;
 use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
+use azure_core::HttpClient;
+use azure_core::TransportOptions;
 use azure_core::{Continuable, RetryOptions};
 use azure_storage::StorageCredentials;
 use azure_storage_blobs::blob::CopyStatus;
```
The blob client builder is given a custom transport built from the configured pool size:

```diff
@@ -80,8 +83,13 @@ impl AzureBlobStorage {
             StorageCredentials::token_credential(token_credential)
         };
 
-        // we have an outer retry
-        let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
+        let builder = ClientBuilder::new(account, credentials)
+            // we have an outer retry
+            .retry(RetryOptions::none())
+            // Customize transport to configure connection pooling
+            .transport(TransportOptions::new(Self::reqwest_client(
+                azure_config.conn_pool_size,
+            )));
 
         let client = builder.container_client(azure_config.container_name.to_owned());
 
```
A new helper constructs the pooled `reqwest` client:

```diff
@@ -106,6 +114,14 @@ impl AzureBlobStorage {
         })
     }
 
+    fn reqwest_client(conn_pool_size: usize) -> Arc<dyn HttpClient> {
+        let client = reqwest::ClientBuilder::new()
+            .pool_max_idle_per_host(conn_pool_size)
+            .build()
+            .expect("failed to build `reqwest` client");
+        Arc::new(client)
+    }
+
     pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
         assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
         let path_string = path.get_path().as_str();
```
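Pulled out of the crate, the same wiring can be sketched as a standalone function. This is an illustration under assumptions: the account, credentials and container are supplied by the caller as placeholders, the `ContainerClient` re-export path is assumed, and it relies on `azure_core`'s default `enable_reqwest` feature providing the `HttpClient` implementation for `reqwest::Client`. The patch above is the authoritative version.

```rust
use std::sync::Arc;

use azure_core::{HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};

/// Build a container client whose transport keeps up to `conn_pool_size`
/// idle connections per host (0 preserves the SDK's no-pooling behavior).
fn pooled_container_client(
    account: String,
    credentials: StorageCredentials,
    container: &str,
    conn_pool_size: usize,
) -> ContainerClient {
    // A reqwest client with a bounded per-host idle connection pool.
    let http_client: Arc<dyn HttpClient> = Arc::new(
        reqwest::ClientBuilder::new()
            .pool_max_idle_per_host(conn_pool_size)
            .build()
            .expect("failed to build `reqwest` client"),
    );

    ClientBuilder::new(account, credentials)
        // Retries are handled by an outer layer, as in the patch above.
        .retry(RetryOptions::none())
        // Swap the SDK's default transport for the pooled client.
        .transport(TransportOptions::new(http_client))
        .container_client(container)
}
```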
The config module gets a default for the new knob:

```diff
@@ -114,6 +114,16 @@ fn default_max_keys_per_list_response() -> Option<i32> {
     DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
 }
 
+fn default_azure_conn_pool_size() -> usize {
+    // Conservative default: no connection pooling. At time of writing this is the Azure
+    // SDK's default as well, due to historic reports of hard-to-reproduce issues
+    // (https://github.com/hyperium/hyper/issues/2312)
+    //
+    // However, using connection pooling is important to avoid exhausting client ports when
+    // doing huge numbers of requests (https://github.com/neondatabase/cloud/issues/20971)
+    0
+}
+
 impl Debug for S3Config {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("S3Config")
```
...and `AzureConfig` gains the field:

```diff
@@ -146,6 +156,8 @@ pub struct AzureConfig {
     pub concurrency_limit: NonZeroUsize,
     #[serde(default = "default_max_keys_per_list_response")]
     pub max_keys_per_list_response: Option<i32>,
+    #[serde(default = "default_azure_conn_pool_size")]
+    pub conn_pool_size: usize,
 }
 
 fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
```
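Because the field carries `#[serde(default = "default_azure_conn_pool_size")]`, existing configs that never mention `conn_pool_size` keep parsing and quietly get the conservative default of 0. A self-contained sketch of that serde pattern, using a stand-in struct rather than the real `AzureConfig` (the `toml` crate usage and field set here are illustrative assumptions):

```rust
use serde::Deserialize;

fn default_conn_pool_size() -> usize {
    0 // same conservative default as the patch: no connection pooling
}

// Stand-in for the relevant slice of `AzureConfig`, for illustration only.
#[derive(Debug, Deserialize)]
struct PoolConfig {
    container_name: String,
    #[serde(default = "default_conn_pool_size")]
    conn_pool_size: usize,
}

fn main() {
    // Old configs that never mention the key still parse, with pooling off.
    let old: PoolConfig = toml::from_str("container_name = 'some-container'").unwrap();
    assert_eq!(old.conn_pool_size, 0);

    // New configs can opt in to a pool of idle connections.
    let new: PoolConfig =
        toml::from_str("container_name = 'some-container'\nconn_pool_size = 8\n").unwrap();
    assert_eq!(new.conn_pool_size, 8);

    println!("{old:?}\n{new:?}");
}
```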
The config parsing test covers the new key:

```diff
@@ -302,6 +314,7 @@ timeout = '5s'";
 container_region = 'westeurope'
 upload_storage_class = 'INTELLIGENT_TIERING'
 timeout = '7s'
+conn_pool_size = 8
 ";
 
     let config = parse(toml).unwrap();
```
...and the expected parsed value:

```diff
@@ -316,6 +329,7 @@ timeout = '5s'";
             prefix_in_container: None,
             concurrency_limit: default_remote_storage_azure_concurrency_limit(),
             max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
+            conn_pool_size: 8,
         }),
         timeout: Duration::from_secs(7),
         small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
```
The Azure integration test helper sets a pool size as well:

```diff
@@ -218,6 +218,7 @@ async fn create_azure_client(
         prefix_in_container: Some(format!("test_{millis}_{random:08x}/")),
         concurrency_limit: NonZeroUsize::new(100).unwrap(),
         max_keys_per_list_response,
+        conn_pool_size: 8,
     }),
     timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
```