feat: oss storage support (#911)

* feat: add oss storage support

* fix: ci build format check

* fix: align OSS to Oss

* fix: cr comments

* fix: rename OSS to Oss in integration tests

* fix: clippy fix
Author: Yun Chen
Date: 2023-01-30 01:09:38 +13:00
Committed by: GitHub
Parent: 71482b38d7
Commit: a7dc86ffe5

12 changed files with 210 additions and 83 deletions


@@ -2,3 +2,9 @@
 GT_S3_BUCKET=S3 bucket
 GT_S3_ACCESS_KEY_ID=S3 access key id
 GT_S3_ACCESS_KEY=S3 secret access key
+
+# Settings for oss test
+GT_OSS_BUCKET=OSS bucket
+GT_OSS_ACCESS_KEY_ID=OSS access key id
+GT_OSS_ACCESS_KEY=OSS access key
+GT_OSS_ENDPOINT=OSS endpoint
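
The integration tests load this file through the dotenv crate before reading the `GT_OSS_*` variables (see `let _ = dotenv::dotenv();` in the test utilities further down). A minimal, self-contained sketch of that gating pattern, with placeholder output only:

```
// Hedged sketch of the env-driven test gating used in this PR; assumes the
// dotenv crate as a dependency. No real credentials appear here.
fn main() {
    // Ignore a missing .env file, as the tests do.
    let _ = dotenv::dotenv();

    match std::env::var("GT_OSS_BUCKET") {
        Ok(bucket) if !bucket.is_empty() => {
            println!("oss tests would run against bucket {bucket}")
        }
        _ => println!("GT_OSS_BUCKET unset or empty; oss tests would be skipped"),
    }
}
```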

Cargo.lock (generated)

@@ -681,9 +681,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
 [[package]]
 name = "base64"
-version = "0.20.0"
+version = "0.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
+checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
 
 [[package]]
 name = "benchmarks"
@@ -4401,20 +4401,21 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
 [[package]]
 name = "opendal"
-version = "0.24.2"
+version = "0.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97541724cf371973b28f5a873404f2a2a4f7bb1efe7ca36a27836c13958781c2"
+checksum = "73829d3a057542556dc2c2d2b70700a44dda913cdb5483094c20ef9673ca283c"
 dependencies = [
  "anyhow",
  "async-compat",
  "async-trait",
  "backon",
- "base64 0.20.0",
+ "base64 0.21.0",
  "bincode 2.0.0-rc.2",
  "bytes",
  "flagset",
  "futures",
  "http",
  "hyper",
  "log",
  "md-5",
  "metrics",
@@ -5674,13 +5675,13 @@ dependencies = [
 [[package]]
 name = "reqsign"
-version = "0.7.4"
+version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1c97ac0f771c78ddf4bcb73c8454c76565a7249780e7296767f7e89661b0e045"
+checksum = "3f446438814fde3785305a59a85a6d1b361ce2c9d29e58dd87c9103a242c40b6"
 dependencies = [
  "anyhow",
  "backon",
- "base64 0.20.0",
+ "base64 0.21.0",
  "bytes",
  "dirs 4.0.0",
  "form_urlencoded",


@@ -14,7 +14,7 @@
 use clap::Parser;
 use common_telemetry::logging;
-use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
+use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
 use meta_client::MetaClientOpts;
 use servers::Mode;
 use snafu::ResultExt;
@@ -128,7 +128,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
         }
         if let Some(data_dir) = cmd.data_dir {
-            opts.storage = ObjectStoreConfig::File { data_dir };
+            opts.storage = ObjectStoreConfig::File(FileConfig { data_dir });
         }
         if let Some(wal_dir) = cmd.wal_dir {
@@ -175,10 +175,11 @@ mod tests {
         assert!(!tcp_nodelay);
         match options.storage {
-            ObjectStoreConfig::File { data_dir } => {
+            ObjectStoreConfig::File(FileConfig { data_dir }) => {
                 assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
             }
             ObjectStoreConfig::S3 { .. } => unreachable!(),
+            ObjectStoreConfig::Oss { .. } => unreachable!(),
         };
     }


@@ -28,24 +28,43 @@ use crate::server::Services;
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "type")]
 pub enum ObjectStoreConfig {
-    File {
-        data_dir: String,
-    },
-    S3 {
-        bucket: String,
-        root: String,
-        access_key_id: String,
-        secret_access_key: String,
-        endpoint: Option<String>,
-        region: Option<String>,
-    },
+    File(FileConfig),
+    S3(S3Config),
+    Oss(OssConfig),
 }
+
+#[derive(Debug, Clone, Serialize, Default, Deserialize)]
+#[serde(default)]
+pub struct FileConfig {
+    pub data_dir: String,
+}
+
+#[derive(Debug, Clone, Serialize, Default, Deserialize)]
+#[serde(default)]
+pub struct S3Config {
+    pub bucket: String,
+    pub root: String,
+    pub access_key_id: String,
+    pub secret_access_key: String,
+    pub endpoint: Option<String>,
+    pub region: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Default, Deserialize)]
+#[serde(default)]
+pub struct OssConfig {
+    pub bucket: String,
+    pub root: String,
+    pub access_key_id: String,
+    pub access_key_secret: String,
+    pub endpoint: String,
+}
 
 impl Default for ObjectStoreConfig {
     fn default() -> Self {
-        ObjectStoreConfig::File {
+        ObjectStoreConfig::File(FileConfig {
             data_dir: "/tmp/greptimedb/data/".to_string(),
-        }
+        })
     }
 }
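
For readers new to this config layout: `#[serde(tag = "type")]` makes the enum internally tagged, so a `type = "Oss"` key selects the `Oss` variant and the remaining keys populate `OssConfig`. A minimal sketch of that round trip, not from this commit, assuming the serde-derive and toml crates; every value is a placeholder, not a working credential:

```
// Hedged sketch mirroring the enum above with serde's internal tagging.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ObjectStoreConfig {
    File(FileConfig),
    Oss(OssConfig),
}

#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct FileConfig {
    data_dir: String,
}

#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct OssConfig {
    bucket: String,
    root: String,
    access_key_id: String,
    access_key_secret: String,
    endpoint: String,
}

fn main() {
    // The `type` key picks the variant; the rest fills OssConfig, with any
    // missing field defaulted thanks to #[serde(default)].
    let storage: ObjectStoreConfig = toml::from_str(
        r#"
            type = "Oss"
            bucket = "my-bucket"
            root = "greptimedb"
            access_key_id = "my-key-id"
            access_key_secret = "my-key-secret"
            endpoint = "oss-cn-hangzhou.aliyuncs.com"
        "#,
    )
    .expect("valid storage config");
    println!("{storage:?}");
}
```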


@@ -30,6 +30,7 @@ use mito::config::EngineConfig as TableEngineConfig;
 use mito::engine::MitoEngine;
 use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
 use object_store::services::fs::Builder as FsBuilder;
+use object_store::services::oss::Builder as OSSBuilder;
 use object_store::services::s3::Builder as S3Builder;
 use object_store::{util, ObjectStore};
 use query::query_engine::{QueryEngineFactory, QueryEngineRef};
@@ -201,8 +202,9 @@ impl Instance {
 pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
     let object_store = match store_config {
-        ObjectStoreConfig::File { data_dir } => new_fs_object_store(data_dir).await,
+        ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await,
         ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await,
+        ObjectStoreConfig::Oss { .. } => new_oss_object_store(store_config).await,
     };
 
     object_store.map(|object_store| {
@@ -214,41 +216,57 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
     })
 }
 
-pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
-    let (root, secret_key, key_id, bucket, endpoint, region) = match store_config {
-        ObjectStoreConfig::S3 {
-            bucket,
-            root,
-            access_key_id,
-            secret_access_key,
-            endpoint,
-            region,
-        } => (
-            root,
-            secret_access_key,
-            access_key_id,
-            bucket,
-            endpoint,
-            region,
-        ),
+pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
+    let oss_config = match store_config {
+        ObjectStoreConfig::Oss(config) => config,
         _ => unreachable!(),
     };
 
-    let root = util::normalize_dir(root);
-    info!("The s3 storage bucket is: {}, root is: {}", bucket, &root);
+    let root = util::normalize_dir(&oss_config.root);
+    info!(
+        "The oss storage bucket is: {}, root is: {}",
+        oss_config.bucket, &root
+    );
+
+    let mut builder = OSSBuilder::default();
+    let builder = builder
+        .root(&root)
+        .bucket(&oss_config.bucket)
+        .endpoint(&oss_config.endpoint)
+        .access_key_id(&oss_config.access_key_id)
+        .access_key_secret(&oss_config.access_key_secret);
+
+    let accessor = builder.build().with_context(|_| error::InitBackendSnafu {
+        config: store_config.clone(),
+    })?;
+
+    Ok(ObjectStore::new(accessor))
+}
+
+pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
+    let s3_config = match store_config {
+        ObjectStoreConfig::S3(config) => config,
+        _ => unreachable!(),
+    };
+
+    let root = util::normalize_dir(&s3_config.root);
+    info!(
+        "The s3 storage bucket is: {}, root is: {}",
+        s3_config.bucket, &root
+    );
 
     let mut builder = S3Builder::default();
     let mut builder = builder
         .root(&root)
-        .bucket(bucket)
-        .access_key_id(key_id)
-        .secret_access_key(secret_key);
+        .bucket(&s3_config.bucket)
+        .access_key_id(&s3_config.access_key_id)
+        .secret_access_key(&s3_config.secret_access_key);
 
-    if let Some(endpoint) = endpoint {
-        builder = builder.endpoint(endpoint);
+    if s3_config.endpoint.is_some() {
+        builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
     }
-    if let Some(region) = region {
-        builder = builder.region(region);
+    if s3_config.region.is_some() {
+        builder = builder.region(s3_config.region.as_ref().unwrap());
     }
 
     let accessor = builder.build().with_context(|_| error::InitBackendSnafu {
@@ -258,8 +276,12 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
     Ok(ObjectStore::new(accessor))
 }
 
-pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
-    let data_dir = util::normalize_dir(data_dir);
+pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
+    let file_config = match store_config {
+        ObjectStoreConfig::File(config) => config,
+        _ => unreachable!(),
+    };
+
+    let data_dir = util::normalize_dir(&file_config.data_dir);
     fs::create_dir_all(path::Path::new(&data_dir))
         .context(error::CreateDirSnafu { dir: &data_dir })?;
     info!("The file storage directory is: {}", &data_dir);
@@ -271,7 +293,7 @@ pub(crate) async fn new_fs_object_store(data_dir: &str) -> Result<ObjectStore> {
         .atomic_write_dir(&atomic_write_dir)
         .build()
         .context(error::InitBackendSnafu {
-            config: ObjectStoreConfig::File { data_dir },
+            config: store_config.clone(),
         })?;
 
     Ok(ObjectStore::new(accessor))
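
One design note on the dispatch above: each constructor takes the whole `&ObjectStoreConfig` and re-matches it with a `_ => unreachable!()` arm. A hedged alternative sketch, not this commit's code, that hands each constructor its variant's struct directly so the catch-all arm disappears; stub types stand in for the real store machinery:

```
// Hedged design sketch: exhaustive dispatch, no unreachable!() needed.
#[allow(dead_code)]
struct FileConfig {
    data_dir: String,
}

#[allow(dead_code)]
struct OssConfig {
    bucket: String,
}

#[allow(dead_code)]
enum ObjectStoreConfig {
    File(FileConfig),
    Oss(OssConfig),
}

// Stand-ins for the real constructors, which return ObjectStore.
fn new_fs_object_store(config: &FileConfig) -> String {
    format!("fs backend at {}", config.data_dir)
}

fn new_oss_object_store(config: &OssConfig) -> String {
    format!("oss backend in bucket {}", config.bucket)
}

fn new_object_store(store_config: &ObjectStoreConfig) -> String {
    // Exhaustive over the enum: adding a variant is a compile error here
    // until a constructor is wired up.
    match store_config {
        ObjectStoreConfig::File(c) => new_fs_object_store(c),
        ObjectStoreConfig::Oss(c) => new_oss_object_store(c),
    }
}

fn main() {
    let config = ObjectStoreConfig::Oss(OssConfig {
        bucket: "my-bucket".to_string(),
    });
    println!("{}", new_object_store(&config));
}
```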


@@ -29,7 +29,7 @@ use table::engine::{EngineContext, TableEngineRef};
 use table::requests::CreateTableRequest;
 use tempdir::TempDir;
 
-use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
+use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig};
 use crate::error::{CreateTableSnafu, Result};
 use crate::instance::Instance;
 use crate::sql::SqlHandler;
@@ -67,9 +67,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
             dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
             ..Default::default()
         },
-        storage: ObjectStoreConfig::File {
+        storage: ObjectStoreConfig::File(FileConfig {
             data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
-        },
+        }),
         mode: Mode::Standalone,
         ..Default::default()
     };


@@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend;
 use client::Client;
 use common_grpc::channel_manager::ChannelManager;
 use common_runtime::Builder as RuntimeBuilder;
-use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
+use datanode::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, WalConfig};
 use datanode::instance::Instance as DatanodeInstance;
 use meta_client::client::MetaClientBuilder;
 use meta_client::rpc::Peer;
@@ -81,9 +81,9 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
             dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
             ..Default::default()
         },
-        storage: ObjectStoreConfig::File {
+        storage: ObjectStoreConfig::File(FileConfig {
             data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
-        },
+        }),
         mode: Mode::Standalone,
         ..Default::default()
     };
@@ -167,9 +167,9 @@ async fn create_distributed_datanode(
             dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
             ..Default::default()
         },
-        storage: ObjectStoreConfig::File {
+        storage: ObjectStoreConfig::File(FileConfig {
             data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
-        },
+        }),
         mode: Mode::Distributed,
         ..Default::default()
     };


@@ -6,7 +6,10 @@ license.workspace = true
 [dependencies]
 futures = { version = "0.3" }
-opendal = { version = "0.24", features = ["layers-tracing", "layers-metrics"] }
+opendal = { version = "0.25.1", features = [
+    "layers-tracing",
+    "layers-metrics",
+] }
 tokio.workspace = true
 
 [dev-dependencies]
[dev-dependencies]


@@ -19,6 +19,7 @@ use common_telemetry::logging;
 use object_store::backend::{fs, s3};
 use object_store::test_util::TempFolder;
 use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
+use opendal::services::oss;
 use tempdir::TempDir;
 
 async fn test_object_crud(store: &ObjectStore) -> Result<()> {
@@ -131,3 +132,31 @@ async fn test_s3_backend() -> Result<()> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_oss_backend() -> Result<()> {
+    logging::init_default_ut_logging();
+    if let Ok(bucket) = env::var("GT_OSS_BUCKET") {
+        if !bucket.is_empty() {
+            logging::info!("Running oss test.");
+
+            let root = uuid::Uuid::new_v4().to_string();
+
+            let accessor = oss::Builder::default()
+                .root(&root)
+                .access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?)
+                .access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?)
+                .bucket(&bucket)
+                .build()?;
+
+            let store = ObjectStore::new(accessor);
+
+            let mut guard = TempFolder::new(&store, "/");
+            test_object_crud(&store).await?;
+            test_object_list(&store).await?;
+            guard.remove_all().await?;
+        }
+    }
+
+    Ok(())
+}
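
Note that the `?` on `test_object_crud`/`test_object_list` returns before `guard.remove_all()`, so a failed check leaves test objects behind in the bucket. A hedged sketch of an ordering that always cleans up, with stub async functions standing in for the real helpers and assuming tokio as a dependency:

```
// Hedged sketch: run the checks, then clean up, then propagate any failure.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

async fn test_object_crud() -> Result<()> {
    Ok(())
}

async fn test_object_list() -> Result<()> {
    Ok(())
}

async fn remove_all() -> Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let crud = test_object_crud().await;
    let list = test_object_list().await;

    // Cleanup runs regardless of the results above.
    remove_all().await?;

    crud?;
    list?;
    Ok(())
}
```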


@@ -11,6 +11,7 @@ GT_S3_ACCESS_KEY_ID=S3 access key id
 GT_S3_ACCESS_KEY=S3 secret access key
 ```
 
 ## Run
 
 Execute the following command in the project root folder:
@@ -24,3 +25,9 @@ Test s3 storage:
 ```
 cargo test s3
 ```
+
+Test oss storage:
+
+```
+cargo test oss
+```


@@ -23,7 +23,9 @@ use axum::Router;
 use catalog::CatalogManagerRef;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_runtime::Builder as RuntimeBuilder;
-use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
+use datanode::datanode::{
+    DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, WalConfig,
+};
 use datanode::error::{CreateTableSnafu, Result};
 use datanode::instance::{Instance, InstanceRef};
 use datanode::sql::SqlHandler;
@@ -31,6 +33,7 @@ use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, SchemaBuilder};
 use frontend::instance::Instance as FeInstance;
 use object_store::backend::s3;
+use object_store::services::oss;
 use object_store::test_util::TempFolder;
 use object_store::ObjectStore;
 use once_cell::sync::OnceCell;
@@ -57,6 +60,7 @@ fn get_port() -> usize {
 pub enum StorageType {
     S3,
     File,
+    Oss,
 }
 
 impl StorageType {
@@ -72,6 +76,13 @@ impl StorageType {
                     false
                 }
             }
+            StorageType::Oss => {
+                if let Ok(b) = env::var("GT_OSS_BUCKET") {
+                    !b.is_empty()
+                } else {
+                    false
+                }
+            }
         }
     }
 }
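
The S3 and Oss arms perform the same "bucket variable set and non-empty" check. A small sketch of how that could be factored out; the helper name is hypothetical, not from the codebase:

```
// Hedged refactor sketch: one helper for the shared env check.
use std::env;

fn bucket_env_set(var: &str) -> bool {
    env::var(var).map(|v| !v.is_empty()).unwrap_or(false)
}

fn main() {
    println!("s3 tests enabled: {}", bucket_env_set("GT_S3_BUCKET"));
    println!("oss tests enabled: {}", bucket_env_set("GT_OSS_BUCKET"));
}
```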
@@ -83,29 +94,53 @@ fn get_test_store_config(
     let _ = dotenv::dotenv();
 
     match store_type {
-        StorageType::S3 => {
-            let root = uuid::Uuid::new_v4().to_string();
-            let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
-            let secret_key = env::var("GT_S3_ACCESS_KEY").unwrap();
-            let bucket = env::var("GT_S3_BUCKET").unwrap();
+        StorageType::Oss => {
+            let oss_config = OssConfig {
+                root: uuid::Uuid::new_v4().to_string(),
+                access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap(),
+                access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(),
+                bucket: env::var("GT_OSS_BUCKET").unwrap(),
+                endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+            };
 
-            let accessor = s3::Builder::default()
-                .root(&root)
-                .access_key_id(&key_id)
-                .secret_access_key(&secret_key)
-                .bucket(&bucket)
+            let accessor = oss::Builder::default()
+                .root(&oss_config.root)
+                .endpoint(&oss_config.endpoint)
+                .access_key_id(&oss_config.access_key_id)
+                .access_key_secret(&oss_config.access_key_secret)
+                .bucket(&oss_config.bucket)
                 .build()
                 .unwrap();
 
-            let config = ObjectStoreConfig::S3 {
-                root,
-                bucket,
-                access_key_id: key_id,
-                secret_access_key: secret_key,
-                endpoint: None,
-                region: None,
-            };
+            let config = ObjectStoreConfig::Oss(oss_config);
+
+            let store = ObjectStore::new(accessor);
+            (
+                config,
+                Some(TempDirGuard::Oss(TempFolder::new(&store, "/"))),
+            )
+        }
+        StorageType::S3 => {
+            let s3_config = S3Config {
+                root: uuid::Uuid::new_v4().to_string(),
+                access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap(),
+                secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap(),
+                bucket: env::var("GT_S3_BUCKET").unwrap(),
+                endpoint: None,
+                region: None,
+            };
+
+            let accessor = s3::Builder::default()
+                .root(&s3_config.root)
+                .access_key_id(&s3_config.access_key_id)
+                .secret_access_key(&s3_config.secret_access_key)
+                .bucket(&s3_config.bucket)
+                .build()
+                .unwrap();
+
+            let config = ObjectStoreConfig::S3(s3_config);
 
             let store = ObjectStore::new(accessor);
             (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/"))))
@@ -114,9 +149,9 @@ fn get_test_store_config(
             let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
             (
-                ObjectStoreConfig::File {
+                ObjectStoreConfig::File(FileConfig {
                     data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
-                },
+                }),
                 Some(TempDirGuard::File(data_tmp_dir)),
             )
         }
@@ -126,6 +161,7 @@ fn get_test_store_config(
 enum TempDirGuard {
     File(TempDir),
     S3(TempFolder),
+    Oss(TempFolder),
 }
 
 /// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
@@ -140,6 +176,9 @@ impl TestGuard {
         if let Some(TempDirGuard::S3(mut guard)) = self.data_tmp_dir.take() {
             guard.remove_all().await.unwrap();
         }
+        if let Some(TempDirGuard::Oss(mut guard)) = self.data_tmp_dir.take() {
+            guard.remove_all().await.unwrap();
+        }
     }
 }
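
A caution on this cleanup chain: `Option::take()` removes the value even when the `if let` pattern does not match, so when the slot holds an Oss guard, the first (S3) check takes and drops it and the Oss `remove_all` never runs. A self-contained repro with stub types, plus an or-pattern that avoids the problem:

```
// Repro of the take()-then-match hazard; TempDirGuard here is a stub,
// not the real type above.
enum TempDirGuard {
    S3(&'static str),
    Oss(&'static str),
}

fn main() {
    let mut slot = Some(TempDirGuard::Oss("oss guard"));

    // take() extracts the value even though the S3 pattern will not match,
    // so the Oss guard is dropped here without any cleanup.
    if let Some(TempDirGuard::S3(name)) = slot.take() {
        println!("cleaned {name}");
    }
    // The slot is already None, so Oss cleanup never runs.
    if let Some(TempDirGuard::Oss(name)) = slot.take() {
        println!("cleaned {name}");
    }

    // One take() with an or-pattern handles both variants safely.
    let mut slot = Some(TempDirGuard::Oss("oss guard"));
    if let Some(TempDirGuard::S3(name) | TempDirGuard::Oss(name)) = slot.take() {
        println!("cleaned {name}");
    }
}
```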


@@ -17,5 +17,5 @@ mod grpc;
 #[macro_use]
 mod http;
 
-grpc_tests!(File, S3);
-http_tests!(File, S3);
+grpc_tests!(File, S3, Oss);
+http_tests!(File, S3, Oss);
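
The `grpc_tests!`/`http_tests!` definitions are not part of this diff; presumably they stamp out one test suite per listed storage backend. A purely hypothetical sketch of that shape (not the project's actual macros), assuming tokio with the "macros" and "rt" features as a dev-dependency:

```
// Hypothetical test-matrix macro: one module with a smoke test per backend.
#[derive(Debug, Clone, Copy)]
enum StorageType {
    File,
    S3,
    Oss,
}

async fn run_suite(storage: StorageType) {
    // Stand-in for the shared grpc/http test body.
    println!("running suite against {storage:?}");
}

macro_rules! storage_tests {
    ($($storage:ident),*) => {
        $(
            #[allow(non_snake_case)]
            mod $storage {
                use super::*;

                #[tokio::test]
                async fn smoke() {
                    run_suite(StorageType::$storage).await;
                }
            }
        )*
    };
}

storage_tests!(File, S3, Oss);
```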