From 66e5ed548389559bb1d598fd249d211272f1c442 Mon Sep 17 00:00:00 2001 From: Niwaka <61189782+NiwakaDev@users.noreply.github.com> Date: Thu, 6 Jul 2023 00:03:51 +0900 Subject: [PATCH] feat: support gcs storage (#1781) --- .env.example | 6 ++- src/cmd/src/datanode.rs | 1 + src/datanode/src/datanode.rs | 28 ++++++++++++++ src/datanode/src/store.rs | 9 +++++ src/datanode/src/store/gcs.rs | 42 ++++++++++++++++++++ src/object-store/tests/object_store_test.rs | 28 +++++++++++++- tests-integration/src/test_util.rs | 43 ++++++++++++++++++--- tests-integration/tests/main.rs | 4 +- 8 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 src/datanode/src/store/gcs.rs diff --git a/.env.example b/.env.example index 3bb3de91d4..4d45913df0 100644 --- a/.env.example +++ b/.env.example @@ -14,4 +14,8 @@ GT_AZBLOB_CONTAINER=AZBLOB container GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key GT_AZBLOB_ENDPOINT=AZBLOB endpoint - +# Settings for gcs test +GT_GCS_BUCKET=GCS bucket +GT_GCS_SCOPE=GCS scope +GT_GCS_CREDENTIAL_PATH=GCS credential path +GT_GCS_ENDPOINT=GCS endpoint diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index bd04423578..3c58822996 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -273,6 +273,7 @@ mod tests { ObjectStoreConfig::S3 { .. } => unreachable!(), ObjectStoreConfig::Oss { .. } => unreachable!(), ObjectStoreConfig::Azblob { .. } => unreachable!(), + ObjectStoreConfig::Gcs { .. 
} => unreachable!(), }; assert_eq!( diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 6fa7eb4e7a..3378699e92 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -52,6 +52,7 @@ pub enum ObjectStoreConfig { S3(S3Config), Oss(OssConfig), Azblob(AzblobConfig), + Gcs(GcsConfig), } /// Storage engine config @@ -122,6 +123,19 @@ pub struct AzblobConfig { pub cache_capacity: Option<ReadableSize>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct GcsConfig { + pub root: String, + pub bucket: String, + pub scope: String, + #[serde(skip_serializing)] + pub credential_path: SecretString, + pub endpoint: String, + pub cache_path: Option<String>, + pub cache_capacity: Option<ReadableSize>, +} + impl Default for S3Config { fn default() -> Self { Self { @@ -166,6 +180,20 @@ impl Default for AzblobConfig { } } +impl Default for GcsConfig { + fn default() -> Self { + Self { + root: String::default(), + bucket: String::default(), + scope: String::default(), + credential_path: SecretString::from(String::default()), + endpoint: String::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + } + } +} + impl Default for ObjectStoreConfig { fn default() -> Self { ObjectStoreConfig::File(FileConfig { diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index c3315ed6df..faaddf0460 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -16,6 +16,7 @@ mod azblob; mod fs; +mod gcs; mod oss; mod s3; @@ -40,6 +41,7 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result ObjectStoreConfig::Azblob(azblob_config) => { azblob::new_azblob_object_store(azblob_config).await } + ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await, }?; // Enable retry layer and cache layer for non-fs object storages @@ -88,6 +90,13 @@ async fn create_object_store_with_cache( .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) } + 
ObjectStoreConfig::Gcs(gcs_config) => { + let path = gcs_config.cache_path.as_ref(); + let capacity = gcs_config + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } _ => (None, ReadableSize(0)), }; diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs new file mode 100644 index 0000000000..08ede76b39 --- /dev/null +++ b/src/datanode/src/store/gcs.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::logging::info; +use object_store::services::Gcs as GCSBuilder; +use object_store::{util, ObjectStore}; +use secrecy::ExposeSecret; +use snafu::prelude::*; + +use crate::datanode::GcsConfig; +use crate::error::{self, Result}; + +pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> { + let root = util::normalize_dir(&gcs_config.root); + info!( + "The gcs storage bucket is: {}, root is: {}", + gcs_config.bucket, &root + ); + + let mut builder = GCSBuilder::default(); + builder + .root(&root) + .bucket(&gcs_config.bucket) + .scope(&gcs_config.scope) + .credential_path(gcs_config.credential_path.expose_secret()) + .endpoint(&gcs_config.endpoint); + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? 
+ .finish()) +} diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index f96d39e00e..fd3c18b185 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -23,7 +23,7 @@ use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{util, ObjectStore, ObjectStoreBuilder}; use opendal::raw::Accessor; -use opendal::services::{Azblob, Oss}; +use opendal::services::{Azblob, Gcs, Oss}; use opendal::{EntryMode, Operator, OperatorBuilder}; async fn test_object_crud(store: &ObjectStore) -> Result<()> { @@ -185,6 +185,32 @@ async fn test_azblob_backend() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_gcs_backend() -> Result<()> { + logging::init_default_ut_logging(); + if let Ok(bucket) = env::var("GT_GCS_BUCKET") { + if !bucket.is_empty() { + logging::info!("Running gcs test."); + + let mut builder = Gcs::default(); + builder + .root(&uuid::Uuid::new_v4().to_string()) + .bucket(&bucket) + .scope(&env::var("GT_GCS_SCOPE").unwrap()) + .credential_path(&env::var("GT_GCS_CREDENTIAL_PATH").unwrap()) + .endpoint(&env::var("GT_GCS_ENDPOINT").unwrap()); + + let store = ObjectStore::new(builder).unwrap().finish(); + + let guard = TempFolder::new(&store, "/"); + test_object_crud(&store).await?; + test_object_list(&store).await?; + guard.remove_all().await?; + } + } + Ok(()) +} + async fn assert_lru_cache<C: Accessor + Clone>( cache_layer: &LruCacheLayer<C>, file_names: &[&str], diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index fa5675c3d3..611b3354c1 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -29,8 +29,8 @@ use common_runtime::Builder as RuntimeBuilder; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ - AzblobConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, 
OssConfig, ProcedureConfig, - S3Config, StorageConfig, WalConfig, + AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, + ProcedureConfig, S3Config, StorageConfig, WalConfig, }; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; @@ -43,7 +43,7 @@ use datatypes::vectors::{ }; use frontend::instance::Instance as FeInstance; use frontend::service_config::{MysqlOptions, PostgresOptions}; -use object_store::services::{Azblob, Oss, S3}; +use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; use secrecy::ExposeSecret; @@ -68,6 +68,7 @@ pub enum StorageType { File, Oss, Azblob, + Gcs, } impl StorageType { @@ -97,6 +98,13 @@ impl StorageType { false } } + StorageType::Gcs => { + if let Ok(b) = env::var("GT_GCS_BUCKET") { + !b.is_empty() + } else { + false + } + } } } } @@ -119,6 +127,28 @@ pub fn get_test_store_config( let _ = dotenv::dotenv(); match store_type { + StorageType::Gcs => { + let gcs_config = GcsConfig { + root: uuid::Uuid::new_v4().to_string(), + bucket: env::var("GT_GCS_BUCKET").unwrap(), + scope: env::var("GT_GCS_SCOPE").unwrap(), + credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(), + endpoint: env::var("GT_GCS_ENDPOINT").unwrap(), + ..Default::default() + }; + + let mut builder = Gcs::default(); + builder + .root(&gcs_config.root) + .bucket(&gcs_config.bucket) + .scope(&gcs_config.scope) + .credential_path(gcs_config.credential_path.expose_secret()) + .endpoint(&gcs_config.endpoint); + + let config = ObjectStoreConfig::Gcs(gcs_config); + let store = ObjectStore::new(builder).unwrap().finish(); + (config, TempDirGuard::Gcs(TempFolder::new(&store, "/"))) + } StorageType::Azblob => { let azblob_config = AzblobConfig { root: uuid::Uuid::new_v4().to_string(), @@ -216,6 +246,7 @@ pub enum TempDirGuard { S3(TempFolder), Oss(TempFolder), Azblob(TempFolder), + Gcs(TempFolder), } pub struct TestGuard { @@ -229,8 
+260,10 @@ pub struct StorageGuard(pub TempDirGuard); impl TestGuard { pub async fn remove_all(&mut self) { - if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) | TempDirGuard::Azblob(guard) = - &mut self.storage_guard.0 + if let TempDirGuard::S3(guard) + | TempDirGuard::Oss(guard) + | TempDirGuard::Azblob(guard) + | TempDirGuard::Gcs(guard) = &mut self.storage_guard.0 { guard.remove_all().await.unwrap() } diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index d06ea4d05e..eec30447a3 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -21,7 +21,7 @@ mod sql; #[macro_use] mod region_failover; -grpc_tests!(File, S3, S3WithCache, Oss, Azblob); -http_tests!(File, S3, S3WithCache, Oss, Azblob); +grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); +http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); region_failover_tests!(File, S3, S3WithCache, Oss, Azblob); sql_tests!(File);