From ae81c7329d0ae217ca63bc5b447ffe2b723870ee Mon Sep 17 00:00:00 2001 From: Zou Wei <47681251+QuenKar@users.noreply.github.com> Date: Tue, 30 May 2023 19:59:38 +0800 Subject: [PATCH] feat: support azblob storage. (#1659) * feat:support azblob storage. * test:add some tests. * refactor:use if-let. --- .env.example | 6 +++ src/cmd/src/datanode.rs | 1 + src/datanode/src/datanode.rs | 31 ++++++++++++++ src/datanode/src/store.rs | 11 +++++ src/datanode/src/store/azblob.rs | 47 +++++++++++++++++++++ src/object-store/tests/object_store_test.rs | 29 ++++++++++++- tests-integration/README.md | 6 +++ tests-integration/src/test_util.rs | 47 +++++++++++++++++++-- tests-integration/tests/main.rs | 4 +- 9 files changed, 175 insertions(+), 7 deletions(-) create mode 100644 src/datanode/src/store/azblob.rs diff --git a/.env.example b/.env.example index 4abec140f6..3bb3de91d4 100644 --- a/.env.example +++ b/.env.example @@ -9,3 +9,9 @@ GT_OSS_BUCKET=OSS bucket GT_OSS_ACCESS_KEY_ID=OSS access key id GT_OSS_ACCESS_KEY=OSS access key GT_OSS_ENDPOINT=OSS endpoint +# Settings for azblob test +GT_AZBLOB_CONTAINER=AZBLOB container +GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name +GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key +GT_AZBLOB_ENDPOINT=AZBLOB endpoint + diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 70a4f68eb0..448f11ffe5 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -280,6 +280,7 @@ mod tests { } ObjectStoreConfig::S3 { .. } => unreachable!(), ObjectStoreConfig::Oss { .. } => unreachable!(), + ObjectStoreConfig::Azblob { .. } => unreachable!(), }; assert_eq!( diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 7ffa68ba69..664f13aead 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -47,6 +47,7 @@ pub enum ObjectStoreConfig { File(FileConfig), S3(S3Config), Oss(OssConfig), + Azblob(AzblobConfig), } /// Storage engine config @@ -95,6 +96,21 @@ pub struct OssConfig { pub cache_capacity: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AzblobConfig { + pub container: String, + pub root: String, + #[serde(skip_serializing)] + pub account_name: SecretString, + #[serde(skip_serializing)] + pub account_key: SecretString, + pub endpoint: String, + pub sas_token: Option, + pub cache_path: Option, + pub cache_capacity: Option, +} + impl Default for S3Config { fn default() -> Self { Self { @@ -124,6 +140,21 @@ impl Default for OssConfig { } } +impl Default for AzblobConfig { + fn default() -> Self { + Self { + container: String::default(), + root: String::default(), + account_name: SecretString::from(String::default()), + account_key: SecretString::from(String::default()), + endpoint: String::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + sas_token: Option::default(), + } + } +} + impl Default for ObjectStoreConfig { fn default() -> Self { ObjectStoreConfig::File(FileConfig { diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 31cd141c26..99d3faa8fd 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -14,6 +14,7 @@ //! object storage utilities +mod azblob; mod fs; mod oss; mod s3; @@ -36,6 +37,9 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await, ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, + ObjectStoreConfig::Azblob(azblob_config) => { + azblob::new_azblob_object_store(azblob_config).await + } }?; // Enable retry layer and cache layer for non-fs object storages @@ -76,6 +80,13 @@ async fn create_object_store_with_cache( .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) } + ObjectStoreConfig::Azblob(azblob_config) => { + let path = azblob_config.cache_path.as_ref(); + let capacity = azblob_config + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } _ => (None, ReadableSize(0)), }; diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs new file mode 100644 index 0000000000..40497fd38c --- /dev/null +++ b/src/datanode/src/store/azblob.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::logging::info; +use object_store::services::Azblob as AzureBuilder; +use object_store::{util, ObjectStore}; +use secrecy::ExposeSecret; +use snafu::prelude::*; + +use crate::datanode::AzblobConfig; +use crate::error::{self, Result}; + +pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result { + let root = util::normalize_dir(&azblob_config.root); + + info!( + "The azure storage container is: {}, root is: {}", + azblob_config.container, &root + ); + + let mut builder = AzureBuilder::default(); + builder + .root(&root) + .container(&azblob_config.container) + .endpoint(&azblob_config.endpoint) + .account_name(azblob_config.account_name.expose_secret()) + .account_key(azblob_config.account_key.expose_secret()); + + if let Some(token) = &azblob_config.sas_token { + builder.sas_token(token); + } + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index bc3f29b515..c70587bed7 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -23,7 +23,7 @@ use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{util, ObjectStore, ObjectStoreBuilder}; use opendal::raw::Accessor; -use opendal::services::Oss; +use opendal::services::{Azblob, Oss}; use opendal::{EntryMode, Operator, OperatorBuilder}; async fn test_object_crud(store: &ObjectStore) -> Result<()> { @@ -158,6 +158,33 @@ async fn test_oss_backend() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_azblob_backend() -> Result<()> { + logging::init_default_ut_logging(); + if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") { + if !container.is_empty() { + logging::info!("Running azblob test."); + + let root = uuid::Uuid::new_v4().to_string(); + + let mut builder = Azblob::default(); + builder + .root(&root) + .account_name(&env::var("GT_AZBLOB_ACCOUNT_NAME")?) + .account_key(&env::var("GT_AZBLOB_ACCOUNT_KEY")?) + .container(&container); + + let store = ObjectStore::new(builder).unwrap().finish(); + + let mut guard = TempFolder::new(&store, "/"); + test_object_crud(&store).await?; + test_object_list(&store).await?; + guard.remove_all().await?; + } + } + Ok(()) +} + async fn assert_lru_cache( cache_layer: &LruCacheLayer, file_names: &[&str], diff --git a/tests-integration/README.md b/tests-integration/README.md index 6e8a637ad6..27f66da867 100644 --- a/tests-integration/README.md +++ b/tests-integration/README.md @@ -32,3 +32,9 @@ Test oss storage: ``` cargo test oss ``` + +Test azblob storage: + +``` +cargo test azblob +``` \ No newline at end of file diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 043a8214ba..2f7eb74c66 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -26,8 +26,8 @@ use common_runtime::Builder as RuntimeBuilder; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ - DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config, - StorageConfig, WalConfig, + AzblobConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, + S3Config, StorageConfig, WalConfig, }; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; @@ -35,7 +35,7 @@ use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use frontend::instance::Instance as FeInstance; -use object_store::services::{Oss, S3}; +use object_store::services::{Azblob, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; use secrecy::ExposeSecret; @@ -57,6 +57,7 @@ pub enum StorageType { S3WithCache, File, Oss, + Azblob, } impl StorageType { @@ -79,6 +80,13 @@ impl StorageType { false } } + StorageType::Azblob => { + if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") { + !b.is_empty() + } else { + false + } + } } } } @@ -101,6 +109,34 @@ pub fn get_test_store_config( let _ = dotenv::dotenv(); match store_type { + StorageType::Azblob => { + let azblob_config = AzblobConfig { + root: uuid::Uuid::new_v4().to_string(), + container: env::var("GT_AZBLOB_CONTAINER").unwrap(), + account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(), + account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(), + endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(), + ..Default::default() + }; + + let mut builder = Azblob::default(); + builder + .root(&azblob_config.root) + .endpoint(&azblob_config.endpoint) + .account_name(azblob_config.account_name.expose_secret()) + .account_key(azblob_config.account_key.expose_secret()) + .container(&azblob_config.container); + + if let Ok(sas_token) = env::var("GT_AZBLOB_SAS_TOKEN") { + builder.sas_token(&sas_token); + } + + let config = ObjectStoreConfig::Azblob(azblob_config); + + let store = ObjectStore::new(builder).unwrap().finish(); + + (config, TempDirGuard::Azblob(TempFolder::new(&store, "/"))) + } StorageType::Oss => { let oss_config = OssConfig { root: uuid::Uuid::new_v4().to_string(), @@ -169,6 +205,7 @@ pub enum TempDirGuard { File(TempDir), S3(TempFolder), Oss(TempFolder), + Azblob(TempFolder), } pub struct TestGuard { @@ -182,7 +219,9 @@ pub struct StorageGuard(pub TempDirGuard); impl TestGuard { pub async fn remove_all(&mut self) { - if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) = &mut self.storage_guard.0 { + if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) | TempDirGuard::Azblob(guard) = + &mut self.storage_guard.0 + { guard.remove_all().await.unwrap() } } diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 2b4e17bbf3..e3b09a30a7 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -17,5 +17,5 @@ mod grpc; #[macro_use] mod http; -grpc_tests!(File, S3, S3WithCache, Oss); -http_tests!(File, S3, S3WithCache, Oss); +grpc_tests!(File, S3, S3WithCache, Oss, Azblob); +http_tests!(File, S3, S3WithCache, Oss, Azblob);