From 0c038f755f47fe0d9c5b2dc5e2fbd051514221ea Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Wed, 24 Sep 2025 14:50:47 +0800
Subject: [PATCH] refactor(cli): refactor object storage config (#7009)

* refactor: refactor object storage config

Signed-off-by: WenyXu

* chore: public common config

Signed-off-by: WenyXu

* chore: apply suggestions

Signed-off-by: WenyXu

---------

Signed-off-by: WenyXu
---
 Cargo.lock                                  |   1 +
 src/cli/Cargo.toml                          |   1 +
 src/cli/src/common.rs                       |  19 ++
 src/cli/src/common/object_store.rs          | 230 +++++++++++++++++
 .../{metadata/common.rs => common/store.rs} |  29 ++-
 src/cli/src/lib.rs                          |   2 +
 src/cli/src/metadata.rs                     |   1 -
 src/cli/src/metadata/control/del/key.rs     |   2 +-
 src/cli/src/metadata/control/del/table.rs   |   3 +-
 src/cli/src/metadata/control/get.rs         |   2 +-
 src/cli/src/metadata/repair.rs              |   2 +-
 src/cli/src/metadata/snapshot.rs            | 237 ++++++------------
 src/cmd/src/standalone.rs                   |   2 +-
 src/common/base/src/secrets.rs              |  17 +-
 src/common/config/src/config.rs             |   2 +-
 src/common/meta/src/snapshot.rs             |   8 +-
 src/datanode/src/config.rs                  |   7 +-
 src/datanode/src/store.rs                   |  12 +-
 src/object-store/src/config.rs              | 236 ++++++++---------
 src/object-store/src/factory.rs             |  76 ++----
 src/object-store/src/lib.rs                 |   2 +
 src/object-store/src/util.rs                |  13 +-
 tests-integration/src/test_util.rs          | 104 +++-----
 23 files changed, 546 insertions(+), 462 deletions(-)
 create mode 100644 src/cli/src/common.rs
 create mode 100644 src/cli/src/common/object_store.rs
 rename src/cli/src/{metadata/common.rs => common/store.rs} (91%)

diff --git a/Cargo.lock b/Cargo.lock
index c5ae7dd2d5..e4d378e357 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1800,6 +1800,7 @@ dependencies = [
  "nu-ansi-term",
  "object-store",
  "operator",
+ "paste",
  "query",
  "rand 0.9.1",
  "reqwest",
diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml
index 660c996620..a824fda511 100644
--- a/src/cli/Cargo.toml
+++ b/src/cli/Cargo.toml
@@ -51,6 +51,7 @@ meta-srv.workspace = true
 nu-ansi-term = "0.46"
 object-store.workspace = true
 operator.workspace = true
+paste.workspace = true
 query.workspace = true
 rand.workspace = true
 reqwest.workspace = true
diff --git a/src/cli/src/common.rs b/src/cli/src/common.rs
new file mode 100644
index 0000000000..9fb292829d
--- /dev/null
+++ b/src/cli/src/common.rs
@@ -0,0 +1,19 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod object_store;
+mod store;
+
+pub use object_store::ObjectStoreConfig;
+pub use store::StoreConfig;
diff --git a/src/cli/src/common/object_store.rs b/src/cli/src/common/object_store.rs
new file mode 100644
index 0000000000..2f51190208
--- /dev/null
+++ b/src/cli/src/common/object_store.rs
@@ -0,0 +1,230 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_base::secrets::SecretString;
+use common_error::ext::BoxedError;
+use object_store::config::FileConfig;
+use object_store::services::{Azblob, Gcs, Oss, S3};
+use object_store::util::{with_instrument_layers, with_retry_layers};
+use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
+use paste::paste;
+use snafu::ResultExt;
+
+use crate::error::{self};
+
+macro_rules! wrap_with_clap_prefix {
+    (
+        $new_name:ident, $prefix:literal, $base:ty, {
+            $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
+        }
+    ) => {
+        paste!{
+            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
+            pub struct $new_name {
+                $(
+                    $( #[doc = $doc] )?
+                    $( #[clap(alias = $alias)] )?
+                    #[clap(long $(, default_value_t = $default )? )]
+                    [<$prefix $field>]: $type,
+                )*
+            }
+
+            impl From<$new_name> for $base {
+                fn from(w: $new_name) -> Self {
+                    Self {
+                        $( $field: w.[<$prefix $field>] ),*
+                    }
+                }
+            }
+        }
+    };
+}
+
+wrap_with_clap_prefix! {
+    PrefixedAzblobConnection,
+    "azblob_",
+    AzblobConnection,
+    {
+        #[doc = "The container of the object store."]
+        container: String = Default::default(),
+        #[doc = "The root of the object store."]
+        root: String = Default::default(),
+        #[doc = "The account name of the object store."]
+        account_name: SecretString = Default::default(),
+        #[doc = "The account key of the object store."]
+        account_key: SecretString = Default::default(),
+        #[doc = "The endpoint of the object store."]
+        endpoint: String = Default::default(),
+        #[doc = "The SAS token of the object store."]
+        sas_token: Option<String>,
+    }
+}
+
+wrap_with_clap_prefix! {
+    PrefixedS3Connection,
+    "s3_",
+    S3Connection,
+    {
+        #[doc = "The bucket of the object store."]
+        bucket: String = Default::default(),
+        #[doc = "The root of the object store."]
+        root: String = Default::default(),
+        #[doc = "The access key ID of the object store."]
+        access_key_id: SecretString = Default::default(),
+        #[doc = "The secret access key of the object store."]
+        secret_access_key: SecretString = Default::default(),
+        #[doc = "The endpoint of the object store."]
+        endpoint: Option<String>,
+        #[doc = "The region of the object store."]
+        region: Option<String>,
+        #[doc = "Enable virtual host style for the object store."]
+        enable_virtual_host_style: bool = Default::default(),
+    }
+}
+
+wrap_with_clap_prefix! {
+    PrefixedOssConnection,
+    "oss_",
+    OssConnection,
+    {
+        #[doc = "The bucket of the object store."]
+        bucket: String = Default::default(),
+        #[doc = "The root of the object store."]
+        root: String = Default::default(),
+        #[doc = "The access key ID of the object store."]
+        access_key_id: SecretString = Default::default(),
+        #[doc = "The access key secret of the object store."]
+        access_key_secret: SecretString = Default::default(),
+        #[doc = "The endpoint of the object store."]
+        endpoint: String = Default::default(),
+    }
+}
+
+wrap_with_clap_prefix! {
+    PrefixedGcsConnection,
+    "gcs_",
+    GcsConnection,
+    {
+        #[doc = "The root of the object store."]
+        root: String = Default::default(),
+        #[doc = "The bucket of the object store."]
+        bucket: String = Default::default(),
+        #[doc = "The scope of the object store."]
+        scope: String = Default::default(),
+        #[doc = "The credential path of the object store."]
+        credential_path: SecretString = Default::default(),
+        #[doc = "The credential of the object store."]
+        credential: SecretString = Default::default(),
+        #[doc = "The endpoint of the object store."]
+        endpoint: String = Default::default(),
+    }
+}
+
+/// Common config for object store.
+#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
+pub struct ObjectStoreConfig {
+    /// Whether to use S3 object store.
+    #[clap(long, alias = "s3")]
+    pub enable_s3: bool,
+
+    #[clap(flatten)]
+    pub s3: PrefixedS3Connection,
+
+    /// Whether to use OSS.
+    #[clap(long, alias = "oss")]
+    pub enable_oss: bool,
+
+    #[clap(flatten)]
+    pub oss: PrefixedOssConnection,
+
+    /// Whether to use GCS.
+    #[clap(long, alias = "gcs")]
+    pub enable_gcs: bool,
+
+    #[clap(flatten)]
+    pub gcs: PrefixedGcsConnection,
+
+    /// Whether to use Azure Blob.
+    #[clap(long, alias = "azblob")]
+    pub enable_azblob: bool,
+
+    #[clap(flatten)]
+    pub azblob: PrefixedAzblobConnection,
+}
+
+/// Creates a new file system object store.
+pub fn new_fs_object_store_in_current_dir() -> std::result::Result<ObjectStore, BoxedError> {
+    let data_home = ".";
+    let object_store =
+        object_store::factory::new_fs_object_store(data_home, &FileConfig::default())
+            .map_err(BoxedError::new)?;
+
+    Ok(with_instrument_layers(object_store, false))
+}
+
+impl ObjectStoreConfig {
+    /// Builds the object store from the config.
+    pub fn build(&self) -> Result<ObjectStore, BoxedError> {
+        let object_store = if self.enable_s3 {
+            let s3 = S3Connection::from(self.s3.clone());
+            common_telemetry::info!("Building object store with s3: {:?}", s3);
+            Some(
+                ObjectStore::new(S3::from(&s3))
+                    .context(error::InitBackendSnafu)
+                    .map_err(BoxedError::new)?
+                    .finish(),
+            )
+        } else if self.enable_oss {
+            let oss = OssConnection::from(self.oss.clone());
+            common_telemetry::info!("Building object store with oss: {:?}", oss);
+            Some(
+                ObjectStore::new(Oss::from(&oss))
+                    .context(error::InitBackendSnafu)
+                    .map_err(BoxedError::new)?
+                    .finish(),
+            )
+        } else if self.enable_gcs {
+            let gcs = GcsConnection::from(self.gcs.clone());
+            common_telemetry::info!("Building object store with gcs: {:?}", gcs);
+            Some(
+                ObjectStore::new(Gcs::from(&gcs))
+                    .context(error::InitBackendSnafu)
+                    .map_err(BoxedError::new)?
+                    .finish(),
+            )
+        } else if self.enable_azblob {
+            let azblob = AzblobConnection::from(self.azblob.clone());
+            common_telemetry::info!("Building object store with azblob: {:?}", azblob);
+            Some(
+                ObjectStore::new(Azblob::from(&azblob))
+                    .context(error::InitBackendSnafu)
+                    .map_err(BoxedError::new)?
+                    .finish(),
+            )
+        } else {
+            None
+        };
+
+        let object_store = object_store
+            .map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));
+
+        match object_store {
+            Some(object_store) => Ok(object_store),
+            None => Ok(with_instrument_layers(
+                new_fs_object_store_in_current_dir()?,
+                false,
+            )),
+        }
+    }
+}
diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/common/store.rs
similarity index 91%
rename from src/cli/src/metadata/common.rs
rename to src/cli/src/common/store.rs
index 17c1da4665..eccd490b92 100644
--- a/src/cli/src/metadata/common.rs
+++ b/src/cli/src/common/store.rs
@@ -26,7 +26,7 @@ use servers::tls::{TlsMode, TlsOption};
 use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
 
 #[derive(Debug, Default, Parser)]
-pub(crate) struct StoreConfig {
+pub struct StoreConfig {
     /// The endpoint of store. one of etcd, postgres or mysql.
     ///
     /// For postgres store, the format is:
@@ -38,48 +38,48 @@ pub struct StoreConfig {
     /// For mysql store, the format is:
     ///  "mysql://user:password@ip:port/dbname"
     #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
-    store_addrs: Vec<String>,
+    pub store_addrs: Vec<String>,
 
     /// The maximum number of operations in a transaction. Only used when using [etcd-store].
     #[clap(long, default_value = "128")]
-    max_txn_ops: usize,
+    pub max_txn_ops: usize,
 
     /// The metadata store backend.
     #[clap(long, value_enum, default_value = "etcd-store")]
-    backend: BackendImpl,
+    pub backend: BackendImpl,
 
     /// The key prefix of the metadata store.
     #[clap(long, default_value = "")]
-    store_key_prefix: String,
+    pub store_key_prefix: String,
 
     /// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
     #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
     #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
-    meta_table_name: String,
+    pub meta_table_name: String,
 
     /// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
     #[cfg(feature = "pg_kvbackend")]
     #[clap(long)]
-    meta_schema_name: Option<String>,
+    pub meta_schema_name: Option<String>,
 
     /// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
     #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
-    backend_tls_mode: TlsMode,
+    pub backend_tls_mode: TlsMode,
 
     /// Path to TLS certificate file for backend store connections
     #[clap(long = "backend-tls-cert-path", default_value = "")]
-    backend_tls_cert_path: String,
+    pub backend_tls_cert_path: String,
 
     /// Path to TLS private key file for backend store connections
     #[clap(long = "backend-tls-key-path", default_value = "")]
-    backend_tls_key_path: String,
+    pub backend_tls_key_path: String,
 
     /// Path to TLS CA certificate file for backend store connections
     #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
-    backend_tls_ca_cert_path: String,
+    pub backend_tls_ca_cert_path: String,
 
     /// Enable watching TLS certificate files for changes
     #[clap(long = "backend-tls-watch")]
-    backend_tls_watch: bool,
+    pub backend_tls_watch: bool,
 }
 
 impl StoreConfig {
@@ -104,6 +104,11 @@ impl StoreConfig {
         if store_addrs.is_empty() {
             EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
         } else {
+            common_telemetry::info!(
+                "Building kvbackend with store addrs: {:?}, backend: {:?}",
+                store_addrs,
+                self.backend
+            );
             let kvbackend = match self.backend {
                 BackendImpl::EtcdStore => {
                     let tls_config = self.tls_config();
diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs
index 38bf67931e..b1125f093b 100644
--- a/src/cli/src/lib.rs
+++ b/src/cli/src/lib.rs
@@ -14,6 +14,7 @@
 #![allow(clippy::print_stdout)]
 
 mod bench;
+mod common;
 mod data;
 mod database;
 pub mod error;
@@ -21,6 +22,7 @@ mod metadata;
 
 use async_trait::async_trait;
 use clap::Parser;
+pub use common::{ObjectStoreConfig, StoreConfig};
 use common_error::ext::BoxedError;
 pub use database::DatabaseClient;
 use error::Result;
diff --git a/src/cli/src/metadata.rs b/src/cli/src/metadata.rs
index aea06bc9c3..48eb92bb2d 100644
--- a/src/cli/src/metadata.rs
+++ b/src/cli/src/metadata.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod common;
 mod control;
 mod repair;
 mod snapshot;
diff --git a/src/cli/src/metadata/control/del/key.rs b/src/cli/src/metadata/control/del/key.rs
index ccf3b1089c..ded3e4d2fe 100644
--- a/src/cli/src/metadata/control/del/key.rs
+++ b/src/cli/src/metadata/control/del/key.rs
@@ -20,7 +20,7 @@ use common_meta::kv_backend::KvBackendRef;
 use common_meta::rpc::store::RangeRequest;
 
 use crate::Tool;
-use crate::metadata::common::StoreConfig;
+use crate::common::StoreConfig;
 use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
 
 /// Delete key-value pairs logically from the metadata store.
diff --git a/src/cli/src/metadata/control/del/table.rs b/src/cli/src/metadata/control/del/table.rs
index f030a4d952..d1346928fe 100644
--- a/src/cli/src/metadata/control/del/table.rs
+++ b/src/cli/src/metadata/control/del/table.rs
@@ -24,8 +24,8 @@ use common_meta::kv_backend::KvBackendRef;
 use store_api::storage::TableId;
 
 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
-use crate::metadata::common::StoreConfig;
 use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
 use crate::metadata::control::utils::get_table_id_by_name;
 
@@ -48,6 +48,7 @@ pub struct DelTableCommand {
     #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
     catalog_name: String,
 
+    /// The store config.
     #[clap(flatten)]
     store: StoreConfig,
 }
diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs
index 4f1f8e7edc..c174ee4cd6 100644
--- a/src/cli/src/metadata/control/get.rs
+++ b/src/cli/src/metadata/control/get.rs
@@ -28,8 +28,8 @@ use common_meta::rpc::store::RangeRequest;
 use futures::TryStreamExt;
 
 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::InvalidArgumentsSnafu;
-use crate::metadata::common::StoreConfig;
 use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
 
 /// Getting metadata from metadata store.
diff --git a/src/cli/src/metadata/repair.rs b/src/cli/src/metadata/repair.rs
index af279e3e38..ec4822206b 100644
--- a/src/cli/src/metadata/repair.rs
+++ b/src/cli/src/metadata/repair.rs
@@ -38,10 +38,10 @@ use snafu::{ResultExt, ensure};
 use store_api::storage::TableId;
 
 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::{
     InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
 };
-use crate::metadata::common::StoreConfig;
 use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
 
 /// Repair metadata of logical tables.
diff --git a/src/cli/src/metadata/snapshot.rs b/src/cli/src/metadata/snapshot.rs
index 4084f90653..6580b93b6c 100644
--- a/src/cli/src/metadata/snapshot.rs
+++ b/src/cli/src/metadata/snapshot.rs
@@ -16,16 +16,14 @@ use std::path::Path;
 
 use async_trait::async_trait;
 use clap::{Parser, Subcommand};
-use common_base::secrets::{ExposeSecret, SecretString};
 use common_error::ext::BoxedError;
 use common_meta::snapshot::MetadataSnapshotManager;
 use object_store::ObjectStore;
-use object_store::services::{Fs, S3};
-use snafu::{OptionExt, ResultExt};
+use snafu::OptionExt;
 
 use crate::Tool;
-use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
-use crate::metadata::common::StoreConfig;
+use crate::common::{ObjectStoreConfig, StoreConfig};
+use crate::error::UnexpectedSnafu;
 
 /// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
 #[derive(Subcommand)]
@@ -48,65 +46,6 @@ impl SnapshotCommand {
     }
 }
 
-// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
-#[derive(Debug, Default, Parser)]
-struct S3Config {
-    /// whether to use s3 as the output directory. default is false.
-    #[clap(long, default_value = "false")]
-    s3: bool,
-    /// The s3 bucket name.
-    #[clap(long)]
-    s3_bucket: Option<String>,
-    /// The s3 region.
-    #[clap(long)]
-    s3_region: Option<String>,
-    /// The s3 access key.
-    #[clap(long)]
-    s3_access_key: Option<SecretString>,
-    /// The s3 secret key.
-    #[clap(long)]
-    s3_secret_key: Option<SecretString>,
-    /// The s3 endpoint. we will automatically use the default s3 decided by the region if not set.
-    #[clap(long)]
-    s3_endpoint: Option<String>,
-}
-
-impl S3Config {
-    pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
-        if !self.s3 {
-            Ok(None)
-        } else {
-            if self.s3_region.is_none()
-                || self.s3_access_key.is_none()
-                || self.s3_secret_key.is_none()
-                || self.s3_bucket.is_none()
-            {
-                return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
-            }
-            // Safety, unwrap is safe because we have checked the options above.
-            let mut config = S3::default()
-                .bucket(self.s3_bucket.as_ref().unwrap())
-                .region(self.s3_region.as_ref().unwrap())
-                .access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
-                .secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
-
-            if !root.is_empty() && root != "." {
-                config = config.root(root);
-            }
-
-            if let Some(endpoint) = &self.s3_endpoint {
-                config = config.endpoint(endpoint);
-            }
-            Ok(Some(
-                ObjectStore::new(config)
-                    .context(OpenDalSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            ))
-        }
-    }
-}
-
 /// Export metadata snapshot tool.
 /// This tool is used to export metadata snapshot from etcd, pg or mysql.
 /// It will dump the metadata snapshot to local file or s3 bucket.
@@ -116,60 +55,41 @@ pub struct SaveCommand {
     /// The store configuration.
     #[clap(flatten)]
     store: StoreConfig,
-    /// The s3 config.
+    /// The object store configuration.
     #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file. we will add the file extension automatically.
     #[clap(long, default_value = "metadata_snapshot")]
     file_name: String,
     /// The directory to store the snapshot file.
-    /// if target output is s3 bucket, this is the root directory in the bucket.
-    /// if target output is local file, this is the local directory.
-    #[clap(long, default_value = "")]
-    output_dir: String,
-}
-
-fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
-    let root = if root.is_empty() { "." } else { root };
-    let object_store = ObjectStore::new(Fs::default().root(root))
-        .context(OpenDalSnafu)
-        .map_err(BoxedError::new)?
-        .finish();
-    Ok(object_store)
+    #[clap(long, default_value = "", alias = "output_dir")]
+    dir: String,
 }
 
 impl SaveCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kvbackend = self.store.build().await?;
-        let output_dir = &self.output_dir;
-        let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaSnapshotTool {
-                inner: MetadataSnapshotManager::new(kvbackend, store),
-                target_file: self.file_name.clone(),
-            };
-            Ok(Box::new(tool))
-        } else {
-            let object_store = create_local_file_object_store(output_dir)?;
-            let tool = MetaSnapshotTool {
-                inner: MetadataSnapshotManager::new(kvbackend, object_store),
-                target_file: self.file_name.clone(),
-            };
-            Ok(Box::new(tool))
-        }
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let tool = MetaSnapshotTool {
+            inner: MetadataSnapshotManager::new(kvbackend, object_store),
+            path: self.dir.clone(),
+            file_name: self.file_name.clone(),
+        };
+        Ok(Box::new(tool))
     }
 }
 
 struct MetaSnapshotTool {
     inner: MetadataSnapshotManager,
-    target_file: String,
+    path: String,
+    file_name: String,
 }
 
 #[async_trait]
 impl Tool for MetaSnapshotTool {
     async fn do_work(&self) -> std::result::Result<(), BoxedError> {
         self.inner
-            .dump("", &self.target_file)
+            .dump(&self.path, &self.file_name)
             .await
             .map_err(BoxedError::new)?;
         Ok(())
@@ -186,15 +106,15 @@ pub struct RestoreCommand {
     /// The store configuration.
     #[clap(flatten)]
     store: StoreConfig,
-    /// The s3 config.
+    /// The object store config.
    #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file.
     #[clap(long, default_value = "metadata_snapshot.metadata.fb")]
     file_name: String,
     /// The directory to store the snapshot file.
-    #[clap(long, default_value = ".")]
-    input_dir: String,
+    #[clap(long, default_value = ".", alias = "input_dir")]
+    dir: String,
     #[clap(long, default_value = "false")]
     force: bool,
 }
 
@@ -202,38 +122,39 @@ pub struct RestoreCommand {
 impl RestoreCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kvbackend = self.store.build().await?;
-        let input_dir = &self.input_dir;
-        let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaRestoreTool::new(
-                MetadataSnapshotManager::new(kvbackend, store),
-                self.file_name.clone(),
-                self.force,
-            );
-            Ok(Box::new(tool))
-        } else {
-            let object_store = create_local_file_object_store(input_dir)?;
-            let tool = MetaRestoreTool::new(
-                MetadataSnapshotManager::new(kvbackend, object_store),
-                self.file_name.clone(),
-                self.force,
-            );
-            Ok(Box::new(tool))
-        }
+        let input_dir = &self.dir;
+        let file_path = Path::new(input_dir).join(&self.file_name);
+        let file_path = file_path
+            .to_str()
+            .with_context(|| UnexpectedSnafu {
+                msg: format!(
+                    "Invalid file path, input dir: {}, file name: {}",
+                    input_dir, &self.file_name
+                ),
+            })
+            .map_err(BoxedError::new)?;
+
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let tool = MetaRestoreTool::new(
+            MetadataSnapshotManager::new(kvbackend, object_store),
+            file_path.to_string(),
+            self.force,
+        );
+        Ok(Box::new(tool))
     }
 }
 
 struct MetaRestoreTool {
     inner: MetadataSnapshotManager,
-    source_file: String,
+    file_path: String,
     force: bool,
 }
 
 impl MetaRestoreTool {
-    pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
+    pub fn new(inner: MetadataSnapshotManager, file_path: String, force: bool) -> Self {
         Self {
             inner,
-            source_file,
+            file_path,
             force,
         }
     }
@@ -252,7 +173,7 @@ impl Tool for MetaRestoreTool {
                 "The target source is clean, we will restore the metadata snapshot."
             );
             self.inner
-                .restore(&self.source_file)
+                .restore(&self.file_path)
                 .await
                 .map_err(BoxedError::new)?;
             Ok(())
@@ -266,7 +187,7 @@ impl Tool for MetaRestoreTool {
                 "The target source is not clean, We will restore the metadata snapshot with --force."
             );
             self.inner
-                .restore(&self.source_file)
+                .restore(&self.file_path)
                 .await
                 .map_err(BoxedError::new)?;
             Ok(())
@@ -280,12 +201,15 @@ impl Tool for MetaRestoreTool {
 /// It prints the filtered metadata to the console.
 #[derive(Debug, Default, Parser)]
 pub struct InfoCommand {
-    /// The s3 config.
+    /// The object store config.
     #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file. we will add the file extension automatically.
     #[clap(long, default_value = "metadata_snapshot")]
     file_name: String,
+    /// The directory to store the snapshot file.
+    #[clap(long, default_value = ".", alias = "input_dir")]
+    dir: String,
     /// The query string to filter the metadata.
     #[clap(long, default_value = "*")]
     inspect_key: String,
@@ -296,7 +220,7 @@ pub struct InfoCommand {
 
 struct MetaInfoTool {
     inner: ObjectStore,
-    source_file: String,
+    file_path: String,
     inspect_key: String,
     limit: Option<usize>,
 }
@@ -306,7 +230,7 @@ impl Tool for MetaInfoTool {
     async fn do_work(&self) -> std::result::Result<(), BoxedError> {
         let result = MetadataSnapshotManager::info(
             &self.inner,
-            &self.source_file,
+            &self.file_path,
             &self.inspect_key,
             self.limit,
         )
@@ -320,45 +244,24 @@ impl Tool for MetaInfoTool {
 }
 
 impl InfoCommand {
-    fn decide_object_store_root_for_local_store(
-        file_path: &str,
-    ) -> Result<(&str, &str), BoxedError> {
-        let path = Path::new(file_path);
-        let parent = path
-            .parent()
-            .and_then(|p| p.to_str())
-            .context(InvalidFilePathSnafu { msg: file_path })
-            .map_err(BoxedError::new)?;
-        let file_name = path
-            .file_name()
-            .and_then(|f| f.to_str())
-            .context(InvalidFilePathSnafu { msg: file_path })
-            .map_err(BoxedError::new)?;
-        let root = if parent.is_empty() { "." } else { parent };
-        Ok((root, file_name))
-    }
-
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
-        let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaInfoTool {
-                inner: store,
-                source_file: self.file_name.clone(),
-                inspect_key: self.inspect_key.clone(),
-                limit: self.limit,
-            };
-            Ok(Box::new(tool))
-        } else {
-            let (root, file_name) =
-                Self::decide_object_store_root_for_local_store(&self.file_name)?;
-            let object_store = create_local_file_object_store(root)?;
-            let tool = MetaInfoTool {
-                inner: object_store,
-                source_file: file_name.to_string(),
-                inspect_key: self.inspect_key.clone(),
-                limit: self.limit,
-            };
-            Ok(Box::new(tool))
-        }
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let file_path = Path::new(&self.dir).join(&self.file_name);
+        let file_path = file_path
+            .to_str()
+            .with_context(|| UnexpectedSnafu {
+                msg: format!(
+                    "Invalid file path, input dir: {}, file name: {}",
+                    &self.dir, &self.file_name
+                ),
+            })
+            .map_err(BoxedError::new)?;
+        let tool = MetaInfoTool {
+            inner: object_store,
+            file_path: file_path.to_string(),
+            inspect_key: self.inspect_key.clone(),
+            limit: self.limit,
+        };
+        Ok(Box::new(tool))
     }
 }
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index fa4c4e4320..4863f5e6b6 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -740,7 +740,7 @@ mod tests {
             object_store::config::ObjectStoreConfig::S3(s3_config) => {
                 assert_eq!(
                     "SecretBox<alloc::string::String>([REDACTED])".to_string(),
-                    format!("{:?}", s3_config.access_key_id)
+                    format!("{:?}", s3_config.connection.access_key_id)
                 );
             }
             _ => {
diff --git a/src/common/base/src/secrets.rs b/src/common/base/src/secrets.rs
index b31275136a..55f569fe23 100644
--- a/src/common/base/src/secrets.rs
+++ b/src/common/base/src/secrets.rs
@@ -31,7 +31,7 @@
 //! types of `SecretBox` to be serializable with `serde`, you will need to impl
 //! the [`SerializableSecret`] marker trait on `T`
 
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
 use std::{any, fmt};
 
 use serde::{Deserialize, Serialize, de, ser};
@@ -46,6 +46,12 @@ impl From<String> for SecretString {
     }
 }
 
+impl Display for SecretString {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "SecretString([REDACTED])")
+    }
+}
+
 /// Wrapper type for values that contains secrets.
 ///
 /// It attempts to limit accidental exposure and ensure secrets are wiped from memory when dropped.
@@ -165,6 +171,15 @@ impl<S: Zeroize> ExposeSecretMut<S> for SecretBox<S> {
     }
 }
 
+impl<S> PartialEq for SecretBox<S>
+where
+    S: PartialEq + Zeroize,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.inner_secret == other.inner_secret
+    }
+}
+
 /// Expose a reference to an inner secret
 pub trait ExposeSecret<S> {
     /// Expose secret: this is the only method providing access to a secret.
diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs
index 07299acce6..e25c46a0c0 100644
--- a/src/common/config/src/config.rs
+++ b/src/common/config/src/config.rs
@@ -213,7 +213,7 @@ mod tests {
         // Check the configs from environment variables.
         match &opts.storage.store {
             object_store::config::ObjectStoreConfig::S3(s3_config) => {
-                assert_eq!(s3_config.bucket, "mybucket".to_string());
+                assert_eq!(s3_config.connection.bucket, "mybucket".to_string());
             }
             _ => panic!("unexpected store type"),
         }
diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs
index 54d5dcf224..a5d000364f 100644
--- a/src/common/meta/src/snapshot.rs
+++ b/src/common/meta/src/snapshot.rs
@@ -244,9 +244,11 @@ impl MetadataSnapshotManager {
         let file_path_buf = [path, filename.to_string().as_str()]
             .iter()
             .collect::<PathBuf>();
-        let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
-            reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
-        })?;
+        let file_path = file_path_buf
+            .to_str()
+            .with_context(|| InvalidFileNameSnafu {
+                reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
+            })?;
         // Ensure the file does not exist
         ensure!(
             !self
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 9829c1a982..06f4d50da3 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -188,9 +188,12 @@ mod tests {
             ObjectStoreConfig::S3(cfg) => {
                 assert_eq!(
                     "SecretBox<alloc::string::String>([REDACTED])".to_string(),
-                    format!("{:?}", cfg.access_key_id)
+                    format!("{:?}", cfg.connection.access_key_id)
+                );
+                assert_eq!(
+                    "access_key_id",
+                    cfg.connection.access_key_id.expose_secret()
                 );
-                assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
             }
             _ => unreachable!(),
         }
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 88b90883ef..ec25f84fd1 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
 
 use common_telemetry::info;
 use object_store::factory::new_raw_object_store;
-use object_store::layers::{LruCacheLayer, RetryLayer};
+use object_store::layers::LruCacheLayer;
 use object_store::services::Fs;
-use object_store::util::{PrintDetailedError, clean_temp_dir, join_dir, with_instrument_layers};
+use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
 use object_store::{
     ATOMIC_WRITE_DIR, Access, OLD_ATOMIC_WRITE_DIR, ObjectStore, ObjectStoreBuilder,
 };
@@ -30,14 +30,6 @@ use snafu::prelude::*;
 use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig};
 use crate::error::{self, CreateDirSnafu, Result};
 
-fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
-    object_store.layer(
-        RetryLayer::new()
-            .with_jitter()
-            .with_notify(PrintDetailedError),
-    )
-}
-
 pub(crate) async fn new_object_store_without_cache(
     store: &ObjectStoreConfig,
     data_home: &str,
diff --git a/src/object-store/src/config.rs b/src/object-store/src/config.rs
index c692447c3a..0e41cc9502 100644
--- a/src/object-store/src/config.rs
+++ b/src/object-store/src/config.rs
@@ -16,8 +16,11 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
 use common_base::secrets::{ExposeSecret, SecretString};
+use opendal::services::{Azblob, Gcs, Oss, S3};
 use serde::{Deserialize, Serialize};
 
+use crate::util;
+
 /// Object storage config
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 #[serde(tag = "type")]
@@ -75,10 +78,9 @@ impl ObjectStoreConfig {
 #[serde(default)]
 pub struct FileConfig {}
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct S3Config {
-    pub name: String,
+pub struct S3Connection {
     pub bucket: String,
     pub root: String,
     #[serde(skip_serializing)]
@@ -91,47 +93,46 @@ pub struct S3Config {
     /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
     /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
     pub enable_virtual_host_style: bool,
+}
+
+impl From<&S3Connection> for S3 {
+    fn from(connection: &S3Connection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+
+        let mut builder = S3::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .access_key_id(connection.access_key_id.expose_secret())
+            .secret_access_key(connection.secret_access_key.expose_secret());
+
+        if let Some(endpoint) = &connection.endpoint {
+            builder = builder.endpoint(endpoint);
+        }
+        if let Some(region) = &connection.region {
+            builder = builder.region(region);
+        }
+        if connection.enable_virtual_host_style {
+            builder = builder.enable_virtual_host_style();
+        }
+
+        builder
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct S3Config {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: S3Connection,
     #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }
 
-impl Default for S3Config {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            bucket: String::default(),
-            root: String::default(),
-            access_key_id: SecretString::from(String::default()),
-            secret_access_key: SecretString::from(String::default()),
-            enable_virtual_host_style: false,
-            endpoint: Option::default(),
-            region: Option::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-impl PartialEq for S3Config {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.bucket == other.bucket
-            && self.root == other.root
-            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
-            && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.region == other.region
-            && self.enable_virtual_host_style == other.enable_virtual_host_style
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct OssConfig {
-    pub name: String,
+pub struct OssConnection {
     pub bucket: String,
     pub root: String,
     #[serde(skip_serializing)]
@@ -139,43 +140,34 @@ pub struct OssConfig {
     #[serde(skip_serializing)]
     pub access_key_secret: SecretString,
     pub endpoint: String,
+}
+
+impl From<&OssConnection> for Oss {
+    fn from(connection: &OssConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        Oss::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .endpoint(&connection.endpoint)
+            .access_key_id(connection.access_key_id.expose_secret())
+            .access_key_secret(connection.access_key_secret.expose_secret())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct OssConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: OssConnection,
     #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }
 
-impl PartialEq for OssConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.bucket == other.bucket
-            && self.root == other.root
-            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
-            && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
-impl Default for OssConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            bucket: String::default(),
-            root: String::default(),
-            access_key_id: SecretString::from(String::default()),
-            access_key_secret: SecretString::from(String::default()),
-            endpoint: String::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct AzblobConfig {
-    pub name: String,
+pub struct AzblobConnection {
     pub container: String,
     pub root: String,
     #[serde(skip_serializing)]
@@ -184,44 +176,40 @@ pub struct AzblobConfig {
     pub account_key: SecretString,
     pub endpoint: String,
     pub sas_token: Option<String>,
+}
+
+impl From<&AzblobConnection> for Azblob {
+    fn from(connection: &AzblobConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        let mut builder = Azblob::default()
+            .root(&root)
+            .container(&connection.container)
+            .endpoint(&connection.endpoint)
+            .account_name(connection.account_name.expose_secret())
+            .account_key(connection.account_key.expose_secret());
+
+        if let Some(token) = &connection.sas_token {
+            builder = builder.sas_token(token);
+        };
+
+        builder
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct AzblobConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: AzblobConnection,
     #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }
 
-impl PartialEq for AzblobConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.container == other.container
-            && self.root == other.root
-            && self.account_name.expose_secret() == other.account_name.expose_secret()
-            && self.account_key.expose_secret() == other.account_key.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.sas_token == other.sas_token
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-impl Default for AzblobConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            container: String::default(),
-            root: String::default(),
-            account_name: SecretString::from(String::default()),
-            account_key: SecretString::from(String::default()),
-            endpoint: String::default(),
-            sas_token: Option::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct GcsConfig {
-    pub name: String,
+pub struct GcsConnection {
     pub root: String,
     pub bucket: String,
     pub scope: String,
@@ -230,41 +218,31 @@ pub struct GcsConfig {
     #[serde(skip_serializing)]
     pub credential: SecretString,
     pub endpoint: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct GcsConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: GcsConnection,
     #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }
 
-impl Default for GcsConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            root: String::default(),
-            bucket: String::default(),
-            scope: String::default(),
-            credential_path: SecretString::from(String::default()),
-            credential: SecretString::from(String::default()),
-            endpoint: String::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
+impl From<&GcsConnection> for Gcs {
+    fn from(connection: &GcsConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        Gcs::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .scope(&connection.scope)
+            .credential_path(connection.credential_path.expose_secret())
+            .credential(connection.credential.expose_secret())
+            .endpoint(&connection.endpoint)
     }
 }
-
-impl PartialEq for GcsConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.root == other.root
-            && self.bucket == other.bucket
-            && self.scope == other.scope
-            && self.credential_path.expose_secret() == other.credential_path.expose_secret()
-            && self.credential.expose_secret() == other.credential.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
 /// The http client options to the storage.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 #[serde(default)]
diff --git a/src/object-store/src/factory.rs b/src/object-store/src/factory.rs
index 010b8be613..2ac38e00ed 100644
--- a/src/object-store/src/factory.rs
+++ b/src/object-store/src/factory.rs
@@ -14,7 +14,6 @@
 
 use std::{fs, path};
 
-use common_base::secrets::ExposeSecret;
 use common_telemetry::info;
 use opendal::layers::HttpClientLayer;
 use opendal::services::{Fs, Gcs, Oss, S3};
@@ -32,7 +31,7 @@ pub async fn new_raw_object_store(
 ) -> Result<ObjectStore> {
     let data_home = normalize_dir(data_home);
     match store {
-        ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config).await,
+        ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config),
         ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await,
         ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await,
         ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await,
@@ -41,10 +40,7 @@ pub async fn new_raw_object_store(
 }
 
 /// A helper function to create a file system object store.
-pub async fn new_fs_object_store(
-    data_home: &str,
-    _file_config: &FileConfig,
-) -> Result<ObjectStore> {
+pub fn new_fs_object_store(data_home: &str, _file_config: &FileConfig) -> Result<ObjectStore> {
     fs::create_dir_all(path::Path::new(&data_home))
         .context(error::CreateDirSnafu { dir: data_home })?;
     info!("The file storage home is: {}", data_home);
@@ -68,26 +64,14 @@ pub async fn new_fs_object_store(
 }
 
 pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&azblob_config.root);
-
+    let root = util::normalize_dir(&azblob_config.connection.root);
     info!(
         "The azure storage container is: {}, root is: {}",
-        azblob_config.container, &root
+        azblob_config.connection.container, &root
     );
 
     let client = build_http_client(&azblob_config.http_client)?;
-
-    let mut builder = Azblob::default()
-        .root(&root)
-        .container(&azblob_config.container)
-        .endpoint(&azblob_config.endpoint)
-        .account_name(azblob_config.account_name.expose_secret())
-        .account_key(azblob_config.account_key.expose_secret());
-
-    if let Some(token) = &azblob_config.sas_token {
-        builder = builder.sas_token(token);
-    };
-
+    let builder = Azblob::from(&azblob_config.connection);
     let operator = ObjectStore::new(builder)
        .context(error::InitBackendSnafu)?
        .layer(HttpClientLayer::new(client))
@@ -97,22 +81,14 @@ pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result
 }
 
 pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&gcs_config.root);
+    let root = util::normalize_dir(&gcs_config.connection.root);
 
     info!(
         "The gcs storage bucket is: {}, root is: {}",
-        gcs_config.bucket, &root
+        gcs_config.connection.bucket, &root
     );
 
     let client = build_http_client(&gcs_config.http_client)?;
-
-    let builder = Gcs::default()
-        .root(&root)
-        .bucket(&gcs_config.bucket)
-        .scope(&gcs_config.scope)
-        .credential_path(gcs_config.credential_path.expose_secret())
-        .credential(gcs_config.credential.expose_secret())
-        .endpoint(&gcs_config.endpoint);
-
+    let builder = Gcs::from(&gcs_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -122,21 +98,14 @@ pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore>
 }
 
 pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&oss_config.root);
+    let root = util::normalize_dir(&oss_config.connection.root);
 
     info!(
         "The oss storage bucket is: {}, root is: {}",
-        oss_config.bucket, &root
+        oss_config.connection.bucket, &root
     );
 
     let client = build_http_client(&oss_config.http_client)?;
-
-    let builder = Oss::default()
-        .root(&root)
-        .bucket(&oss_config.bucket)
-        .endpoint(&oss_config.endpoint)
-        .access_key_id(oss_config.access_key_id.expose_secret())
-        .access_key_secret(oss_config.access_key_secret.expose_secret());
-
+    let builder = Oss::from(&oss_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -146,31 +115,14 @@ pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore>
 }
 
 pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&s3_config.root);
-
+    let root = util::normalize_dir(&s3_config.connection.root);
     info!(
         "The s3 storage bucket is: {}, root is: {}",
-        s3_config.bucket, &root
+        s3_config.connection.bucket, &root
     );
 
     let client = build_http_client(&s3_config.http_client)?;
-
-    let mut builder = S3::default()
-        .root(&root)
-        .bucket(&s3_config.bucket)
-        .access_key_id(s3_config.access_key_id.expose_secret())
-        .secret_access_key(s3_config.secret_access_key.expose_secret());
-
-    if s3_config.endpoint.is_some() {
-        builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-    }
-    if s3_config.region.is_some() {
-        builder = builder.region(s3_config.region.as_ref().unwrap());
-    }
-    if s3_config.enable_virtual_host_style {
-        builder = builder.enable_virtual_host_style();
-    }
-
+    let builder = S3::from(&s3_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs
index c3cf5e8ce1..e7ee6afcdb 100644
--- a/src/object-store/src/lib.rs
+++ b/src/object-store/src/lib.rs
@@ -27,6 +27,8 @@ mod metrics;
 pub mod test_util;
 pub mod util;
 
+pub use config::{AzblobConnection, GcsConnection, OssConnection, S3Connection};
+
 /// The default object cache directory name.
 pub const OBJECT_CACHE_DIR: &str = "object_cache";
 
diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs
index e26963f1f5..00858eb74a 100644
--- a/src/object-store/src/util.rs
+++ b/src/object-store/src/util.rs
@@ -17,7 +17,9 @@ use std::path;
 use std::time::Duration;
 
 use common_telemetry::{debug, error, info, warn};
-use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
+use opendal::layers::{
+    LoggingInterceptor, LoggingLayer, RetryInterceptor, RetryLayer, TracingLayer,
+};
 use opendal::raw::{AccessorInfo, HttpClient, Operation};
 use opendal::{Error, ErrorKind};
 use snafu::ResultExt;
@@ -132,6 +134,15 @@ pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> Ob
         .layer(crate::layers::build_prometheus_metrics_layer(path_label))
 }
 
+/// Adds retry layer to the object store.
+pub fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
+    object_store.layer(
+        RetryLayer::new()
+            .with_jitter()
+            .with_notify(PrintDetailedError),
+    )
+}
+
 static LOGGING_TARGET: &str = "opendal::services";
 
 struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
 
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index b483be1c43..2583f18a86 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use auth::UserProviderRef;
 use axum::Router;
 use catalog::kvbackend::KvBackendCatalogManager;
-use common_base::secrets::ExposeSecret;
 use common_config::Configurable;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
@@ -32,12 +31,12 @@ use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, StorageConfig};
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
-use object_store::ObjectStore;
 use object_store::config::{
     AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
 };
 use object_store::services::{Azblob, Gcs, Oss, S3};
 use object_store::test_util::TempFolder;
+use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
 use servers::grpc::builder::GrpcServerBuilder;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
@@ -139,11 +138,14 @@ impl StorageType {
 
 fn s3_test_config() -> S3Config {
     S3Config {
-        root: uuid::Uuid::new_v4().to_string(),
-        access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
-        secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
-        bucket: env::var("GT_S3_BUCKET").unwrap(),
-        region: Some(env::var("GT_S3_REGION").unwrap()),
+        connection: S3Connection {
+            root: uuid::Uuid::new_v4().to_string(),
+            access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
+            secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
+            bucket: env::var("GT_S3_BUCKET").unwrap(),
+            region: Some(env::var("GT_S3_REGION").unwrap()),
+            ..Default::default()
+        },
         ..Default::default()
     }
 }
@@ -154,75 +156,55 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
     match store_type {
         StorageType::Gcs => {
             let gcs_config = GcsConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                bucket: env::var("GT_GCS_BUCKET").unwrap(),
-                scope: env::var("GT_GCS_SCOPE").unwrap(),
-                credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
-                credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
-                endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
+                connection: GcsConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    bucket: env::var("GT_GCS_BUCKET").unwrap(),
+                    scope: env::var("GT_GCS_SCOPE").unwrap(),
+                    credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
+                    credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
+                    endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
+                },
                 ..Default::default()
             };
 
-            let builder = Gcs::default()
-                .root(&gcs_config.root)
-                .bucket(&gcs_config.bucket)
-                .scope(&gcs_config.scope)
-                .credential_path(gcs_config.credential_path.expose_secret())
-                .credential(gcs_config.credential.expose_secret())
-                .endpoint(&gcs_config.endpoint);
-
+            let builder = Gcs::from(&gcs_config.connection);
             let config = ObjectStoreConfig::Gcs(gcs_config);
-
             let store = ObjectStore::new(builder).unwrap().finish();
-
             (config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
         }
         StorageType::Azblob => {
             let azblob_config = AzblobConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
-                account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
-                account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
-                endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
+                connection: AzblobConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
+                    account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
+                    account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
+                    endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
+                    ..Default::default()
+                },
                 ..Default::default()
             };
 
-            let mut builder = Azblob::default()
-                .root(&azblob_config.root)
-                .endpoint(&azblob_config.endpoint)
-                .account_name(azblob_config.account_name.expose_secret())
-                .account_key(azblob_config.account_key.expose_secret())
-                .container(&azblob_config.container);
-
-            if let Ok(sas_token) = env::var("GT_AZBLOB_SAS_TOKEN") {
-                builder = builder.sas_token(&sas_token);
-            };
-
+            let builder = Azblob::from(&azblob_config.connection);
             let config = ObjectStoreConfig::Azblob(azblob_config);
-
             let store = ObjectStore::new(builder).unwrap().finish();
-
             (config, TempDirGuard::Azblob(TempFolder::new(&store, "/")))
         }
         StorageType::Oss => {
             let oss_config = OssConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
-                access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
-                bucket: env::var("GT_OSS_BUCKET").unwrap(),
-                endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+                connection: OssConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
+                    access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
+                    bucket: env::var("GT_OSS_BUCKET").unwrap(),
+                    endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+                },
                 ..Default::default()
             };
 
-            let builder = Oss::default()
-                .root(&oss_config.root)
-                .endpoint(&oss_config.endpoint)
-                .access_key_id(oss_config.access_key_id.expose_secret())
-                .access_key_secret(oss_config.access_key_secret.expose_secret())
-                .bucket(&oss_config.bucket);
-
+            let builder = Oss::from(&oss_config.connection);
             let config = ObjectStoreConfig::Oss(oss_config);
-
             let store = ObjectStore::new(builder).unwrap().finish();
-
             (config, TempDirGuard::Oss(TempFolder::new(&store, "/")))
         }
         StorageType::S3 | StorageType::S3WithCache => {
@@ -235,23 +217,9 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
                 s3_config.cache.cache_path = Some("".to_string());
             }
 
-            let mut builder = S3::default()
-                .root(&s3_config.root)
-                .access_key_id(s3_config.access_key_id.expose_secret())
-                .secret_access_key(s3_config.secret_access_key.expose_secret())
-                .bucket(&s3_config.bucket);
-
-            if s3_config.endpoint.is_some() {
-                builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-            };
-            if s3_config.region.is_some() {
-                builder = builder.region(s3_config.region.as_ref().unwrap());
-            };
-
+            let builder = S3::from(&s3_config.connection);
             let config = ObjectStoreConfig::S3(s3_config);
-
             let store = ObjectStore::new(builder).unwrap().finish();
-
             (config, TempDirGuard::S3(TempFolder::new(&store, "/")))
        }
        StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None),
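
A few self-contained sketches of the patterns this patch introduces follow; they are illustrative reconstructions, not part of the diff. First, the `wrap_with_clap_prefix!` macro: it generates a clap-parseable wrapper whose fields are prefixed copies of a connection struct's fields, plus a `From` impl that strips the prefix again, so every backend gets distinct `--s3-*`/`--oss-*`-style flags without duplicating the connection types. A minimal sketch under the same `clap` (derive feature) and `paste` assumptions; `DemoConnection` and the `demo_` prefix are hypothetical stand-ins for the PR's real types:

```rust
use clap::Parser;
use paste::paste;

#[derive(Debug, Default, PartialEq)]
struct DemoConnection {
    bucket: String,
    root: String,
}

// Same trick as the PR's `wrap_with_clap_prefix!`, reduced to the essentials:
// `paste!` concatenates the prefix onto each field name, and clap then derives
// `--demo-bucket` / `--demo-root` from the generated `demo_*` fields.
macro_rules! wrap_with_prefix {
    ($new_name:ident, $prefix:literal, $base:ty, { $( $field:ident : $ty:ty ),* $(,)? }) => {
        paste! {
            #[derive(Parser, Debug)]
            struct $new_name {
                $(
                    #[clap(long, default_value_t)]
                    [<$prefix $field>]: $ty,
                )*
            }

            impl From<$new_name> for $base {
                // Strip the prefix again when converting back to the base type.
                fn from(w: $new_name) -> Self {
                    Self { $( $field: w.[<$prefix $field>] ),* }
                }
            }
        }
    };
}

wrap_with_prefix!(PrefixedDemo, "demo_", DemoConnection, { bucket: String, root: String });

fn main() {
    let args = PrefixedDemo::parse_from(["prog", "--demo-bucket", "b", "--demo-root", "/data"]);
    let conn = DemoConnection::from(args);
    assert_eq!(conn, DemoConnection { bucket: "b".into(), root: "/data".into() });
}
```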
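Second, the `From<&S3Connection> for S3` conversions added in `src/object-store/src/config.rs` centralize OpenDAL builder construction that was previously repeated in `factory.rs`, the CLI, and the integration tests. A sketch of the same conversion, assuming the `opendal` crate; plain `String`s stand in for `SecretString`, and the bucket/endpoint values in `main` are made up:

```rust
use opendal::Operator;
use opendal::services::S3;

struct S3Connection {
    bucket: String,
    root: String,
    access_key_id: String,     // SecretString in the real config
    secret_access_key: String, // SecretString in the real config
    endpoint: Option<String>,
    region: Option<String>,
    enable_virtual_host_style: bool,
}

impl From<&S3Connection> for S3 {
    fn from(c: &S3Connection) -> Self {
        let mut builder = S3::default()
            .root(&c.root)
            .bucket(&c.bucket)
            .access_key_id(&c.access_key_id)
            .secret_access_key(&c.secret_access_key);
        // Optional settings are applied in exactly one place instead of at
        // every construction site.
        if let Some(endpoint) = &c.endpoint {
            builder = builder.endpoint(endpoint);
        }
        if let Some(region) = &c.region {
            builder = builder.region(region);
        }
        if c.enable_virtual_host_style {
            builder = builder.enable_virtual_host_style();
        }
        builder
    }
}

fn main() -> opendal::Result<()> {
    let conn = S3Connection {
        bucket: "my-bucket".into(),
        root: "/data".into(),
        access_key_id: "ak".into(),
        secret_access_key: "sk".into(),
        endpoint: Some("http://127.0.0.1:9000".into()),
        region: Some("us-east-1".into()),
        enable_virtual_host_style: false,
    };
    // Construction is lazy: no network I/O happens until the operator is used.
    let op = Operator::new(S3::from(&conn))?.finish();
    println!("operator scheme: {}", op.info().scheme());
    Ok(())
}
```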
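Third, `with_retry_layers` moves from a private helper in `datanode` into `object_store::util`, so the CLI's `ObjectStoreConfig::build` can reuse it. A sketch of the helper against OpenDAL's in-memory service, assuming `opendal` (its `services-memory` feature) and `tokio` are available; the GreptimeDB-specific `PrintDetailedError` notify hook is omitted:

```rust
use opendal::Operator;
use opendal::layers::RetryLayer;
use opendal::services::Memory;

// Jittered exponential backoff on transient errors, as in the shared helper.
fn with_retry_layers(object_store: Operator) -> Operator {
    object_store.layer(RetryLayer::new().with_jitter())
}

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let op = with_retry_layers(Operator::new(Memory::default())?.finish());
    op.write("greeting", "hello").await?;
    let content = op.read("greeting").await?;
    println!("{}", String::from_utf8_lossy(&content.to_vec()));
    Ok(())
}
```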
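Fourth, the additions to `common_base::secrets` are what let the `*Connection` structs simply derive `PartialEq` while keeping secrets out of logs: equality compares the wrapped value, and both `Debug` and the new `Display` print only a redacted marker. A std-only sketch with minimal stand-in types (the real `SecretBox<S>` is generic, zeroizes on drop, and prints the inner type name in its `Debug` output):

```rust
use std::fmt;

struct SecretString(String);

impl fmt::Debug for SecretString {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never print the wrapped value.
        write!(f, "SecretBox<alloc::string::String>([REDACTED])")
    }
}

impl fmt::Display for SecretString {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "SecretString([REDACTED])")
    }
}

impl PartialEq for SecretString {
    // Equality inspects the inner value without ever formatting it.
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

fn main() {
    let a = SecretString("ak".into());
    let b = SecretString("ak".into());
    assert_eq!(a, b);
    println!("{a:?} / {a}"); // both sides stay redacted
}
```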
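Fifth, `ObjectStoreConfig::build` selects exactly one backend: the `enable_*` flags are tested in a fixed order, the first match wins, and with no flag set it falls back to a filesystem store rooted in the current directory. A sketch of just that dispatch with a hypothetical `Backend` enum (the real method returns an instrumented OpenDAL `ObjectStore`):

```rust
#[derive(Default)]
struct ObjectStoreConfig {
    enable_s3: bool,
    enable_oss: bool,
    enable_gcs: bool,
    enable_azblob: bool,
}

#[derive(Debug, PartialEq)]
enum Backend {
    S3,
    Oss,
    Gcs,
    Azblob,
    LocalFs,
}

impl ObjectStoreConfig {
    // First enabled backend wins; the filesystem store is the fallback.
    fn backend(&self) -> Backend {
        if self.enable_s3 {
            Backend::S3
        } else if self.enable_oss {
            Backend::Oss
        } else if self.enable_gcs {
            Backend::Gcs
        } else if self.enable_azblob {
            Backend::Azblob
        } else {
            Backend::LocalFs
        }
    }
}

fn main() {
    let cfg = ObjectStoreConfig { enable_gcs: true, ..Default::default() };
    assert_eq!(cfg.backend(), Backend::Gcs);
    assert_eq!(ObjectStoreConfig::default().backend(), Backend::LocalFs);
}
```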
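Finally, `RestoreCommand` and `InfoCommand` now both locate the snapshot as `dir` joined with `file_name`, rejecting only paths that are not valid UTF-8. A std-only sketch of that helper; the `Result<String, String>` error is a stand-in for the CLI's `UnexpectedSnafu`-based error, and the assertion assumes Unix-style separators:

```rust
use std::path::Path;

fn snapshot_file_path(dir: &str, file_name: &str) -> Result<String, String> {
    let path = Path::new(dir).join(file_name);
    // `to_str` returns None only when the joined path is not valid UTF-8.
    path.to_str()
        .map(str::to_string)
        .ok_or_else(|| format!("Invalid file path, input dir: {dir}, file name: {file_name}"))
}

fn main() {
    assert_eq!(
        snapshot_file_path(".", "metadata_snapshot.metadata.fb").unwrap(),
        "./metadata_snapshot.metadata.fb"
    );
}
```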