From f9f4ac1dca00ebd56a4828dfbc9001b9d7e89f88 Mon Sep 17 00:00:00 2001 From: Logic Date: Sat, 7 Jun 2025 23:39:33 +0800 Subject: [PATCH] feat: Support export cli export to OSS (#6225) * feat(object_store): add support for Alibaba Cloud OSS - Implement OSS backend in object_store module - Add OSS-related options to ExportCommand - Update build_operator to support OSS - Modify parse_url to handle OSS schema Signed-off-by: Logic * feat(object_store): add support for Alibaba Cloud OSS - Implement OSS backend in object_store module - Add OSS-related options to ExportCommand - Update build_operator to support OSS - Modify parse_url to handle OSS schema Signed-off-by: Logic * test(object_store): update OSS backend tests with comprehensive scenarios - Remove minimal case test for OSS backend - Update test for OSS backend with all fields valid- Remove invalid allow_anonymous test case Signed-off-by: Logic * feat(datasource): add support for OSS (Object Storage Service) - Implement is_supported_in_oss function to check if a key is supported in OSS configuration- Add build_oss_backend function for creating an OSS backend - Update requests module to include OSS support check Signed-off-by: Logic * refactor(export): enhance security and logging for sensitive data - Replace plain strings with SecretString for sensitive information- Implement masking of sensitive data in SQL logs - Update handling of S3 and OSS credentials Signed-off-by: Logic * refactor(export): generalize remote storage support and rename options - Rename `s3_ddl_local_dir` to `ddl_local_dir` for better clarity - Update comments to support both S3 and OSS remote storage options - Modify logic to handle remote storage options more generically Signed-off-by: Logic * refactor(export): generalize remote storage support and rename options - Rename `s3_ddl_local_dir` to `ddl_local_dir` for better clarity - Update comments to support both S3 and OSS remote storage options - Modify logic to handle remote storage options more generically Signed-off-by: Logic --------- Signed-off-by: Logic --- src/cli/src/export.rs | 186 +++++++++++++++--- src/common/datasource/src/object_store.rs | 10 + src/common/datasource/src/object_store/oss.rs | 118 +++++++++++ src/table/src/requests.rs | 5 + 4 files changed, 293 insertions(+), 26 deletions(-) create mode 100644 src/common/datasource/src/object_store/oss.rs diff --git a/src/cli/src/export.rs b/src/cli/src/export.rs index 3b9b3283ef..2bc7c33a92 100644 --- a/src/cli/src/export.rs +++ b/src/cli/src/export.rs @@ -19,9 +19,11 @@ use std::time::Duration; use async_trait::async_trait; use clap::{Parser, ValueEnum}; +use common_base::secrets::{ExposeSecret, SecretString}; use common_error::ext::BoxedError; use common_telemetry::{debug, error, info}; use object_store::layers::LoggingLayer; +use object_store::services::Oss; use object_store::{services, ObjectStore}; use serde_json::Value; use snafu::{OptionExt, ResultExt}; @@ -110,15 +112,15 @@ pub struct ExportCommand { #[clap(long)] s3: bool, - /// if both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` will be only used for - /// exported SQL files, and the data will be exported to s3. + /// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for + /// exported SQL files, and the data will be exported to remote storage. /// - /// Note that `s3_ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have - /// direct access to s3. + /// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have + /// direct access to remote storage. /// - /// if `s3` is set but `s3_ddl_local_dir` is not set, both SQL&data will be exported to s3. + /// if remote storage is set but `ddl_local_dir` is not set, both SQL&data will be exported to remote storage. #[clap(long)] - s3_ddl_local_dir: Option, + ddl_local_dir: Option, /// The s3 bucket name /// if s3 is set, this is required @@ -149,6 +151,30 @@ pub struct ExportCommand { /// if s3 is set, this is required #[clap(long)] s3_region: Option, + + /// if export data to oss + #[clap(long)] + oss: bool, + + /// The oss bucket name + /// if oss is set, this is required + #[clap(long)] + oss_bucket: Option, + + /// The oss endpoint + /// if oss is set, this is required + #[clap(long)] + oss_endpoint: Option, + + /// The oss access key id + /// if oss is set, this is required + #[clap(long)] + oss_access_key_id: Option, + + /// The oss access key secret + /// if oss is set, this is required + #[clap(long)] + oss_access_key_secret: Option, } impl ExportCommand { @@ -162,7 +188,7 @@ impl ExportCommand { { return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build())); } - if !self.s3 && self.output_dir.is_none() { + if !self.s3 && !self.oss && self.output_dir.is_none() { return Err(BoxedError::new(OutputDirNotSetSnafu {}.build())); } let (catalog, schema) = @@ -187,13 +213,32 @@ impl ExportCommand { start_time: self.start_time.clone(), end_time: self.end_time.clone(), s3: self.s3, - s3_ddl_local_dir: self.s3_ddl_local_dir.clone(), + ddl_local_dir: self.ddl_local_dir.clone(), s3_bucket: self.s3_bucket.clone(), s3_root: self.s3_root.clone(), s3_endpoint: self.s3_endpoint.clone(), - s3_access_key: self.s3_access_key.clone(), - s3_secret_key: self.s3_secret_key.clone(), + // Wrap sensitive values in SecretString + s3_access_key: self + .s3_access_key + .as_ref() + .map(|k| SecretString::from(k.clone())), + s3_secret_key: self + .s3_secret_key + .as_ref() + .map(|k| SecretString::from(k.clone())), s3_region: self.s3_region.clone(), + oss: self.oss, + oss_bucket: self.oss_bucket.clone(), + oss_endpoint: self.oss_endpoint.clone(), + // Wrap sensitive values in SecretString + oss_access_key_id: self + .oss_access_key_id + .as_ref() + .map(|k| SecretString::from(k.clone())), + oss_access_key_secret: self + .oss_access_key_secret + .as_ref() + .map(|k| SecretString::from(k.clone())), })) } } @@ -209,23 +254,30 @@ pub struct Export { start_time: Option, end_time: Option, s3: bool, - s3_ddl_local_dir: Option, + ddl_local_dir: Option, s3_bucket: Option, s3_root: Option, s3_endpoint: Option, - s3_access_key: Option, - s3_secret_key: Option, + // Changed to SecretString for sensitive data + s3_access_key: Option, + s3_secret_key: Option, s3_region: Option, + oss: bool, + oss_bucket: Option, + oss_endpoint: Option, + // Changed to SecretString for sensitive data + oss_access_key_id: Option, + oss_access_key_secret: Option, } impl Export { fn catalog_path(&self) -> PathBuf { - if self.s3 { + if self.s3 || self.oss { PathBuf::from(&self.catalog) } else if let Some(dir) = &self.output_dir { PathBuf::from(dir).join(&self.catalog) } else { - unreachable!("catalog_path: output_dir must be set when not using s3") + unreachable!("catalog_path: output_dir must be set when not using remote storage") } } @@ -427,7 +479,7 @@ impl Export { .await?; // Create directory if needed for file system storage - if !export_self.s3 { + if !export_self.s3 && !export_self.oss { let db_dir = format!("{}/{}/", export_self.catalog, schema); operator.create_dir(&db_dir).await.context(OpenDalSnafu)?; } @@ -473,6 +525,8 @@ impl Export { async fn build_operator(&self) -> Result { if self.s3 { self.build_s3_operator().await + } else if self.oss { + self.build_oss_operator().await } else { self.build_fs_operator().await } @@ -480,9 +534,8 @@ impl Export { /// build operator with preference for file system async fn build_prefer_fs_operator(&self) -> Result { - // is under s3 mode and s3_ddl_dir is set, use it as root - if self.s3 && self.s3_ddl_local_dir.is_some() { - let root = self.s3_ddl_local_dir.as_ref().unwrap().clone(); + if (self.s3 || self.oss) && self.ddl_local_dir.is_some() { + let root = self.ddl_local_dir.as_ref().unwrap().clone(); let op = ObjectStore::new(services::Fs::default().root(&root)) .context(OpenDalSnafu)? .layer(LoggingLayer::default()) @@ -490,6 +543,8 @@ impl Export { Ok(op) } else if self.s3 { self.build_s3_operator().await + } else if self.oss { + self.build_oss_operator().await } else { self.build_fs_operator().await } @@ -515,11 +570,35 @@ impl Export { } if let Some(key_id) = self.s3_access_key.as_ref() { - builder = builder.access_key_id(key_id); + builder = builder.access_key_id(key_id.expose_secret()); } if let Some(secret_key) = self.s3_secret_key.as_ref() { - builder = builder.secret_access_key(secret_key); + builder = builder.secret_access_key(secret_key.expose_secret()); + } + + let op = ObjectStore::new(builder) + .context(OpenDalSnafu)? + .layer(LoggingLayer::default()) + .finish(); + Ok(op) + } + + async fn build_oss_operator(&self) -> Result { + let mut builder = Oss::default() + .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set")) + .endpoint( + self.oss_endpoint + .as_ref() + .expect("oss_endpoint must be set"), + ); + + // Use expose_secret() to access the actual secret value + if let Some(key_id) = self.oss_access_key_id.as_ref() { + builder = builder.access_key_id(key_id.expose_secret()); + } + if let Some(secret_key) = self.oss_access_key_secret.as_ref() { + builder = builder.access_key_secret(secret_key.expose_secret()); } let op = ObjectStore::new(builder) @@ -562,8 +641,8 @@ impl Export { tasks.push(async move { let _permit = semaphore_moved.acquire().await.unwrap(); - // Create directory if not using S3 - if !export_self.s3 { + // Create directory if not using remote storage + if !export_self.s3 && !export_self.oss { let db_dir = format!("{}/{}/", export_self.catalog, schema); operator.create_dir(&db_dir).await.context(OpenDalSnafu)?; } @@ -575,7 +654,11 @@ impl Export { r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#, export_self.catalog, schema, path, with_options_clone, connection_part ); - info!("Executing sql: {sql}"); + + // Log SQL command but mask sensitive information + let safe_sql = export_self.mask_sensitive_sql(&sql); + info!("Executing sql: {}", safe_sql); + export_self.database_client.sql_in_public(&sql).await?; info!( "Finished exporting {}.{} data to {}", @@ -615,6 +698,29 @@ impl Export { Ok(()) } + /// Mask sensitive information in SQL commands for safe logging + fn mask_sensitive_sql(&self, sql: &str) -> String { + let mut masked_sql = sql.to_string(); + + // Mask S3 credentials + if let Some(access_key) = &self.s3_access_key { + masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]"); + } + if let Some(secret_key) = &self.s3_secret_key { + masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]"); + } + + // Mask OSS credentials + if let Some(access_key_id) = &self.oss_access_key_id { + masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]"); + } + if let Some(access_key_secret) = &self.oss_access_key_secret { + masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]"); + } + + masked_sql + } + fn get_file_path(&self, schema: &str, file_name: &str) -> String { format!("{}/{}/{}", self.catalog, schema, file_name) } @@ -631,6 +737,13 @@ impl Export { }, file_path ) + } else if self.oss { + format!( + "oss://{}/{}/{}", + self.oss_bucket.as_ref().unwrap_or(&String::new()), + self.catalog, + file_path + ) } else { format!( "{}/{}", @@ -675,15 +788,36 @@ impl Export { }; // Safety: All s3 options are required + // Use expose_secret() to access the actual secret values let connection_options = format!( "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}", - self.s3_access_key.as_ref().unwrap(), - self.s3_secret_key.as_ref().unwrap(), + self.s3_access_key.as_ref().unwrap().expose_secret(), + self.s3_secret_key.as_ref().unwrap().expose_secret(), self.s3_region.as_ref().unwrap(), endpoint_option ); (s3_path, format!(" CONNECTION ({})", connection_options)) + } else if self.oss { + let oss_path = format!( + "oss://{}/{}/{}/", + self.oss_bucket.as_ref().unwrap(), + self.catalog, + schema + ); + let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() { + format!(", ENDPOINT='{}'", endpoint) + } else { + String::new() + }; + + let connection_options = format!( + "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}", + self.oss_access_key_id.as_ref().unwrap().expose_secret(), + self.oss_access_key_secret.as_ref().unwrap().expose_secret(), + endpoint_option + ); + (oss_path, format!(" CONNECTION ({})", connection_options)) } else { ( self.catalog_path() diff --git a/src/common/datasource/src/object_store.rs b/src/common/datasource/src/object_store.rs index d2ed0a4ad8..942df20336 100644 --- a/src/common/datasource/src/object_store.rs +++ b/src/common/datasource/src/object_store.rs @@ -13,7 +13,9 @@ // limitations under the License. pub mod fs; +pub mod oss; pub mod s3; + use std::collections::HashMap; use lazy_static::lazy_static; @@ -25,10 +27,12 @@ use url::{ParseError, Url}; use self::fs::build_fs_backend; use self::s3::build_s3_backend; use crate::error::{self, Result}; +use crate::object_store::oss::build_oss_backend; use crate::util::find_dir_and_filename; pub const FS_SCHEMA: &str = "FS"; pub const S3_SCHEMA: &str = "S3"; +pub const OSS_SCHEMA: &str = "OSS"; /// Returns `(schema, Option, path)` pub fn parse_url(url: &str) -> Result<(String, Option, String)> { @@ -64,6 +68,12 @@ pub fn build_backend(url: &str, connection: &HashMap) -> Result< })?; Ok(build_s3_backend(&host, &root, connection)?) } + OSS_SCHEMA => { + let host = host.context(error::EmptyHostPathSnafu { + url: url.to_string(), + })?; + Ok(build_oss_backend(&host, &root, connection)?) + } FS_SCHEMA => Ok(build_fs_backend(&root)?), _ => error::UnsupportedBackendProtocolSnafu { diff --git a/src/common/datasource/src/object_store/oss.rs b/src/common/datasource/src/object_store/oss.rs new file mode 100644 index 0000000000..0aa583550a --- /dev/null +++ b/src/common/datasource/src/object_store/oss.rs @@ -0,0 +1,118 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use object_store::services::Oss; +use object_store::ObjectStore; +use snafu::ResultExt; + +use crate::error::{self, Result}; + +const BUCKET: &str = "bucket"; +const ENDPOINT: &str = "endpoint"; +const ACCESS_KEY_ID: &str = "access_key_id"; +const ACCESS_KEY_SECRET: &str = "access_key_secret"; +const ROOT: &str = "root"; +const ALLOW_ANONYMOUS: &str = "allow_anonymous"; + +/// Check if the key is supported in OSS configuration. +pub fn is_supported_in_oss(key: &str) -> bool { + [ + ROOT, + ALLOW_ANONYMOUS, + BUCKET, + ENDPOINT, + ACCESS_KEY_ID, + ACCESS_KEY_SECRET, + ] + .contains(&key) +} + +/// Build an OSS backend using the provided bucket, root, and connection parameters. +pub fn build_oss_backend( + bucket: &str, + root: &str, + connection: &HashMap, +) -> Result { + let mut builder = Oss::default().bucket(bucket).root(root); + + if let Some(endpoint) = connection.get(ENDPOINT) { + builder = builder.endpoint(endpoint); + } + + if let Some(access_key_id) = connection.get(ACCESS_KEY_ID) { + builder = builder.access_key_id(access_key_id); + } + + if let Some(access_key_secret) = connection.get(ACCESS_KEY_SECRET) { + builder = builder.access_key_secret(access_key_secret); + } + + if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) { + let allow = allow_anonymous.as_str().parse::().map_err(|e| { + error::InvalidConnectionSnafu { + msg: format!( + "failed to parse the option {}={}, {}", + ALLOW_ANONYMOUS, allow_anonymous, e + ), + } + .build() + })?; + if allow { + builder = builder.allow_anonymous(); + } + } + + let op = ObjectStore::new(builder) + .context(error::BuildBackendSnafu)? + .layer(object_store::layers::LoggingLayer::default()) + .layer(object_store::layers::TracingLayer) + .layer(object_store::layers::build_prometheus_metrics_layer(true)) + .finish(); + + Ok(op) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_supported_in_oss() { + assert!(is_supported_in_oss(ROOT)); + assert!(is_supported_in_oss(ALLOW_ANONYMOUS)); + assert!(is_supported_in_oss(BUCKET)); + assert!(is_supported_in_oss(ENDPOINT)); + assert!(is_supported_in_oss(ACCESS_KEY_ID)); + assert!(is_supported_in_oss(ACCESS_KEY_SECRET)); + assert!(!is_supported_in_oss("foo")); + assert!(!is_supported_in_oss("BAR")); + } + + #[test] + fn test_build_oss_backend_all_fields_valid() { + let mut connection = HashMap::new(); + connection.insert( + ENDPOINT.to_string(), + "http://oss-ap-southeast-1.aliyuncs.com".to_string(), + ); + connection.insert(ACCESS_KEY_ID.to_string(), "key_id".to_string()); + connection.insert(ACCESS_KEY_SECRET.to_string(), "key_secret".to_string()); + connection.insert(ALLOW_ANONYMOUS.to_string(), "true".to_string()); + + let result = build_oss_backend("my-bucket", "my-root", &connection); + assert!(result.is_ok()); + } +} diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 75a4ab64d6..206bafe29c 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -19,6 +19,7 @@ use std::fmt; use std::str::FromStr; use common_base::readable_size::ReadableSize; +use common_datasource::object_store::oss::is_supported_in_oss; use common_datasource::object_store::s3::is_supported_in_s3; use common_query::AddColumnLocation; use common_time::range::TimestampRange; @@ -70,6 +71,10 @@ pub fn validate_table_option(key: &str) -> bool { return true; } + if is_supported_in_oss(key) { + return true; + } + if is_mito_engine_option_key(key) { return true; }