mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: support azblob storage. (#1659)
* feat:support azblob storage. * test:add some tests. * refactor:use if-let.
This commit is contained in:
@@ -9,3 +9,9 @@ GT_OSS_BUCKET=OSS bucket
|
||||
GT_OSS_ACCESS_KEY_ID=OSS access key id
|
||||
GT_OSS_ACCESS_KEY=OSS access key
|
||||
GT_OSS_ENDPOINT=OSS endpoint
|
||||
# Settings for azblob test
|
||||
GT_AZBLOB_CONTAINER=AZBLOB container
|
||||
GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name
|
||||
GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key
|
||||
GT_AZBLOB_ENDPOINT=AZBLOB endpoint
|
||||
|
||||
|
||||
@@ -280,6 +280,7 @@ mod tests {
|
||||
}
|
||||
ObjectStoreConfig::S3 { .. } => unreachable!(),
|
||||
ObjectStoreConfig::Oss { .. } => unreachable!(),
|
||||
ObjectStoreConfig::Azblob { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -47,6 +47,7 @@ pub enum ObjectStoreConfig {
|
||||
File(FileConfig),
|
||||
S3(S3Config),
|
||||
Oss(OssConfig),
|
||||
Azblob(AzblobConfig),
|
||||
}
|
||||
|
||||
/// Storage engine config
|
||||
@@ -95,6 +96,21 @@ pub struct OssConfig {
|
||||
pub cache_capacity: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct AzblobConfig {
|
||||
pub container: String,
|
||||
pub root: String,
|
||||
#[serde(skip_serializing)]
|
||||
pub account_name: SecretString,
|
||||
#[serde(skip_serializing)]
|
||||
pub account_key: SecretString,
|
||||
pub endpoint: String,
|
||||
pub sas_token: Option<String>,
|
||||
pub cache_path: Option<String>,
|
||||
pub cache_capacity: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl Default for S3Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -124,6 +140,21 @@ impl Default for OssConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AzblobConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
container: String::default(),
|
||||
root: String::default(),
|
||||
account_name: SecretString::from(String::default()),
|
||||
account_key: SecretString::from(String::default()),
|
||||
endpoint: String::default(),
|
||||
cache_path: Option::default(),
|
||||
cache_capacity: Option::default(),
|
||||
sas_token: Option::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ObjectStoreConfig {
|
||||
fn default() -> Self {
|
||||
ObjectStoreConfig::File(FileConfig {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! object storage utilities
|
||||
|
||||
mod azblob;
|
||||
mod fs;
|
||||
mod oss;
|
||||
mod s3;
|
||||
@@ -36,6 +37,9 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
|
||||
ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await,
|
||||
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
|
||||
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
|
||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||
azblob::new_azblob_object_store(azblob_config).await
|
||||
}
|
||||
}?;
|
||||
|
||||
// Enable retry layer and cache layer for non-fs object storages
|
||||
@@ -76,6 +80,13 @@ async fn create_object_store_with_cache(
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||
let path = azblob_config.cache_path.as_ref();
|
||||
let capacity = azblob_config
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(path, capacity)
|
||||
}
|
||||
_ => (None, ReadableSize(0)),
|
||||
};
|
||||
|
||||
|
||||
47
src/datanode/src/store/azblob.rs
Normal file
47
src/datanode/src/store/azblob.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_telemetry::logging::info;
|
||||
use object_store::services::Azblob as AzureBuilder;
|
||||
use object_store::{util, ObjectStore};
|
||||
use secrecy::ExposeSecret;
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::datanode::AzblobConfig;
|
||||
use crate::error::{self, Result};
|
||||
|
||||
pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
|
||||
let root = util::normalize_dir(&azblob_config.root);
|
||||
|
||||
info!(
|
||||
"The azure storage container is: {}, root is: {}",
|
||||
azblob_config.container, &root
|
||||
);
|
||||
|
||||
let mut builder = AzureBuilder::default();
|
||||
builder
|
||||
.root(&root)
|
||||
.container(&azblob_config.container)
|
||||
.endpoint(&azblob_config.endpoint)
|
||||
.account_name(azblob_config.account_name.expose_secret())
|
||||
.account_key(azblob_config.account_key.expose_secret());
|
||||
|
||||
if let Some(token) = &azblob_config.sas_token {
|
||||
builder.sas_token(token);
|
||||
}
|
||||
|
||||
Ok(ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
.finish())
|
||||
}
|
||||
@@ -23,7 +23,7 @@ use object_store::services::{Fs, S3};
|
||||
use object_store::test_util::TempFolder;
|
||||
use object_store::{util, ObjectStore, ObjectStoreBuilder};
|
||||
use opendal::raw::Accessor;
|
||||
use opendal::services::Oss;
|
||||
use opendal::services::{Azblob, Oss};
|
||||
use opendal::{EntryMode, Operator, OperatorBuilder};
|
||||
|
||||
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
|
||||
@@ -158,6 +158,33 @@ async fn test_oss_backend() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_azblob_backend() -> Result<()> {
|
||||
logging::init_default_ut_logging();
|
||||
if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") {
|
||||
if !container.is_empty() {
|
||||
logging::info!("Running azblob test.");
|
||||
|
||||
let root = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
let mut builder = Azblob::default();
|
||||
builder
|
||||
.root(&root)
|
||||
.account_name(&env::var("GT_AZBLOB_ACCOUNT_NAME")?)
|
||||
.account_key(&env::var("GT_AZBLOB_ACCOUNT_KEY")?)
|
||||
.container(&container);
|
||||
|
||||
let store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
let mut guard = TempFolder::new(&store, "/");
|
||||
test_object_crud(&store).await?;
|
||||
test_object_list(&store).await?;
|
||||
guard.remove_all().await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_lru_cache<C: Accessor + Clone>(
|
||||
cache_layer: &LruCacheLayer<C>,
|
||||
file_names: &[&str],
|
||||
|
||||
@@ -32,3 +32,9 @@ Test oss storage:
|
||||
```
|
||||
cargo test oss
|
||||
```
|
||||
|
||||
Test azblob storage:
|
||||
|
||||
```
|
||||
cargo test azblob
|
||||
```
|
||||
@@ -26,8 +26,8 @@ use common_runtime::Builder as RuntimeBuilder;
|
||||
use common_test_util::ports;
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
use datanode::datanode::{
|
||||
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config,
|
||||
StorageConfig, WalConfig,
|
||||
AzblobConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig,
|
||||
S3Config, StorageConfig, WalConfig,
|
||||
};
|
||||
use datanode::error::{CreateTableSnafu, Result};
|
||||
use datanode::instance::Instance;
|
||||
@@ -35,7 +35,7 @@ use datanode::sql::SqlHandler;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use frontend::instance::Instance as FeInstance;
|
||||
use object_store::services::{Oss, S3};
|
||||
use object_store::services::{Azblob, Oss, S3};
|
||||
use object_store::test_util::TempFolder;
|
||||
use object_store::ObjectStore;
|
||||
use secrecy::ExposeSecret;
|
||||
@@ -57,6 +57,7 @@ pub enum StorageType {
|
||||
S3WithCache,
|
||||
File,
|
||||
Oss,
|
||||
Azblob,
|
||||
}
|
||||
|
||||
impl StorageType {
|
||||
@@ -79,6 +80,13 @@ impl StorageType {
|
||||
false
|
||||
}
|
||||
}
|
||||
StorageType::Azblob => {
|
||||
if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") {
|
||||
!b.is_empty()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,6 +109,34 @@ pub fn get_test_store_config(
|
||||
let _ = dotenv::dotenv();
|
||||
|
||||
match store_type {
|
||||
StorageType::Azblob => {
|
||||
let azblob_config = AzblobConfig {
|
||||
root: uuid::Uuid::new_v4().to_string(),
|
||||
container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
|
||||
account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
|
||||
account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
|
||||
endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut builder = Azblob::default();
|
||||
builder
|
||||
.root(&azblob_config.root)
|
||||
.endpoint(&azblob_config.endpoint)
|
||||
.account_name(azblob_config.account_name.expose_secret())
|
||||
.account_key(azblob_config.account_key.expose_secret())
|
||||
.container(&azblob_config.container);
|
||||
|
||||
if let Ok(sas_token) = env::var("GT_AZBLOB_SAS_TOKEN") {
|
||||
builder.sas_token(&sas_token);
|
||||
}
|
||||
|
||||
let config = ObjectStoreConfig::Azblob(azblob_config);
|
||||
|
||||
let store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
(config, TempDirGuard::Azblob(TempFolder::new(&store, "/")))
|
||||
}
|
||||
StorageType::Oss => {
|
||||
let oss_config = OssConfig {
|
||||
root: uuid::Uuid::new_v4().to_string(),
|
||||
@@ -169,6 +205,7 @@ pub enum TempDirGuard {
|
||||
File(TempDir),
|
||||
S3(TempFolder),
|
||||
Oss(TempFolder),
|
||||
Azblob(TempFolder),
|
||||
}
|
||||
|
||||
pub struct TestGuard {
|
||||
@@ -182,7 +219,9 @@ pub struct StorageGuard(pub TempDirGuard);
|
||||
|
||||
impl TestGuard {
|
||||
pub async fn remove_all(&mut self) {
|
||||
if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) = &mut self.storage_guard.0 {
|
||||
if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) | TempDirGuard::Azblob(guard) =
|
||||
&mut self.storage_guard.0
|
||||
{
|
||||
guard.remove_all().await.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,5 +17,5 @@ mod grpc;
|
||||
#[macro_use]
|
||||
mod http;
|
||||
|
||||
grpc_tests!(File, S3, S3WithCache, Oss);
|
||||
http_tests!(File, S3, S3WithCache, Oss);
|
||||
grpc_tests!(File, S3, S3WithCache, Oss, Azblob);
|
||||
http_tests!(File, S3, S3WithCache, Oss, Azblob);
|
||||
|
||||
Reference in New Issue
Block a user