From 1d07864b298b4f6b6d79f57fd827daf664bd7a14 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 25 Jun 2025 21:49:55 +0800 Subject: [PATCH] refactor(object-store): move backends building functions back to object-store (#6400) refactor/building-backend-in-object-store: ### Refactor Object Store Configuration - **Centralize Object Store Configurations**: Moved object store configurations (`FileConfig`, `S3Config`, `OssConfig`, `AzblobConfig`, `GcsConfig`) to `object-store/src/config.rs`. - **Error Handling Enhancements**: Introduced `object-store/src/error.rs` for improved error handling related to object store operations. - **Factory Pattern for Object Store**: Implemented `object-store/src/factory.rs` to create object store instances, consolidating logic from `datanode/src/store.rs`. - **Remove Redundant Store Implementations**: Deleted individual store files (`azblob.rs`, `fs.rs`, `gcs.rs`, `oss.rs`, `s3.rs`) from `datanode/src/store/`. - **Update Usage of Object Store Config**: Updated references to `ObjectStoreConfig` in `datanode.rs`, `standalone.rs`, `config.rs`, and `error.rs` to use the new centralized configuration. Signed-off-by: Lei, HUANG --- Cargo.lock | 9 + src/cmd/Cargo.toml | 1 + src/cmd/src/datanode.rs | 2 +- src/cmd/src/standalone.rs | 8 +- src/common/config/Cargo.toml | 1 + src/common/config/src/config.rs | 4 +- src/datanode/src/config.rs | 328 +-------------------------- src/datanode/src/error.rs | 28 ++- src/datanode/src/store.rs | 78 ++----- src/datanode/src/store/azblob.rs | 50 ----- src/datanode/src/store/fs.rs | 53 ----- src/datanode/src/store/gcs.rs | 46 ---- src/datanode/src/store/oss.rs | 45 ---- src/datanode/src/store/s3.rs | 55 ----- src/mito2/src/access_layer.rs | 6 +- src/mito2/src/cache/write_cache.rs | 3 +- src/object-store/Cargo.toml | 7 + src/object-store/src/config.rs | 348 +++++++++++++++++++++++++++++ src/object-store/src/error.rs | 72 ++++++ src/object-store/src/factory.rs | 171 ++++++++++++++ src/object-store/src/lib.rs | 8 + src/object-store/src/util.rs | 35 ++- tests-integration/src/cluster.rs | 3 +- tests-integration/src/test_util.rs | 8 +- 24 files changed, 701 insertions(+), 668 deletions(-) delete mode 100644 src/datanode/src/store/azblob.rs delete mode 100644 src/datanode/src/store/fs.rs delete mode 100644 src/datanode/src/store/gcs.rs delete mode 100644 src/datanode/src/store/oss.rs delete mode 100644 src/datanode/src/store/s3.rs create mode 100644 src/object-store/src/config.rs create mode 100644 src/object-store/src/error.rs create mode 100644 src/object-store/src/factory.rs diff --git a/Cargo.lock b/Cargo.lock index 81bd353121..3361f2568d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2118,6 +2118,7 @@ dependencies = [ "mito2", "moka", "nu-ansi-term", + "object-store", "plugins", "prometheus", "prost 0.13.5", @@ -2220,6 +2221,7 @@ dependencies = [ "humantime-serde", "meta-client", "num_cpus", + "object-store", "serde", "serde_json", "serde_with", @@ -8097,14 +8099,21 @@ version = "0.15.0" dependencies = [ "anyhow", "bytes", + "common-base", + "common-error", + "common-macro", "common-telemetry", "common-test-util", "futures", + "humantime-serde", "lazy_static", "md5", "moka", "opendal", "prometheus", + "reqwest 0.12.9", + "serde", + "snafu 0.8.5", "tokio", "uuid", ] diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 5691403973..e5a0f29e22 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -67,6 +67,7 @@ metric-engine.workspace = true mito2.workspace = true moka.workspace = true nu-ansi-term = "0.46" +object-store.workspace = true plugins.workspace = true prometheus.workspace = true prost.workspace = true diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 4063ecf183..a99a531ec8 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -280,7 +280,7 @@ mod tests { use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; - use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config}; + use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config}; use servers::heartbeat_options::HeartbeatOptions; use super::*; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f149e1d2e2..ed9381b3f7 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -848,7 +848,7 @@ mod tests { use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use common_wal::config::DatanodeWalConfig; - use datanode::config::{FileConfig, GcsConfig}; + use object_store::config::{FileConfig, GcsConfig}; use super::*; use crate::options::GlobalOptions; @@ -967,15 +967,15 @@ mod tests { assert!(matches!( &dn_opts.storage.store, - datanode::config::ObjectStoreConfig::File(FileConfig { .. }) + object_store::config::ObjectStoreConfig::File(FileConfig { .. }) )); assert_eq!(dn_opts.storage.providers.len(), 2); assert!(matches!( dn_opts.storage.providers[0], - datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. }) + object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. }) )); match &dn_opts.storage.providers[1] { - datanode::config::ObjectStoreConfig::S3(s3_config) => { + object_store::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!( "SecretBox([REDACTED])".to_string(), format!("{:?}", s3_config.access_key_id) diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index 988eae44df..17279e5eb7 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -14,6 +14,7 @@ common-macro.workspace = true config.workspace = true humantime-serde.workspace = true num_cpus.workspace = true +object-store.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs index 2f85cf3181..5ef7358969 100644 --- a/src/common/config/src/config.rs +++ b/src/common/config/src/config.rs @@ -106,7 +106,7 @@ mod tests { use common_telemetry::logging::LoggingOptions; use common_test_util::temp_dir::create_named_temp_file; use common_wal::config::DatanodeWalConfig; - use datanode::config::{ObjectStoreConfig, StorageConfig}; + use datanode::config::StorageConfig; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; @@ -212,7 +212,7 @@ mod tests { // Check the configs from environment variables. match &opts.storage.store { - ObjectStoreConfig::S3(s3_config) => { + object_store::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!(s3_config.bucket, "mybucket".to_string()); } _ => panic!("unexpected store type"), diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 59709c1285..ba8eb0c7ba 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -14,10 +14,7 @@ //! Datanode configurations -use core::time::Duration; - use common_base::readable_size::ReadableSize; -use common_base::secrets::{ExposeSecret, SecretString}; use common_config::{Configurable, DEFAULT_DATA_HOME}; pub use common_procedure::options::ProcedureConfig; use common_telemetry::logging::{LoggingOptions, TracingOptions}; @@ -27,6 +24,7 @@ use file_engine::config::EngineConfig as FileEngineConfig; use meta_client::MetaClientOptions; use metric_engine::config::EngineConfig as MetricEngineConfig; use mito2::config::MitoConfig; +pub(crate) use object_store::config::ObjectStoreConfig; use query::options::QueryOptions; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; @@ -36,53 +34,6 @@ use servers::http::HttpOptions; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5); -/// Object storage config -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(tag = "type")] -pub enum ObjectStoreConfig { - File(FileConfig), - S3(S3Config), - Oss(OssConfig), - Azblob(AzblobConfig), - Gcs(GcsConfig), -} - -impl ObjectStoreConfig { - /// Returns the object storage type name, such as `S3`, `Oss` etc. - pub fn provider_name(&self) -> &'static str { - match self { - Self::File(_) => "File", - Self::S3(_) => "S3", - Self::Oss(_) => "Oss", - Self::Azblob(_) => "Azblob", - Self::Gcs(_) => "Gcs", - } - } - - /// Returns true when it's a remote object storage such as AWS s3 etc. - pub fn is_object_storage(&self) -> bool { - !matches!(self, Self::File(_)) - } - - /// Returns the object storage configuration name, return the provider name if it's empty. - pub fn config_name(&self) -> &str { - let name = match self { - // file storage doesn't support name - Self::File(_) => self.provider_name(), - Self::S3(s3) => &s3.name, - Self::Oss(oss) => &oss.name, - Self::Azblob(az) => &az.name, - Self::Gcs(gcs) => &gcs.name, - }; - - if name.trim().is_empty() { - return self.provider_name(); - } - - name - } -} - /// Storage engine config #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(default)] @@ -112,252 +63,6 @@ impl Default for StorageConfig { } } -#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)] -#[serde(default)] -pub struct FileConfig {} - -#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] -#[serde(default)] -pub struct ObjectStorageCacheConfig { - /// The local file cache directory - pub cache_path: Option, - /// The cache capacity in bytes - pub cache_capacity: Option, -} - -/// The http client options to the storage. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(default)] -pub struct HttpClientConfig { - /// The maximum idle connection per host allowed in the pool. - pub(crate) pool_max_idle_per_host: u32, - - /// The timeout for only the connect phase of a http client. - #[serde(with = "humantime_serde")] - pub(crate) connect_timeout: Duration, - - /// The total request timeout, applied from when the request starts connecting until the response body has finished. - /// Also considered a total deadline. - #[serde(with = "humantime_serde")] - pub(crate) timeout: Duration, - - /// The timeout for idle sockets being kept-alive. - #[serde(with = "humantime_serde")] - pub(crate) pool_idle_timeout: Duration, - - /// Skip SSL certificate validation (insecure) - pub skip_ssl_validation: bool, -} - -impl Default for HttpClientConfig { - fn default() -> Self { - Self { - pool_max_idle_per_host: 1024, - connect_timeout: Duration::from_secs(30), - timeout: Duration::from_secs(30), - pool_idle_timeout: Duration::from_secs(90), - skip_ssl_validation: false, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct S3Config { - pub name: String, - pub bucket: String, - pub root: String, - #[serde(skip_serializing)] - pub access_key_id: SecretString, - #[serde(skip_serializing)] - pub secret_access_key: SecretString, - pub endpoint: Option, - pub region: Option, - /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style. - /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name - /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com - pub enable_virtual_host_style: bool, - #[serde(flatten)] - pub cache: ObjectStorageCacheConfig, - pub http_client: HttpClientConfig, -} - -impl PartialEq for S3Config { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - && self.bucket == other.bucket - && self.root == other.root - && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() - && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret() - && self.endpoint == other.endpoint - && self.region == other.region - && self.enable_virtual_host_style == other.enable_virtual_host_style - && self.cache == other.cache - && self.http_client == other.http_client - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct OssConfig { - pub name: String, - pub bucket: String, - pub root: String, - #[serde(skip_serializing)] - pub access_key_id: SecretString, - #[serde(skip_serializing)] - pub access_key_secret: SecretString, - pub endpoint: String, - #[serde(flatten)] - pub cache: ObjectStorageCacheConfig, - pub http_client: HttpClientConfig, -} - -impl PartialEq for OssConfig { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - && self.bucket == other.bucket - && self.root == other.root - && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() - && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret() - && self.endpoint == other.endpoint - && self.cache == other.cache - && self.http_client == other.http_client - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct AzblobConfig { - pub name: String, - pub container: String, - pub root: String, - #[serde(skip_serializing)] - pub account_name: SecretString, - #[serde(skip_serializing)] - pub account_key: SecretString, - pub endpoint: String, - pub sas_token: Option, - #[serde(flatten)] - pub cache: ObjectStorageCacheConfig, - pub http_client: HttpClientConfig, -} - -impl PartialEq for AzblobConfig { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - && self.container == other.container - && self.root == other.root - && self.account_name.expose_secret() == other.account_name.expose_secret() - && self.account_key.expose_secret() == other.account_key.expose_secret() - && self.endpoint == other.endpoint - && self.sas_token == other.sas_token - && self.cache == other.cache - && self.http_client == other.http_client - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct GcsConfig { - pub name: String, - pub root: String, - pub bucket: String, - pub scope: String, - #[serde(skip_serializing)] - pub credential_path: SecretString, - #[serde(skip_serializing)] - pub credential: SecretString, - pub endpoint: String, - #[serde(flatten)] - pub cache: ObjectStorageCacheConfig, - pub http_client: HttpClientConfig, -} - -impl PartialEq for GcsConfig { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - && self.root == other.root - && self.bucket == other.bucket - && self.scope == other.scope - && self.credential_path.expose_secret() == other.credential_path.expose_secret() - && self.credential.expose_secret() == other.credential.expose_secret() - && self.endpoint == other.endpoint - && self.cache == other.cache - && self.http_client == other.http_client - } -} - -impl Default for S3Config { - fn default() -> Self { - Self { - name: String::default(), - bucket: String::default(), - root: String::default(), - access_key_id: SecretString::from(String::default()), - secret_access_key: SecretString::from(String::default()), - enable_virtual_host_style: false, - endpoint: Option::default(), - region: Option::default(), - cache: ObjectStorageCacheConfig::default(), - http_client: HttpClientConfig::default(), - } - } -} - -impl Default for OssConfig { - fn default() -> Self { - Self { - name: String::default(), - bucket: String::default(), - root: String::default(), - access_key_id: SecretString::from(String::default()), - access_key_secret: SecretString::from(String::default()), - endpoint: String::default(), - cache: ObjectStorageCacheConfig::default(), - http_client: HttpClientConfig::default(), - } - } -} - -impl Default for AzblobConfig { - fn default() -> Self { - Self { - name: String::default(), - container: String::default(), - root: String::default(), - account_name: SecretString::from(String::default()), - account_key: SecretString::from(String::default()), - endpoint: String::default(), - sas_token: Option::default(), - cache: ObjectStorageCacheConfig::default(), - http_client: HttpClientConfig::default(), - } - } -} - -impl Default for GcsConfig { - fn default() -> Self { - Self { - name: String::default(), - root: String::default(), - bucket: String::default(), - scope: String::default(), - credential_path: SecretString::from(String::default()), - credential: SecretString::from(String::default()), - endpoint: String::default(), - cache: ObjectStorageCacheConfig::default(), - http_client: HttpClientConfig::default(), - } - } -} - -impl Default for ObjectStoreConfig { - fn default() -> Self { - ObjectStoreConfig::File(FileConfig {}) - } -} - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct DatanodeOptions { @@ -467,37 +172,6 @@ mod tests { let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap(); } - #[test] - fn test_config_name() { - let object_store_config = ObjectStoreConfig::default(); - assert_eq!("File", object_store_config.config_name()); - - let s3_config = ObjectStoreConfig::S3(S3Config::default()); - assert_eq!("S3", s3_config.config_name()); - assert_eq!("S3", s3_config.provider_name()); - - let s3_config = ObjectStoreConfig::S3(S3Config { - name: "test".to_string(), - ..Default::default() - }); - assert_eq!("test", s3_config.config_name()); - assert_eq!("S3", s3_config.provider_name()); - } - - #[test] - fn test_is_object_storage() { - let store = ObjectStoreConfig::default(); - assert!(!store.is_object_storage()); - let s3_config = ObjectStoreConfig::S3(S3Config::default()); - assert!(s3_config.is_object_storage()); - let oss_config = ObjectStoreConfig::Oss(OssConfig::default()); - assert!(oss_config.is_object_storage()); - let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default()); - assert!(gcs_config.is_object_storage()); - let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default()); - assert!(azblob_config.is_object_storage()); - } - #[test] fn test_secstr() { let toml_str = r#" diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 4914100b80..b77eea293f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -142,14 +142,6 @@ pub enum Error { source: Box, }, - #[snafu(display("Failed to init backend"))] - InitBackend { - #[snafu(source)] - error: object_store::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid SQL, error: {}", msg))] InvalidSql { msg: String }, @@ -395,6 +387,21 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed object store operation"))] + ObjectStore { + source: object_store::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build cache store"))] + BuildCacheStore { + #[snafu(source)] + error: object_store::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -447,8 +454,6 @@ impl ErrorExt for Error { StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(), - InitBackend { .. } => StatusCode::StorageUnavailable, - OpenLogStore { source, .. } => source.status_code(), MetaClientInit { source, .. } => source.status_code(), UnsupportedOutput { .. } => StatusCode::Unsupported, @@ -466,6 +471,9 @@ impl ErrorExt for Error { } MissingCache { .. } => StatusCode::Internal, SerializeJson { .. } => StatusCode::Internal, + + ObjectStore { source, .. } => source.status_code(), + BuildCacheStore { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 7e80259e66..c6a0fc0b30 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -14,45 +14,22 @@ //! object storage utilities -mod azblob; -pub mod fs; -mod gcs; -mod oss; -mod s3; -use std::path; use std::path::Path; use std::sync::Arc; use std::time::Duration; use common_telemetry::{info, warn}; -use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR}; +use object_store::factory::new_raw_object_store; use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; use object_store::services::Fs; -use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers}; +use object_store::{ + Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR, +}; use snafu::prelude::*; -use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; -use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result}; - -pub(crate) async fn new_raw_object_store( - store: &ObjectStoreConfig, - data_home: &str, -) -> Result { - let data_home = normalize_dir(data_home); - let object_store = match store { - ObjectStoreConfig::File(file_config) => { - fs::new_fs_object_store(&data_home, file_config).await - } - ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, - ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, - ObjectStoreConfig::Azblob(azblob_config) => { - azblob::new_azblob_object_store(azblob_config).await - } - ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await, - }?; - Ok(object_store) -} +use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::error::{self, CreateDirSnafu, Result}; fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { object_store.layer( @@ -66,7 +43,9 @@ pub(crate) async fn new_object_store_without_cache( store: &ObjectStoreConfig, data_home: &str, ) -> Result { - let object_store = new_raw_object_store(store, data_home).await?; + let object_store = new_raw_object_store(store, data_home) + .await + .context(error::ObjectStoreSnafu)?; // Enable retry layer and cache layer for non-fs object storages let object_store = if store.is_object_storage() { // Adds retry layer @@ -83,7 +62,9 @@ pub(crate) async fn new_object_store( store: ObjectStoreConfig, data_home: &str, ) -> Result { - let object_store = new_raw_object_store(&store, data_home).await?; + let object_store = new_raw_object_store(&store, data_home) + .await + .context(error::ObjectStoreSnafu)?; // Enable retry layer and cache layer for non-fs object storages let object_store = if store.is_object_storage() { let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? { @@ -170,20 +151,20 @@ async fn build_cache_layer( && !path.trim().is_empty() { let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR); - clean_temp_dir(&atomic_temp_dir)?; + clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?; // Compatible code. Remove this after a major release. let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR); - clean_temp_dir(&old_atomic_temp_dir)?; + clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?; let cache_store = Fs::default() .root(path) .atomic_write_dir(&atomic_temp_dir) .build() - .context(error::InitBackendSnafu)?; + .context(error::BuildCacheStoreSnafu)?; let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) - .context(error::InitBackendSnafu)?; + .context(error::BuildCacheStoreSnafu)?; cache_layer.recover_cache(false).await; info!( "Enabled local object storage cache, path: {}, capacity: {}.", @@ -196,31 +177,6 @@ async fn build_cache_layer( } } -pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> { - if path::Path::new(&dir).exists() { - info!("Begin to clean temp storage directory: {}", dir); - std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; - info!("Cleaned temp storage directory: {}", dir); - } - - Ok(()) -} - -pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result { - if config.skip_ssl_validation { - common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted."); - } - - let client = reqwest::ClientBuilder::new() - .pool_max_idle_per_host(config.pool_max_idle_per_host as usize) - .connect_timeout(config.connect_timeout) - .pool_idle_timeout(config.pool_idle_timeout) - .timeout(config.timeout) - .danger_accept_invalid_certs(config.skip_ssl_validation) - .build() - .context(BuildHttpClientSnafu)?; - Ok(HttpClient::with(client)) -} struct PrintDetailedError; // PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs deleted file mode 100644 index bf7b885de9..0000000000 --- a/src/datanode/src/store/azblob.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::secrets::ExposeSecret; -use common_telemetry::info; -use object_store::services::Azblob; -use object_store::{util, ObjectStore}; -use snafu::prelude::*; - -use crate::config::AzblobConfig; -use crate::error::{self, Result}; -use crate::store::build_http_client; - -pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result { - let root = util::normalize_dir(&azblob_config.root); - - info!( - "The azure storage container is: {}, root is: {}", - azblob_config.container, &root - ); - - let client = build_http_client(&azblob_config.http_client)?; - - let mut builder = Azblob::default() - .root(&root) - .container(&azblob_config.container) - .endpoint(&azblob_config.endpoint) - .account_name(azblob_config.account_name.expose_secret()) - .account_key(azblob_config.account_key.expose_secret()) - .http_client(client); - - if let Some(token) = &azblob_config.sas_token { - builder = builder.sas_token(token); - }; - - Ok(ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish()) -} diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs deleted file mode 100644 index 75e753d4ed..0000000000 --- a/src/datanode/src/store/fs.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{fs, path}; - -use common_telemetry::info; -use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR}; -use object_store::services::Fs; -use object_store::util::join_dir; -use object_store::ObjectStore; -use snafu::prelude::*; - -use crate::config::FileConfig; -use crate::error::{self, Result}; -use crate::store; - -/// A helper function to create a file system object store. -pub async fn new_fs_object_store( - data_home: &str, - _file_config: &FileConfig, -) -> Result { - fs::create_dir_all(path::Path::new(&data_home)) - .context(error::CreateDirSnafu { dir: data_home })?; - info!("The file storage home is: {}", data_home); - - let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR); - store::clean_temp_dir(&atomic_write_dir)?; - - // Compatible code. Remove this after a major release. - let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR); - store::clean_temp_dir(&old_atomic_temp_dir)?; - - let builder = Fs::default() - .root(data_home) - .atomic_write_dir(&atomic_write_dir); - - let object_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(); - - Ok(object_store) -} diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs deleted file mode 100644 index 60d25d7c7e..0000000000 --- a/src/datanode/src/store/gcs.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::secrets::ExposeSecret; -use common_telemetry::info; -use object_store::services::Gcs; -use object_store::{util, ObjectStore}; -use snafu::prelude::*; - -use crate::config::GcsConfig; -use crate::error::{self, Result}; -use crate::store::build_http_client; - -pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result { - let root = util::normalize_dir(&gcs_config.root); - info!( - "The gcs storage bucket is: {}, root is: {}", - gcs_config.bucket, &root - ); - - let client = build_http_client(&gcs_config.http_client); - - let builder = Gcs::default() - .root(&root) - .bucket(&gcs_config.bucket) - .scope(&gcs_config.scope) - .credential_path(gcs_config.credential_path.expose_secret()) - .credential(gcs_config.credential.expose_secret()) - .endpoint(&gcs_config.endpoint) - .http_client(client?); - - Ok(ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish()) -} diff --git a/src/datanode/src/store/oss.rs b/src/datanode/src/store/oss.rs deleted file mode 100644 index 3eab6acfd7..0000000000 --- a/src/datanode/src/store/oss.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::secrets::ExposeSecret; -use common_telemetry::info; -use object_store::services::Oss; -use object_store::{util, ObjectStore}; -use snafu::prelude::*; - -use crate::config::OssConfig; -use crate::error::{self, Result}; -use crate::store::build_http_client; - -pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result { - let root = util::normalize_dir(&oss_config.root); - info!( - "The oss storage bucket is: {}, root is: {}", - oss_config.bucket, &root - ); - - let client = build_http_client(&oss_config.http_client)?; - - let builder = Oss::default() - .root(&root) - .bucket(&oss_config.bucket) - .endpoint(&oss_config.endpoint) - .access_key_id(oss_config.access_key_id.expose_secret()) - .access_key_secret(oss_config.access_key_secret.expose_secret()) - .http_client(client); - - Ok(ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish()) -} diff --git a/src/datanode/src/store/s3.rs b/src/datanode/src/store/s3.rs deleted file mode 100644 index 0e51a071ae..0000000000 --- a/src/datanode/src/store/s3.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::secrets::ExposeSecret; -use common_telemetry::info; -use object_store::services::S3; -use object_store::{util, ObjectStore}; -use snafu::prelude::*; - -use crate::config::S3Config; -use crate::error::{self, Result}; -use crate::store::build_http_client; - -pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result { - let root = util::normalize_dir(&s3_config.root); - - info!( - "The s3 storage bucket is: {}, root is: {}", - s3_config.bucket, &root - ); - - let client = build_http_client(&s3_config.http_client)?; - - let mut builder = S3::default() - .root(&root) - .bucket(&s3_config.bucket) - .access_key_id(s3_config.access_key_id.expose_secret()) - .secret_access_key(s3_config.secret_access_key.expose_secret()) - .http_client(client); - - if s3_config.endpoint.is_some() { - builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap()); - } - if s3_config.region.is_some() { - builder = builder.region(s3_config.region.as_ref().unwrap()); - } - if s3_config.enable_virtual_host_style { - builder = builder.enable_virtual_host_style(); - } - - Ok(ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish()) -} diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ae0091c7c2..49e4ffa705 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; -use object_store::{ErrorKind, ObjectStore}; +use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR}; use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; @@ -42,10 +42,6 @@ pub type AccessLayerRef = Arc; /// SST write results. pub type SstInfoArray = SmallVec<[SstInfo; 2]>; -pub const ATOMIC_WRITE_DIR: &str = "tmp/"; -/// For compatibility. Remove this after a major version release. -pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/"; - /// A layer to access SST files under the same directory. pub struct AccessLayer { region_dir: String, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 458acc968f..c17473febd 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -430,9 +430,10 @@ impl UploadTracker { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; + use object_store::ATOMIC_WRITE_DIR; use super::*; - use crate::access_layer::{OperationType, ATOMIC_WRITE_DIR}; + use crate::access_layer::OperationType; use crate::cache::test_util::new_fs_store; use crate::cache::{CacheManager, CacheStrategy}; use crate::error::InvalidBatchSnafu; diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index f90ea42d61..5ffaf07b1e 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -12,8 +12,12 @@ services-memory = ["opendal/services-memory"] [dependencies] bytes.workspace = true +common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true common-telemetry.workspace = true futures.workspace = true +humantime-serde.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } @@ -28,6 +32,9 @@ opendal = { version = "0.52", features = [ "services-s3", ] } prometheus.workspace = true +reqwest.workspace = true +serde.workspace = true +snafu.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/object-store/src/config.rs b/src/object-store/src/config.rs new file mode 100644 index 0000000000..c692447c3a --- /dev/null +++ b/src/object-store/src/config.rs @@ -0,0 +1,348 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use common_base::secrets::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; + +/// Object storage config +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type")] +pub enum ObjectStoreConfig { + File(FileConfig), + S3(S3Config), + Oss(OssConfig), + Azblob(AzblobConfig), + Gcs(GcsConfig), +} + +impl Default for ObjectStoreConfig { + fn default() -> Self { + ObjectStoreConfig::File(FileConfig {}) + } +} + +impl ObjectStoreConfig { + /// Returns the object storage type name, such as `S3`, `Oss` etc. + pub fn provider_name(&self) -> &'static str { + match self { + Self::File(_) => "File", + Self::S3(_) => "S3", + Self::Oss(_) => "Oss", + Self::Azblob(_) => "Azblob", + Self::Gcs(_) => "Gcs", + } + } + + /// Returns true when it's a remote object storage such as AWS s3 etc. + pub fn is_object_storage(&self) -> bool { + !matches!(self, Self::File(_)) + } + + /// Returns the object storage configuration name, return the provider name if it's empty. + pub fn config_name(&self) -> &str { + let name = match self { + // file storage doesn't support name + Self::File(_) => self.provider_name(), + Self::S3(s3) => &s3.name, + Self::Oss(oss) => &oss.name, + Self::Azblob(az) => &az.name, + Self::Gcs(gcs) => &gcs.name, + }; + + if name.trim().is_empty() { + return self.provider_name(); + } + + name + } +} + +#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)] +#[serde(default)] +pub struct FileConfig {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct S3Config { + pub name: String, + pub bucket: String, + pub root: String, + #[serde(skip_serializing)] + pub access_key_id: SecretString, + #[serde(skip_serializing)] + pub secret_access_key: SecretString, + pub endpoint: Option, + pub region: Option, + /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style. + /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name + /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com + pub enable_virtual_host_style: bool, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, + pub http_client: HttpClientConfig, +} + +impl Default for S3Config { + fn default() -> Self { + Self { + name: String::default(), + bucket: String::default(), + root: String::default(), + access_key_id: SecretString::from(String::default()), + secret_access_key: SecretString::from(String::default()), + enable_virtual_host_style: false, + endpoint: Option::default(), + region: Option::default(), + cache: ObjectStorageCacheConfig::default(), + http_client: HttpClientConfig::default(), + } + } +} + +impl PartialEq for S3Config { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret() + && self.endpoint == other.endpoint + && self.region == other.region + && self.enable_virtual_host_style == other.enable_virtual_host_style + && self.cache == other.cache + && self.http_client == other.http_client + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct OssConfig { + pub name: String, + pub bucket: String, + pub root: String, + #[serde(skip_serializing)] + pub access_key_id: SecretString, + #[serde(skip_serializing)] + pub access_key_secret: SecretString, + pub endpoint: String, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, + pub http_client: HttpClientConfig, +} + +impl PartialEq for OssConfig { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.bucket == other.bucket + && self.root == other.root + && self.access_key_id.expose_secret() == other.access_key_id.expose_secret() + && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + && self.http_client == other.http_client + } +} + +impl Default for OssConfig { + fn default() -> Self { + Self { + name: String::default(), + bucket: String::default(), + root: String::default(), + access_key_id: SecretString::from(String::default()), + access_key_secret: SecretString::from(String::default()), + endpoint: String::default(), + cache: ObjectStorageCacheConfig::default(), + http_client: HttpClientConfig::default(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AzblobConfig { + pub name: String, + pub container: String, + pub root: String, + #[serde(skip_serializing)] + pub account_name: SecretString, + #[serde(skip_serializing)] + pub account_key: SecretString, + pub endpoint: String, + pub sas_token: Option, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, + pub http_client: HttpClientConfig, +} + +impl PartialEq for AzblobConfig { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.container == other.container + && self.root == other.root + && self.account_name.expose_secret() == other.account_name.expose_secret() + && self.account_key.expose_secret() == other.account_key.expose_secret() + && self.endpoint == other.endpoint + && self.sas_token == other.sas_token + && self.cache == other.cache + && self.http_client == other.http_client + } +} +impl Default for AzblobConfig { + fn default() -> Self { + Self { + name: String::default(), + container: String::default(), + root: String::default(), + account_name: SecretString::from(String::default()), + account_key: SecretString::from(String::default()), + endpoint: String::default(), + sas_token: Option::default(), + cache: ObjectStorageCacheConfig::default(), + http_client: HttpClientConfig::default(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct GcsConfig { + pub name: String, + pub root: String, + pub bucket: String, + pub scope: String, + #[serde(skip_serializing)] + pub credential_path: SecretString, + #[serde(skip_serializing)] + pub credential: SecretString, + pub endpoint: String, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, + pub http_client: HttpClientConfig, +} + +impl Default for GcsConfig { + fn default() -> Self { + Self { + name: String::default(), + root: String::default(), + bucket: String::default(), + scope: String::default(), + credential_path: SecretString::from(String::default()), + credential: SecretString::from(String::default()), + endpoint: String::default(), + cache: ObjectStorageCacheConfig::default(), + http_client: HttpClientConfig::default(), + } + } +} + +impl PartialEq for GcsConfig { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.root == other.root + && self.bucket == other.bucket + && self.scope == other.scope + && self.credential_path.expose_secret() == other.credential_path.expose_secret() + && self.credential.expose_secret() == other.credential.expose_secret() + && self.endpoint == other.endpoint + && self.cache == other.cache + && self.http_client == other.http_client + } +} + +/// The http client options to the storage. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(default)] +pub struct HttpClientConfig { + /// The maximum idle connection per host allowed in the pool. + pub(crate) pool_max_idle_per_host: u32, + + /// The timeout for only the connect phase of a http client. + #[serde(with = "humantime_serde")] + pub(crate) connect_timeout: Duration, + + /// The total request timeout, applied from when the request starts connecting until the response body has finished. + /// Also considered a total deadline. + #[serde(with = "humantime_serde")] + pub(crate) timeout: Duration, + + /// The timeout for idle sockets being kept-alive. + #[serde(with = "humantime_serde")] + pub(crate) pool_idle_timeout: Duration, + + /// Skip SSL certificate validation (insecure) + pub skip_ssl_validation: bool, +} + +impl Default for HttpClientConfig { + fn default() -> Self { + Self { + pool_max_idle_per_host: 1024, + connect_timeout: Duration::from_secs(30), + timeout: Duration::from_secs(30), + pool_idle_timeout: Duration::from_secs(90), + skip_ssl_validation: false, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] +#[serde(default)] +pub struct ObjectStorageCacheConfig { + /// The local file cache directory + pub cache_path: Option, + /// The cache capacity in bytes + pub cache_capacity: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::ObjectStoreConfig; + + #[test] + fn test_config_name() { + let object_store_config = ObjectStoreConfig::default(); + assert_eq!("File", object_store_config.config_name()); + + let s3_config = ObjectStoreConfig::S3(S3Config::default()); + assert_eq!("S3", s3_config.config_name()); + assert_eq!("S3", s3_config.provider_name()); + + let s3_config = ObjectStoreConfig::S3(S3Config { + name: "test".to_string(), + ..Default::default() + }); + assert_eq!("test", s3_config.config_name()); + assert_eq!("S3", s3_config.provider_name()); + } + + #[test] + fn test_is_object_storage() { + let store = ObjectStoreConfig::default(); + assert!(!store.is_object_storage()); + let s3_config = ObjectStoreConfig::S3(S3Config::default()); + assert!(s3_config.is_object_storage()); + let oss_config = ObjectStoreConfig::Oss(OssConfig::default()); + assert!(oss_config.is_object_storage()); + let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default()); + assert!(gcs_config.is_object_storage()); + let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default()); + assert!(azblob_config.is_object_storage()); + } +} diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs new file mode 100644 index 0000000000..cb259ef5c8 --- /dev/null +++ b/src/object-store/src/error.rs @@ -0,0 +1,72 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_macro::stack_trace_debug; +use common_telemetry::common_error::ext::ErrorExt; +use common_telemetry::common_error::status_code::StatusCode; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to init backend"))] + InitBackend { + #[snafu(source)] + error: opendal::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build http client"))] + BuildHttpClient { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: reqwest::Error, + }, + + #[snafu(display("Failed to create directory {}", dir))] + CreateDir { + dir: String, + #[snafu(source)] + error: std::io::Error, + }, + + #[snafu(display("Failed to remove directory {}", dir))] + RemoveDir { + dir: String, + #[snafu(source)] + error: std::io::Error, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + match self { + InitBackend { .. } => StatusCode::StorageUnavailable, + BuildHttpClient { .. } => StatusCode::Unexpected, + CreateDir { .. } | RemoveDir { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/object-store/src/factory.rs b/src/object-store/src/factory.rs new file mode 100644 index 0000000000..29b52c4951 --- /dev/null +++ b/src/object-store/src/factory.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{fs, path}; + +use common_base::secrets::ExposeSecret; +use common_telemetry::info; +use opendal::services::{Fs, Gcs, Oss, S3}; +use snafu::prelude::*; + +use crate::config::{AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config}; +use crate::error::{self, Result}; +use crate::services::Azblob; +use crate::util::{build_http_client, clean_temp_dir, join_dir, normalize_dir}; +use crate::{util, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR}; + +pub async fn new_raw_object_store( + store: &ObjectStoreConfig, + data_home: &str, +) -> Result { + let data_home = normalize_dir(data_home); + match store { + ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config).await, + ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await, + ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await, + ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await, + ObjectStoreConfig::Gcs(gcs_config) => new_gcs_object_store(gcs_config).await, + } +} + +/// A helper function to create a file system object store. +pub async fn new_fs_object_store( + data_home: &str, + _file_config: &FileConfig, +) -> Result { + fs::create_dir_all(path::Path::new(&data_home)) + .context(error::CreateDirSnafu { dir: data_home })?; + info!("The file storage home is: {}", data_home); + + let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR); + clean_temp_dir(&atomic_write_dir)?; + + // Compatible code. Remove this after a major release. + let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR); + clean_temp_dir(&old_atomic_temp_dir)?; + + let builder = Fs::default() + .root(data_home) + .atomic_write_dir(&atomic_write_dir); + + let object_store = ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish(); + + Ok(object_store) +} + +pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result { + let root = util::normalize_dir(&azblob_config.root); + + info!( + "The azure storage container is: {}, root is: {}", + azblob_config.container, &root + ); + + let client = build_http_client(&azblob_config.http_client)?; + + let mut builder = Azblob::default() + .root(&root) + .container(&azblob_config.container) + .endpoint(&azblob_config.endpoint) + .account_name(azblob_config.account_name.expose_secret()) + .account_key(azblob_config.account_key.expose_secret()) + .http_client(client); + + if let Some(token) = &azblob_config.sas_token { + builder = builder.sas_token(token); + }; + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} + +pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result { + let root = util::normalize_dir(&gcs_config.root); + info!( + "The gcs storage bucket is: {}, root is: {}", + gcs_config.bucket, &root + ); + + let client = build_http_client(&gcs_config.http_client); + + let builder = Gcs::default() + .root(&root) + .bucket(&gcs_config.bucket) + .scope(&gcs_config.scope) + .credential_path(gcs_config.credential_path.expose_secret()) + .credential(gcs_config.credential.expose_secret()) + .endpoint(&gcs_config.endpoint) + .http_client(client?); + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} + +pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result { + let root = util::normalize_dir(&oss_config.root); + info!( + "The oss storage bucket is: {}, root is: {}", + oss_config.bucket, &root + ); + + let client = build_http_client(&oss_config.http_client)?; + + let builder = Oss::default() + .root(&root) + .bucket(&oss_config.bucket) + .endpoint(&oss_config.endpoint) + .access_key_id(oss_config.access_key_id.expose_secret()) + .access_key_secret(oss_config.access_key_secret.expose_secret()) + .http_client(client); + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} + +pub async fn new_s3_object_store(s3_config: &S3Config) -> Result { + let root = util::normalize_dir(&s3_config.root); + + info!( + "The s3 storage bucket is: {}, root is: {}", + s3_config.bucket, &root + ); + + let client = build_http_client(&s3_config.http_client)?; + + let mut builder = S3::default() + .root(&root) + .bucket(&s3_config.bucket) + .access_key_id(s3_config.access_key_id.expose_secret()) + .secret_access_key(s3_config.secret_access_key.expose_secret()) + .http_client(client); + + if s3_config.endpoint.is_some() { + builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap()); + } + if s3_config.region.is_some() { + builder = builder.region(s3_config.region.as_ref().unwrap()); + } + if s3_config.enable_virtual_host_style { + builder = builder.enable_virtual_host_style(); + } + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 2e27822318..1e15d1437b 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -19,10 +19,18 @@ pub use opendal::{ Writer, }; +pub mod config; +pub mod error; +pub mod factory; pub mod layers; pub mod manager; mod metrics; pub mod test_util; pub mod util; + /// The default object cache directory name. pub const OBJECT_CACHE_DIR: &str = "object_cache"; + +pub const ATOMIC_WRITE_DIR: &str = "tmp/"; +/// For compatibility. Remove this after a major version release. +pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/"; diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 271da33e85..c4a13d72e9 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -13,13 +13,16 @@ // limitations under the License. use std::fmt::Display; +use std::path; -use common_telemetry::{debug, error, trace}; +use common_telemetry::{debug, error, info, trace}; use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer}; -use opendal::raw::{AccessorInfo, Operation}; +use opendal::raw::{AccessorInfo, HttpClient, Operation}; use opendal::ErrorKind; +use snafu::ResultExt; -use crate::ObjectStore; +use crate::config::HttpClientConfig; +use crate::{error, ObjectStore}; /// Join two paths and normalize the output dir. /// @@ -200,6 +203,32 @@ impl LoggingInterceptor for DefaultLoggingInterceptor { } } +pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result { + if config.skip_ssl_validation { + common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted."); + } + + let client = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(config.pool_max_idle_per_host as usize) + .connect_timeout(config.connect_timeout) + .pool_idle_timeout(config.pool_idle_timeout) + .timeout(config.timeout) + .danger_accept_invalid_certs(config.skip_ssl_validation) + .build() + .context(error::BuildHttpClientSnafu)?; + Ok(HttpClient::with(client)) +} + +pub fn clean_temp_dir(dir: &str) -> error::Result<()> { + if path::Path::new(&dir).exists() { + info!("Begin to clean temp storage directory: {}", dir); + std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; + info!("Cleaned temp storage directory: {}", dir); + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index f01944743c..c76b31f794 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -44,7 +44,7 @@ use common_runtime::runtime::BuilderBuild; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; -use datanode::config::{DatanodeOptions, ObjectStoreConfig}; +use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::heartbeat::HeartbeatTask; @@ -55,6 +55,7 @@ use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; +use object_store::config::ObjectStoreConfig; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; use servers::grpc::GrpcOptions; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 8cc0932240..1f4c47903d 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -30,13 +30,13 @@ use common_telemetry::warn; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use common_wal::config::DatanodeWalConfig; -use datanode::config::{ - AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config, - StorageConfig, -}; +use datanode::config::{DatanodeOptions, StorageConfig}; use frontend::instance::Instance; use frontend::service_config::{MysqlOptions, PostgresOptions}; use futures::future::BoxFuture; +use object_store::config::{ + AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config, +}; use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore;