feat: build http client for cloud object storage (#2314)

* feat: build http client for s3/oss/azblob storages

* chore: style

* fix: test

* fix: cargo toml fmt
This commit is contained in:
dennis zhuang
2023-09-03 20:55:19 -05:00
committed by Ruihang Xia
parent 50220f8f04
commit 38697e0c4d
10 changed files with 58 additions and 19 deletions

10
Cargo.lock generated
View File

@@ -2645,6 +2645,7 @@ dependencies = [
"pin-project",
"prost",
"query",
"reqwest",
"secrecy",
"serde",
"serde_json",
@@ -5594,7 +5595,7 @@ dependencies = [
"twox-hash",
"url",
"webpki",
"webpki-roots 0.23.1",
"webpki-roots",
]
[[package]]
@@ -7538,7 +7539,6 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.25.2",
"winreg 0.50.0",
]
@@ -10858,12 +10858,6 @@ dependencies = [
"rustls-webpki 0.100.2",
]
[[package]]
name = "webpki-roots"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "which"
version = "4.4.0"

View File

@@ -89,6 +89,11 @@ paste = "1.0"
prost = "0.11"
rand = "0.8"
regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
"stream",
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smallvec = "1"

View File

@@ -9,10 +9,7 @@ async-trait.workspace = true
common-error = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
reqwest = { version = "0.11", features = [
"json",
"rustls-tls",
], default-features = false }
reqwest = { workspace = true }
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true

View File

@@ -52,6 +52,7 @@ object-store = { workspace = true }
pin-project = "1.0"
prost.workspace = true
query = { workspace = true }
reqwest = { workspace = true }
secrecy = { version = "0.8", features = ["serde", "alloc"] }
serde.workspace = true
serde_json = "1.0"

View File

@@ -20,14 +20,15 @@ mod gcs;
mod oss;
mod s3;
use std::path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, path};
use common_base::readable_size::ReadableSize;
use common_telemetry::logging::info;
use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::Fs as FsBuilder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -133,3 +134,36 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}
pub(crate) fn build_http_client() -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();
// Pool max idle per host controls connection pool size.
// Default to no limit, set to `0` for disable it.
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);
builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
builder
};
HttpClient::build(http_builder).context(error::InitBackendSnafu)
}

View File

@@ -20,6 +20,7 @@ use snafu::prelude::*;
use crate::datanode::AzblobConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&azblob_config.root);
@@ -35,7 +36,8 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret());
.account_key(azblob_config.account_key.expose_secret())
.http_client(build_http_client()?);
if let Some(token) = &azblob_config.sas_token {
let _ = builder.sas_token(token);

View File

@@ -20,6 +20,7 @@ use snafu::prelude::*;
use crate::datanode::GcsConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&gcs_config.root);
@@ -34,7 +35,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.endpoint(&gcs_config.endpoint);
.endpoint(&gcs_config.endpoint)
.http_client(build_http_client()?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -20,6 +20,7 @@ use snafu::prelude::*;
use crate::datanode::OssConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&oss_config.root);
@@ -34,7 +35,8 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret());
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(build_http_client()?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -20,6 +20,7 @@ use snafu::prelude::*;
use crate::datanode::S3Config;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
let root = util::normalize_dir(&s3_config.root);
@@ -34,7 +35,8 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret());
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(build_http_client()?);
if s3_config.endpoint.is_some() {
let _ = builder.endpoint(s3_config.endpoint.as_ref().unwrap());

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::raw::normalize_path as raw_normalize_path;
pub use opendal::raw::oio::Pager;
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
Operator as ObjectStore, Reader, Result, Writer,