From a7dc86ffe5c7be47e65c399b124c33222431aade Mon Sep 17 00:00:00 2001 From: Yun Chen Date: Mon, 30 Jan 2023 01:09:38 +1300 Subject: [PATCH] feat: oss storage support (#911) * feat: add oss storage support * fix: ci build format check * fix: align OSS to Oss * fix: cr comments * fix: rename OSS to Oss in integration tests * fix: clippy fix --- .env.example | 6 ++ Cargo.lock | 17 +++-- src/cmd/src/datanode.rs | 7 +- src/datanode/src/datanode.rs | 45 +++++++---- src/datanode/src/instance.rs | 82 +++++++++++++-------- src/datanode/src/tests/test_util.rs | 6 +- src/frontend/src/tests.rs | 10 +-- src/object-store/Cargo.toml | 5 +- src/object-store/tests/object_store_test.rs | 29 ++++++++ tests-integration/README.md | 7 ++ tests-integration/src/test_util.rs | 75 ++++++++++++++----- tests-integration/tests/main.rs | 4 +- 12 files changed, 210 insertions(+), 83 deletions(-) diff --git a/.env.example b/.env.example index 9117d0d4c6..da1bbcc213 100644 --- a/.env.example +++ b/.env.example @@ -2,3 +2,9 @@ GT_S3_BUCKET=S3 bucket GT_S3_ACCESS_KEY_ID=S3 access key id GT_S3_ACCESS_KEY=S3 secret access key + +# Settings for oss test +GT_OSS_BUCKET=OSS bucket +GT_OSS_ACCESS_KEY_ID=OSS access key id +GT_OSS_ACCESS_KEY=OSS access key +GT_OSS_ENDPOINT=OSS endpoint diff --git a/Cargo.lock b/Cargo.lock index 18a8164c01..f2fcbd6f9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,9 +681,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" +checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" [[package]] name = "benchmarks" @@ -4401,20 +4401,21 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.24.2" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97541724cf371973b28f5a873404f2a2a4f7bb1efe7ca36a27836c13958781c2" +checksum = "73829d3a057542556dc2c2d2b70700a44dda913cdb5483094c20ef9673ca283c" dependencies = [ "anyhow", "async-compat", "async-trait", "backon", - "base64 0.20.0", + "base64 0.21.0", "bincode 2.0.0-rc.2", "bytes", "flagset", "futures", "http", + "hyper", "log", "md-5", "metrics", @@ -5674,13 +5675,13 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.7.4" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c97ac0f771c78ddf4bcb73c8454c76565a7249780e7296767f7e89661b0e045" +checksum = "3f446438814fde3785305a59a85a6d1b361ce2c9d29e58dd87c9103a242c40b6" dependencies = [ "anyhow", "backon", - "base64 0.20.0", + "base64 0.21.0", "bytes", "dirs 4.0.0", "form_urlencoded", diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index b431d30913..4c1f066f1d 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -14,7 +14,7 @@ use clap::Parser; use common_telemetry::logging; -use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig}; use meta_client::MetaClientOpts; use servers::Mode; use snafu::ResultExt; @@ -128,7 +128,7 @@ impl TryFrom for DatanodeOptions { } if let Some(data_dir) = cmd.data_dir { - opts.storage = ObjectStoreConfig::File { data_dir }; + opts.storage = ObjectStoreConfig::File(FileConfig { data_dir }); } if let Some(wal_dir) = cmd.wal_dir { @@ -175,10 +175,11 @@ mod tests { assert!(!tcp_nodelay); match options.storage { - ObjectStoreConfig::File { data_dir } => { + ObjectStoreConfig::File(FileConfig { data_dir }) => { assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) } ObjectStoreConfig::S3 { .. } => unreachable!(), + ObjectStoreConfig::Oss { .. } => unreachable!(), }; } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index e2d385fde8..22e2231fd4 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -28,24 +28,43 @@ use crate::server::Services; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum ObjectStoreConfig { - File { - data_dir: String, - }, - S3 { - bucket: String, - root: String, - access_key_id: String, - secret_access_key: String, - endpoint: Option, - region: Option, - }, + File(FileConfig), + S3(S3Config), + Oss(OssConfig), +} + +#[derive(Debug, Clone, Serialize, Default, Deserialize)] +#[serde(default)] +pub struct FileConfig { + pub data_dir: String, +} + +#[derive(Debug, Clone, Serialize, Default, Deserialize)] +#[serde(default)] +pub struct S3Config { + pub bucket: String, + pub root: String, + pub access_key_id: String, + pub secret_access_key: String, + pub endpoint: Option, + pub region: Option, +} + +#[derive(Debug, Clone, Serialize, Default, Deserialize)] +#[serde(default)] +pub struct OssConfig { + pub bucket: String, + pub root: String, + pub access_key_id: String, + pub access_key_secret: String, + pub endpoint: String, } impl Default for ObjectStoreConfig { fn default() -> Self { - ObjectStoreConfig::File { + ObjectStoreConfig::File(FileConfig { data_dir: "/tmp/greptimedb/data/".to_string(), - } + }) } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 2c89d74024..a75ceab75d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -30,6 +30,7 @@ use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::fs::Builder as FsBuilder; +use object_store::services::oss::Builder as OSSBuilder; use object_store::services::s3::Builder as S3Builder; use object_store::{util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; @@ -201,8 +202,9 @@ impl Instance { pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { let object_store = match store_config { - ObjectStoreConfig::File { data_dir } => new_fs_object_store(data_dir).await, + ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await, ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await, + ObjectStoreConfig::Oss { .. } => new_oss_object_store(store_config).await, }; object_store.map(|object_store| { @@ -214,41 +216,57 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result }) } -pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { - let (root, secret_key, key_id, bucket, endpoint, region) = match store_config { - ObjectStoreConfig::S3 { - bucket, - root, - access_key_id, - secret_access_key, - endpoint, - region, - } => ( - root, - secret_access_key, - access_key_id, - bucket, - endpoint, - region, - ), +pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Result { + let oss_config = match store_config { + ObjectStoreConfig::Oss(config) => config, _ => unreachable!(), }; - let root = util::normalize_dir(root); - info!("The s3 storage bucket is: {}, root is: {}", bucket, &root); + let root = util::normalize_dir(&oss_config.root); + info!( + "The oss storage bucket is: {}, root is: {}", + oss_config.bucket, &root + ); + + let mut builder = OSSBuilder::default(); + let builder = builder + .root(&root) + .bucket(&oss_config.bucket) + .endpoint(&oss_config.endpoint) + .access_key_id(&oss_config.access_key_id) + .access_key_secret(&oss_config.access_key_secret); + + let accessor = builder.build().with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })?; + + Ok(ObjectStore::new(accessor)) +} + +pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { + let s3_config = match store_config { + ObjectStoreConfig::S3(config) => config, + _ => unreachable!(), + }; + + let root = util::normalize_dir(&s3_config.root); + info!( + "The s3 storage bucket is: {}, root is: {}", + s3_config.bucket, &root + ); let mut builder = S3Builder::default(); let mut builder = builder .root(&root) - .bucket(bucket) - .access_key_id(key_id) - .secret_access_key(secret_key); + .bucket(&s3_config.bucket) + .access_key_id(&s3_config.access_key_id) + .secret_access_key(&s3_config.secret_access_key); - if let Some(endpoint) = endpoint { - builder = builder.endpoint(endpoint); + if s3_config.endpoint.is_some() { + builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap()); } - if let Some(region) = region { - builder = builder.region(region); + if s3_config.region.is_some() { + builder = builder.region(s3_config.region.as_ref().unwrap()); } let accessor = builder.build().with_context(|_| error::InitBackendSnafu { @@ -258,8 +276,12 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res Ok(ObjectStore::new(accessor)) } -pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result { - let data_dir = util::normalize_dir(data_dir); +pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { + let file_config = match store_config { + ObjectStoreConfig::File(config) => config, + _ => unreachable!(), + }; + let data_dir = util::normalize_dir(&file_config.data_dir); fs::create_dir_all(path::Path::new(&data_dir)) .context(error::CreateDirSnafu { dir: &data_dir })?; info!("The file storage directory is: {}", &data_dir); @@ -271,7 +293,7 @@ pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result { .atomic_write_dir(&atomic_write_dir) .build() .context(error::InitBackendSnafu { - config: ObjectStoreConfig::File { data_dir }, + config: store_config.clone(), })?; Ok(ObjectStore::new(accessor)) diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index d21603d0e0..5163fd59bc 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -29,7 +29,7 @@ use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; use tempdir::TempDir; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; +use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; use crate::instance::Instance; use crate::sql::SqlHandler; @@ -67,9 +67,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File { + storage: ObjectStoreConfig::File(FileConfig { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, + }), mode: Mode::Standalone, ..Default::default() }; diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 9fd55e24fb..3249000605 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend; use client::Client; use common_grpc::channel_manager::ChannelManager; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; +use datanode::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig}; use datanode::instance::Instance as DatanodeInstance; use meta_client::client::MetaClientBuilder; use meta_client::rpc::Peer; @@ -81,9 +81,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File { + storage: ObjectStoreConfig::File(FileConfig { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, + }), mode: Mode::Standalone, ..Default::default() }; @@ -167,9 +167,9 @@ async fn create_distributed_datanode( dir: wal_tmp_dir.path().to_str().unwrap().to_string(), ..Default::default() }, - storage: ObjectStoreConfig::File { + storage: ObjectStoreConfig::File(FileConfig { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, + }), mode: Mode::Distributed, ..Default::default() }; diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 0c7ce00263..bf03f7e501 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -6,7 +6,10 @@ license.workspace = true [dependencies] futures = { version = "0.3" } -opendal = { version = "0.24", features = ["layers-tracing", "layers-metrics"] } +opendal = { version = "0.25.1", features = [ + "layers-tracing", + "layers-metrics", +] } tokio.workspace = true [dev-dependencies] diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index c3173fbf99..c52db9e107 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -19,6 +19,7 @@ use common_telemetry::logging; use object_store::backend::{fs, s3}; use object_store::test_util::TempFolder; use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore}; +use opendal::services::oss; use tempdir::TempDir; async fn test_object_crud(store: &ObjectStore) -> Result<()> { @@ -131,3 +132,31 @@ async fn test_s3_backend() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_oss_backend() -> Result<()> { + logging::init_default_ut_logging(); + if let Ok(bucket) = env::var("GT_OSS_BUCKET") { + if !bucket.is_empty() { + logging::info!("Running oss test."); + + let root = uuid::Uuid::new_v4().to_string(); + + let accessor = oss::Builder::default() + .root(&root) + .access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?) + .access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?) + .bucket(&bucket) + .build()?; + + let store = ObjectStore::new(accessor); + + let mut guard = TempFolder::new(&store, "/"); + test_object_crud(&store).await?; + test_object_list(&store).await?; + guard.remove_all().await?; + } + } + + Ok(()) +} diff --git a/tests-integration/README.md b/tests-integration/README.md index ec0905504a..8db35ae7b4 100644 --- a/tests-integration/README.md +++ b/tests-integration/README.md @@ -11,6 +11,7 @@ GT_S3_ACCESS_KEY_ID=S3 access key id GT_S3_ACCESS_KEY=S3 secret access key ``` + ## Run Execute the following command in the project root folder: @@ -24,3 +25,9 @@ Test s3 storage: ``` cargo test s3 ``` + +Test oss storage: + +``` +cargo test oss +``` diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index a6353628a7..978495cec7 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -23,7 +23,9 @@ use axum::Router; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; +use datanode::datanode::{ + DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, WalConfig, +}; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; @@ -31,6 +33,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use frontend::instance::Instance as FeInstance; use object_store::backend::s3; +use object_store::services::oss; use object_store::test_util::TempFolder; use object_store::ObjectStore; use once_cell::sync::OnceCell; @@ -57,6 +60,7 @@ fn get_port() -> usize { pub enum StorageType { S3, File, + Oss, } impl StorageType { @@ -72,6 +76,13 @@ impl StorageType { false } } + StorageType::Oss => { + if let Ok(b) = env::var("GT_OSS_BUCKET") { + !b.is_empty() + } else { + false + } + } } } } @@ -83,29 +94,53 @@ fn get_test_store_config( let _ = dotenv::dotenv(); match store_type { - StorageType::S3 => { - let root = uuid::Uuid::new_v4().to_string(); - let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap(); - let secret_key = env::var("GT_S3_ACCESS_KEY").unwrap(); - let bucket = env::var("GT_S3_BUCKET").unwrap(); + StorageType::Oss => { + let oss_config = OssConfig { + root: uuid::Uuid::new_v4().to_string(), + access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap(), + access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(), + bucket: env::var("GT_OSS_BUCKET").unwrap(), + endpoint: env::var("GT_OSS_ENDPOINT").unwrap(), + }; - let accessor = s3::Builder::default() - .root(&root) - .access_key_id(&key_id) - .secret_access_key(&secret_key) - .bucket(&bucket) + let accessor = oss::Builder::default() + .root(&oss_config.root) + .endpoint(&oss_config.endpoint) + .access_key_id(&oss_config.access_key_id) + .access_key_secret(&oss_config.access_key_secret) + .bucket(&oss_config.bucket) .build() .unwrap(); - let config = ObjectStoreConfig::S3 { - root, - bucket, - access_key_id: key_id, - secret_access_key: secret_key, + let config = ObjectStoreConfig::Oss(oss_config); + + let store = ObjectStore::new(accessor); + + ( + config, + Some(TempDirGuard::Oss(TempFolder::new(&store, "/"))), + ) + } + StorageType::S3 => { + let s3_config = S3Config { + root: uuid::Uuid::new_v4().to_string(), + access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(), + secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(), + bucket: env::var("GT_S3_BUCKET").unwrap(), endpoint: None, region: None, }; + let accessor = s3::Builder::default() + .root(&s3_config.root) + .access_key_id(&s3_config.access_key_id) + .secret_access_key(&s3_config.secret_access_key) + .bucket(&s3_config.bucket) + .build() + .unwrap(); + + let config = ObjectStoreConfig::S3(s3_config); + let store = ObjectStore::new(accessor); (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/")))) @@ -114,9 +149,9 @@ fn get_test_store_config( let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap(); ( - ObjectStoreConfig::File { + ObjectStoreConfig::File(FileConfig { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, + }), Some(TempDirGuard::File(data_tmp_dir)), ) } @@ -126,6 +161,7 @@ fn get_test_store_config( enum TempDirGuard { File(TempDir), S3(TempFolder), + Oss(TempFolder), } /// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`, @@ -140,6 +176,9 @@ impl TestGuard { if let Some(TempDirGuard::S3(mut guard)) = self.data_tmp_dir.take() { guard.remove_all().await.unwrap(); } + if let Some(TempDirGuard::Oss(mut guard)) = self.data_tmp_dir.take() { + guard.remove_all().await.unwrap(); + } } } diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 4f76d51aab..94d9efbc83 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -17,5 +17,5 @@ mod grpc; #[macro_use] mod http; -grpc_tests!(File, S3); -http_tests!(File, S3); +grpc_tests!(File, S3, Oss); +http_tests!(File, S3, Oss);