diff --git a/Cargo.lock b/Cargo.lock index 7314b8a95e..a4b276b639 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2645,6 +2645,7 @@ dependencies = [ "pin-project", "prost", "query", + "reqwest", "secrecy", "serde", "serde_json", @@ -5594,7 +5595,7 @@ dependencies = [ "twox-hash", "url", "webpki", - "webpki-roots 0.23.1", + "webpki-roots", ] [[package]] @@ -7538,7 +7539,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.25.2", "winreg 0.50.0", ] @@ -10858,12 +10858,6 @@ dependencies = [ "rustls-webpki 0.100.2", ] -[[package]] -name = "webpki-roots" -version = "0.25.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" - [[package]] name = "which" version = "4.4.0" diff --git a/Cargo.toml b/Cargo.toml index 2ee30b714c..c2dc6ac05f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,11 @@ paste = "1.0" prost = "0.11" rand = "0.8" regex = "1.8" +reqwest = { version = "0.11", default-features = false, features = [ + "json", + "rustls-tls-native-roots", + "stream", +] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smallvec = "1" diff --git a/src/common/greptimedb-telemetry/Cargo.toml b/src/common/greptimedb-telemetry/Cargo.toml index 0888cb59f0..e70c8aebdb 100644 --- a/src/common/greptimedb-telemetry/Cargo.toml +++ b/src/common/greptimedb-telemetry/Cargo.toml @@ -9,10 +9,7 @@ async-trait.workspace = true common-error = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } -reqwest = { version = "0.11", features = [ - "json", - "rustls-tls", -], default-features = false } +reqwest = { workspace = true } serde.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index e613684e87..8a904f241d 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -52,6 +52,7 @@ object-store = { workspace = true } pin-project = "1.0" prost.workspace = true query = { workspace = true } +reqwest = { workspace = true } secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true serde_json = "1.0" diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 0002e78e5f..cae8e69705 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,14 +20,15 @@ mod gcs; mod oss; mod s3; -use std::path; use std::sync::Arc; +use std::time::Duration; +use std::{env, path}; use common_base::readable_size::ReadableSize; use common_telemetry::logging::info; use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::Fs as FsBuilder; -use object_store::{util, ObjectStore, ObjectStoreBuilder}; +use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -133,3 +134,36 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> { Ok(()) } + +pub(crate) fn build_http_client() -> Result { + let http_builder = { + let mut builder = reqwest::ClientBuilder::new(); + + // Pool max idle per host controls connection pool size. + // Default to no limit, set to `0` for disable it. + let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(usize::MAX); + builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); + + // Connect timeout default to 30s. + let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(30); + builder = builder.connect_timeout(Duration::from_secs(connect_timeout)); + + // Pool connection idle timeout default to 90s. + let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(90); + + builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout)); + + builder + }; + + HttpClient::build(http_builder).context(error::InitBackendSnafu) +} diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs index 9c35c04117..97319e13e8 100644 --- a/src/datanode/src/store/azblob.rs +++ b/src/datanode/src/store/azblob.rs @@ -20,6 +20,7 @@ use snafu::prelude::*; use crate::datanode::AzblobConfig; use crate::error::{self, Result}; +use crate::store::build_http_client; pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result { let root = util::normalize_dir(&azblob_config.root); @@ -35,7 +36,8 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res .container(&azblob_config.container) .endpoint(&azblob_config.endpoint) .account_name(azblob_config.account_name.expose_secret()) - .account_key(azblob_config.account_key.expose_secret()); + .account_key(azblob_config.account_key.expose_secret()) + .http_client(build_http_client()?); if let Some(token) = &azblob_config.sas_token { let _ = builder.sas_token(token); diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs index 08ede76b39..2cb6a8e82c 100644 --- a/src/datanode/src/store/gcs.rs +++ b/src/datanode/src/store/gcs.rs @@ -20,6 +20,7 @@ use snafu::prelude::*; use crate::datanode::GcsConfig; use crate::error::{self, Result}; +use crate::store::build_http_client; pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result { let root = util::normalize_dir(&gcs_config.root); @@ -34,7 +35,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result Result { let root = util::normalize_dir(&oss_config.root); @@ -34,7 +35,8 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result Result { let root = util::normalize_dir(&s3_config.root); @@ -34,7 +35,8 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result