mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
feat: support gcs storage (#1781)
This commit is contained in:
@@ -14,4 +14,8 @@ GT_AZBLOB_CONTAINER=AZBLOB container
|
|||||||
GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name
|
GT_AZBLOB_ACCOUNT_NAME=AZBLOB account name
|
||||||
GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key
|
GT_AZBLOB_ACCOUNT_KEY=AZBLOB account key
|
||||||
GT_AZBLOB_ENDPOINT=AZBLOB endpoint
|
GT_AZBLOB_ENDPOINT=AZBLOB endpoint
|
||||||
|
# Settings for gcs test
|
||||||
|
GT_GCS_BUCKET = GCS bucket
|
||||||
|
GT_GCS_SCOPE = GCS scope
|
||||||
|
GT_GCS_CREDENTIAL_PATH = GCS credential path
|
||||||
|
GT_GCS_ENDPOINT = GCS end point
|
||||||
|
|||||||
@@ -273,6 +273,7 @@ mod tests {
|
|||||||
ObjectStoreConfig::S3 { .. } => unreachable!(),
|
ObjectStoreConfig::S3 { .. } => unreachable!(),
|
||||||
ObjectStoreConfig::Oss { .. } => unreachable!(),
|
ObjectStoreConfig::Oss { .. } => unreachable!(),
|
||||||
ObjectStoreConfig::Azblob { .. } => unreachable!(),
|
ObjectStoreConfig::Azblob { .. } => unreachable!(),
|
||||||
|
ObjectStoreConfig::Gcs { .. } => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ pub enum ObjectStoreConfig {
|
|||||||
S3(S3Config),
|
S3(S3Config),
|
||||||
Oss(OssConfig),
|
Oss(OssConfig),
|
||||||
Azblob(AzblobConfig),
|
Azblob(AzblobConfig),
|
||||||
|
Gcs(GcsConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Storage engine config
|
/// Storage engine config
|
||||||
@@ -122,6 +123,19 @@ pub struct AzblobConfig {
|
|||||||
pub cache_capacity: Option<ReadableSize>,
|
pub cache_capacity: Option<ReadableSize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub struct GcsConfig {
|
||||||
|
pub root: String,
|
||||||
|
pub bucket: String,
|
||||||
|
pub scope: String,
|
||||||
|
#[serde(skip_serializing)]
|
||||||
|
pub credential_path: SecretString,
|
||||||
|
pub endpoint: String,
|
||||||
|
pub cache_path: Option<String>,
|
||||||
|
pub cache_capacity: Option<ReadableSize>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for S3Config {
|
impl Default for S3Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -166,6 +180,20 @@ impl Default for AzblobConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for GcsConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
root: String::default(),
|
||||||
|
bucket: String::default(),
|
||||||
|
scope: String::default(),
|
||||||
|
credential_path: SecretString::from(String::default()),
|
||||||
|
endpoint: String::default(),
|
||||||
|
cache_path: Option::default(),
|
||||||
|
cache_capacity: Option::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for ObjectStoreConfig {
|
impl Default for ObjectStoreConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
ObjectStoreConfig::File(FileConfig {
|
ObjectStoreConfig::File(FileConfig {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
mod azblob;
|
mod azblob;
|
||||||
mod fs;
|
mod fs;
|
||||||
|
mod gcs;
|
||||||
mod oss;
|
mod oss;
|
||||||
mod s3;
|
mod s3;
|
||||||
|
|
||||||
@@ -40,6 +41,7 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
|
|||||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||||
azblob::new_azblob_object_store(azblob_config).await
|
azblob::new_azblob_object_store(azblob_config).await
|
||||||
}
|
}
|
||||||
|
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
// Enable retry layer and cache layer for non-fs object storages
|
// Enable retry layer and cache layer for non-fs object storages
|
||||||
@@ -88,6 +90,13 @@ async fn create_object_store_with_cache(
|
|||||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||||
(path, capacity)
|
(path, capacity)
|
||||||
}
|
}
|
||||||
|
ObjectStoreConfig::Gcs(gcs_config) => {
|
||||||
|
let path = gcs_config.cache_path.as_ref();
|
||||||
|
let capacity = gcs_config
|
||||||
|
.cache_capacity
|
||||||
|
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||||
|
(path, capacity)
|
||||||
|
}
|
||||||
_ => (None, ReadableSize(0)),
|
_ => (None, ReadableSize(0)),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
42
src/datanode/src/store/gcs.rs
Normal file
42
src/datanode/src/store/gcs.rs
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use common_telemetry::logging::info;
|
||||||
|
use object_store::services::Gcs as GCSBuilder;
|
||||||
|
use object_store::{util, ObjectStore};
|
||||||
|
use secrecy::ExposeSecret;
|
||||||
|
use snafu::prelude::*;
|
||||||
|
|
||||||
|
use crate::datanode::GcsConfig;
|
||||||
|
use crate::error::{self, Result};
|
||||||
|
|
||||||
|
pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
|
||||||
|
let root = util::normalize_dir(&gcs_config.root);
|
||||||
|
info!(
|
||||||
|
"The gcs storage bucket is: {}, root is: {}",
|
||||||
|
gcs_config.bucket, &root
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut builder = GCSBuilder::default();
|
||||||
|
builder
|
||||||
|
.root(&root)
|
||||||
|
.bucket(&gcs_config.bucket)
|
||||||
|
.scope(&gcs_config.scope)
|
||||||
|
.credential_path(gcs_config.credential_path.expose_secret())
|
||||||
|
.endpoint(&gcs_config.endpoint);
|
||||||
|
|
||||||
|
Ok(ObjectStore::new(builder)
|
||||||
|
.context(error::InitBackendSnafu)?
|
||||||
|
.finish())
|
||||||
|
}
|
||||||
@@ -23,7 +23,7 @@ use object_store::services::{Fs, S3};
|
|||||||
use object_store::test_util::TempFolder;
|
use object_store::test_util::TempFolder;
|
||||||
use object_store::{util, ObjectStore, ObjectStoreBuilder};
|
use object_store::{util, ObjectStore, ObjectStoreBuilder};
|
||||||
use opendal::raw::Accessor;
|
use opendal::raw::Accessor;
|
||||||
use opendal::services::{Azblob, Oss};
|
use opendal::services::{Azblob, Gcs, Oss};
|
||||||
use opendal::{EntryMode, Operator, OperatorBuilder};
|
use opendal::{EntryMode, Operator, OperatorBuilder};
|
||||||
|
|
||||||
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
|
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
|
||||||
@@ -185,6 +185,32 @@ async fn test_azblob_backend() -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_gcs_backend() -> Result<()> {
|
||||||
|
logging::init_default_ut_logging();
|
||||||
|
if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") {
|
||||||
|
if !container.is_empty() {
|
||||||
|
logging::info!("Running azblob test.");
|
||||||
|
|
||||||
|
let mut builder = Gcs::default();
|
||||||
|
builder
|
||||||
|
.root(&uuid::Uuid::new_v4().to_string())
|
||||||
|
.bucket(&env::var("GT_GCS_BUCKET").unwrap())
|
||||||
|
.scope(&env::var("GT_GCS_SCOPE").unwrap())
|
||||||
|
.credential_path(&env::var("GT_GCS_CREDENTIAL_PATH").unwrap())
|
||||||
|
.endpoint(&env::var("GT_GCS_ENDPOINT").unwrap());
|
||||||
|
|
||||||
|
let store = ObjectStore::new(builder).unwrap().finish();
|
||||||
|
|
||||||
|
let guard = TempFolder::new(&store, "/");
|
||||||
|
test_object_crud(&store).await?;
|
||||||
|
test_object_list(&store).await?;
|
||||||
|
guard.remove_all().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn assert_lru_cache<C: Accessor + Clone>(
|
async fn assert_lru_cache<C: Accessor + Clone>(
|
||||||
cache_layer: &LruCacheLayer<C>,
|
cache_layer: &LruCacheLayer<C>,
|
||||||
file_names: &[&str],
|
file_names: &[&str],
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ use common_runtime::Builder as RuntimeBuilder;
|
|||||||
use common_test_util::ports;
|
use common_test_util::ports;
|
||||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||||
use datanode::datanode::{
|
use datanode::datanode::{
|
||||||
AzblobConfig, DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig,
|
AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig,
|
||||||
S3Config, StorageConfig, WalConfig,
|
ProcedureConfig, S3Config, StorageConfig, WalConfig,
|
||||||
};
|
};
|
||||||
use datanode::error::{CreateTableSnafu, Result};
|
use datanode::error::{CreateTableSnafu, Result};
|
||||||
use datanode::instance::Instance;
|
use datanode::instance::Instance;
|
||||||
@@ -43,7 +43,7 @@ use datatypes::vectors::{
|
|||||||
};
|
};
|
||||||
use frontend::instance::Instance as FeInstance;
|
use frontend::instance::Instance as FeInstance;
|
||||||
use frontend::service_config::{MysqlOptions, PostgresOptions};
|
use frontend::service_config::{MysqlOptions, PostgresOptions};
|
||||||
use object_store::services::{Azblob, Oss, S3};
|
use object_store::services::{Azblob, Gcs, Oss, S3};
|
||||||
use object_store::test_util::TempFolder;
|
use object_store::test_util::TempFolder;
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use secrecy::ExposeSecret;
|
use secrecy::ExposeSecret;
|
||||||
@@ -68,6 +68,7 @@ pub enum StorageType {
|
|||||||
File,
|
File,
|
||||||
Oss,
|
Oss,
|
||||||
Azblob,
|
Azblob,
|
||||||
|
Gcs,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StorageType {
|
impl StorageType {
|
||||||
@@ -97,6 +98,13 @@ impl StorageType {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
StorageType::Gcs => {
|
||||||
|
if let Ok(b) = env::var("GT_GCS_BUCKET") {
|
||||||
|
!b.is_empty()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -119,6 +127,28 @@ pub fn get_test_store_config(
|
|||||||
let _ = dotenv::dotenv();
|
let _ = dotenv::dotenv();
|
||||||
|
|
||||||
match store_type {
|
match store_type {
|
||||||
|
StorageType::Gcs => {
|
||||||
|
let gcs_config = GcsConfig {
|
||||||
|
root: uuid::Uuid::new_v4().to_string(),
|
||||||
|
bucket: env::var("GT_GCS_BUCKET").unwrap(),
|
||||||
|
scope: env::var("GT_GCS_SCOPE").unwrap(),
|
||||||
|
credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
|
||||||
|
endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut builder = Gcs::default();
|
||||||
|
builder
|
||||||
|
.root(&gcs_config.root)
|
||||||
|
.bucket(&gcs_config.bucket)
|
||||||
|
.scope(&gcs_config.scope)
|
||||||
|
.credential_path(gcs_config.credential_path.expose_secret())
|
||||||
|
.endpoint(&gcs_config.endpoint);
|
||||||
|
|
||||||
|
let config = ObjectStoreConfig::Gcs(gcs_config);
|
||||||
|
let store = ObjectStore::new(builder).unwrap().finish();
|
||||||
|
(config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
|
||||||
|
}
|
||||||
StorageType::Azblob => {
|
StorageType::Azblob => {
|
||||||
let azblob_config = AzblobConfig {
|
let azblob_config = AzblobConfig {
|
||||||
root: uuid::Uuid::new_v4().to_string(),
|
root: uuid::Uuid::new_v4().to_string(),
|
||||||
@@ -216,6 +246,7 @@ pub enum TempDirGuard {
|
|||||||
S3(TempFolder),
|
S3(TempFolder),
|
||||||
Oss(TempFolder),
|
Oss(TempFolder),
|
||||||
Azblob(TempFolder),
|
Azblob(TempFolder),
|
||||||
|
Gcs(TempFolder),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TestGuard {
|
pub struct TestGuard {
|
||||||
@@ -229,8 +260,10 @@ pub struct StorageGuard(pub TempDirGuard);
|
|||||||
|
|
||||||
impl TestGuard {
|
impl TestGuard {
|
||||||
pub async fn remove_all(&mut self) {
|
pub async fn remove_all(&mut self) {
|
||||||
if let TempDirGuard::S3(guard) | TempDirGuard::Oss(guard) | TempDirGuard::Azblob(guard) =
|
if let TempDirGuard::S3(guard)
|
||||||
&mut self.storage_guard.0
|
| TempDirGuard::Oss(guard)
|
||||||
|
| TempDirGuard::Azblob(guard)
|
||||||
|
| TempDirGuard::Gcs(guard) = &mut self.storage_guard.0
|
||||||
{
|
{
|
||||||
guard.remove_all().await.unwrap()
|
guard.remove_all().await.unwrap()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ mod sql;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
mod region_failover;
|
mod region_failover;
|
||||||
|
|
||||||
grpc_tests!(File, S3, S3WithCache, Oss, Azblob);
|
grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
|
||||||
http_tests!(File, S3, S3WithCache, Oss, Azblob);
|
http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
|
||||||
region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
|
region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
|
||||||
sql_tests!(File);
|
sql_tests!(File);
|
||||||
|
|||||||
Reference in New Issue
Block a user