Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-05 21:02:58 +00:00)
refactor(cli): refactor object storage config (#7009)
* refactor: refactor object storage config
* chore: public common config
* chore: apply suggestions

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Cargo.lock (generated, 1 change)
@@ -1800,6 +1800,7 @@ dependencies = [
 "nu-ansi-term",
+"object-store",
 "operator",
 "paste",
 "query",
 "rand 0.9.1",
 "reqwest",
@@ -51,6 +51,7 @@ meta-srv.workspace = true
 nu-ansi-term = "0.46"
+object-store.workspace = true
 operator.workspace = true
 paste.workspace = true
 query.workspace = true
 rand.workspace = true
 reqwest.workspace = true
src/cli/src/common.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod object_store;
mod store;

pub use object_store::ObjectStoreConfig;
pub use store::StoreConfig;
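Both re-exports are meant to be consumed via clap's `flatten`. A minimal sketch of a hypothetical command embedding them (the command name and doc comments here are illustrative, not taken from this commit):

```rust
use clap::Parser;

use crate::common::{ObjectStoreConfig, StoreConfig};

/// A hypothetical CLI command embedding both shared configs.
#[derive(Debug, Parser)]
struct ExampleCommand {
    /// Metadata store options (--store-addrs, --backend, ...).
    #[clap(flatten)]
    store: StoreConfig,
    /// Object storage options (--enable-s3, --s3-bucket, ...).
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
}
```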
src/cli/src/common/object_store.rs (new file, 230 lines)
@@ -0,0 +1,230 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_base::secrets::SecretString;
use common_error::ext::BoxedError;
use object_store::config::FileConfig;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::util::{with_instrument_layers, with_retry_layers};
use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
use paste::paste;
use snafu::ResultExt;

use crate::error::{self};

macro_rules! wrap_with_clap_prefix {
    (
        $new_name:ident, $prefix:literal, $base:ty, {
            $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
        }
    ) => {
        paste! {
            #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
            pub struct $new_name {
                $(
                    $( #[doc = $doc] )?
                    $( #[clap(alias = $alias)] )?
                    #[clap(long $(, default_value_t = $default )? )]
                    [<$prefix $field>]: $type,
                )*
            }

            impl From<$new_name> for $base {
                fn from(w: $new_name) -> Self {
                    Self {
                        $( $field: w.[<$prefix $field>] ),*
                    }
                }
            }
        }
    };
}

wrap_with_clap_prefix! {
    PrefixedAzblobConnection,
    "azblob_",
    AzblobConnection,
    {
        #[doc = "The container of the object store."]
        container: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The account name of the object store."]
        account_name: SecretString = Default::default(),
        #[doc = "The account key of the object store."]
        account_key: SecretString = Default::default(),
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
        #[doc = "The SAS token of the object store."]
        sas_token: Option<String>,
    }
}

wrap_with_clap_prefix! {
    PrefixedS3Connection,
    "s3_",
    S3Connection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: SecretString = Default::default(),
        #[doc = "The secret access key of the object store."]
        secret_access_key: SecretString = Default::default(),
        #[doc = "The endpoint of the object store."]
        endpoint: Option<String>,
        #[doc = "The region of the object store."]
        region: Option<String>,
        #[doc = "Enable virtual host style for the object store."]
        enable_virtual_host_style: bool = Default::default(),
    }
}

wrap_with_clap_prefix! {
    PrefixedOssConnection,
    "oss_",
    OssConnection,
    {
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The access key ID of the object store."]
        access_key_id: SecretString = Default::default(),
        #[doc = "The access key secret of the object store."]
        access_key_secret: SecretString = Default::default(),
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}

wrap_with_clap_prefix! {
    PrefixedGcsConnection,
    "gcs_",
    GcsConnection,
    {
        #[doc = "The root of the object store."]
        root: String = Default::default(),
        #[doc = "The bucket of the object store."]
        bucket: String = Default::default(),
        #[doc = "The scope of the object store."]
        scope: String = Default::default(),
        #[doc = "The credential path of the object store."]
        credential_path: SecretString = Default::default(),
        #[doc = "The credential of the object store."]
        credential: SecretString = Default::default(),
        #[doc = "The endpoint of the object store."]
        endpoint: String = Default::default(),
    }
}

/// Common config for object store.
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
pub struct ObjectStoreConfig {
    /// Whether to use S3 object store.
    #[clap(long, alias = "s3")]
    pub enable_s3: bool,

    #[clap(flatten)]
    pub s3: PrefixedS3Connection,

    /// Whether to use OSS.
    #[clap(long, alias = "oss")]
    pub enable_oss: bool,

    #[clap(flatten)]
    pub oss: PrefixedOssConnection,

    /// Whether to use GCS.
    #[clap(long, alias = "gcs")]
    pub enable_gcs: bool,

    #[clap(flatten)]
    pub gcs: PrefixedGcsConnection,

    /// Whether to use Azure Blob.
    #[clap(long, alias = "azblob")]
    pub enable_azblob: bool,

    #[clap(flatten)]
    pub azblob: PrefixedAzblobConnection,
}

/// Creates a new file system object store.
pub fn new_fs_object_store_in_current_dir() -> std::result::Result<ObjectStore, BoxedError> {
    let data_home = ".";
    let object_store =
        object_store::factory::new_fs_object_store(data_home, &FileConfig::default())
            .map_err(BoxedError::new)?;

    Ok(with_instrument_layers(object_store, false))
}

impl ObjectStoreConfig {
    /// Builds the object store from the config.
    pub fn build(&self) -> Result<ObjectStore, BoxedError> {
        let object_store = if self.enable_s3 {
            let s3 = S3Connection::from(self.s3.clone());
            common_telemetry::info!("Building object store with s3: {:?}", s3);
            Some(
                ObjectStore::new(S3::from(&s3))
                    .context(error::InitBackendSnafu)
                    .map_err(BoxedError::new)?
                    .finish(),
            )
        } else if self.enable_oss {
            let oss = OssConnection::from(self.oss.clone());
            common_telemetry::info!("Building object store with oss: {:?}", oss);
            Some(
                ObjectStore::new(Oss::from(&oss))
                    .context(error::InitBackendSnafu)
                    .map_err(BoxedError::new)?
                    .finish(),
            )
        } else if self.enable_gcs {
            let gcs = GcsConnection::from(self.gcs.clone());
            common_telemetry::info!("Building object store with gcs: {:?}", gcs);
            Some(
                ObjectStore::new(Gcs::from(&gcs))
                    .context(error::InitBackendSnafu)
                    .map_err(BoxedError::new)?
                    .finish(),
            )
        } else if self.enable_azblob {
            let azblob = AzblobConnection::from(self.azblob.clone());
            common_telemetry::info!("Building object store with azblob: {:?}", azblob);
            Some(
                ObjectStore::new(Azblob::from(&azblob))
                    .context(error::InitBackendSnafu)
                    .map_err(BoxedError::new)?
                    .finish(),
            )
        } else {
            None
        };

        let object_store = object_store
            .map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));

        match object_store {
            Some(object_store) => Ok(object_store),
            None => Ok(with_instrument_layers(
                new_fs_object_store_in_current_dir()?,
                false,
            )),
        }
    }
}
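For readers unfamiliar with `paste`, here is roughly what `wrap_with_clap_prefix!` produces for `PrefixedS3Connection`, hand-expanded as a sketch and abbreviated to two fields (the real expansion maps every field in the invocation above and has no `..Default::default()`):

```rust
use object_store::S3Connection;

// Hand-expanded sketch (two fields only) of the macro output.
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
pub struct PrefixedS3Connection {
    /// The bucket of the object store.
    #[clap(long, default_value_t = Default::default())]
    s3_bucket: String, // surfaces on the CLI as --s3-bucket

    /// The endpoint of the object store.
    #[clap(long)]
    s3_endpoint: Option<String>, // surfaces as --s3-endpoint
}

impl From<PrefixedS3Connection> for S3Connection {
    fn from(w: PrefixedS3Connection) -> Self {
        Self {
            bucket: w.s3_bucket,
            endpoint: w.s3_endpoint,
            // Only because this sketch omits the remaining fields.
            ..Default::default()
        }
    }
}
```

The prefix keeps the flattened flag namespaces disjoint, so `--s3-root` and `--gcs-root` can coexist in one `ObjectStoreConfig`.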
@@ -26,7 +26,7 @@ use servers::tls::{TlsMode, TlsOption};
 use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};

 #[derive(Debug, Default, Parser)]
-pub(crate) struct StoreConfig {
+pub struct StoreConfig {
     /// The endpoint of store. one of etcd, postgres or mysql.
     ///
     /// For postgres store, the format is:
@@ -38,48 +38,48 @@ pub(crate) struct StoreConfig {
     /// For mysql store, the format is:
     /// "mysql://user:password@ip:port/dbname"
     #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
-    store_addrs: Vec<String>,
+    pub store_addrs: Vec<String>,

     /// The maximum number of operations in a transaction. Only used when using [etcd-store].
     #[clap(long, default_value = "128")]
-    max_txn_ops: usize,
+    pub max_txn_ops: usize,

     /// The metadata store backend.
     #[clap(long, value_enum, default_value = "etcd-store")]
-    backend: BackendImpl,
+    pub backend: BackendImpl,

     /// The key prefix of the metadata store.
     #[clap(long, default_value = "")]
-    store_key_prefix: String,
+    pub store_key_prefix: String,

     /// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
     #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
     #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
-    meta_table_name: String,
+    pub meta_table_name: String,

     /// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
     #[cfg(feature = "pg_kvbackend")]
     #[clap(long)]
-    meta_schema_name: Option<String>,
+    pub meta_schema_name: Option<String>,
     /// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
     #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
-    backend_tls_mode: TlsMode,
+    pub backend_tls_mode: TlsMode,

     /// Path to TLS certificate file for backend store connections
     #[clap(long = "backend-tls-cert-path", default_value = "")]
-    backend_tls_cert_path: String,
+    pub backend_tls_cert_path: String,

     /// Path to TLS private key file for backend store connections
     #[clap(long = "backend-tls-key-path", default_value = "")]
-    backend_tls_key_path: String,
+    pub backend_tls_key_path: String,

     /// Path to TLS CA certificate file for backend store connections
     #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
-    backend_tls_ca_cert_path: String,
+    pub backend_tls_ca_cert_path: String,

     /// Enable watching TLS certificate files for changes
     #[clap(long = "backend-tls-watch")]
-    backend_tls_watch: bool,
+    pub backend_tls_watch: bool,
 }

 impl StoreConfig {
@@ -104,6 +104,11 @@ impl StoreConfig {
         if store_addrs.is_empty() {
             EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
         } else {
+            common_telemetry::info!(
+                "Building kvbackend with store addrs: {:?}, backend: {:?}",
+                store_addrs,
+                self.backend
+            );
             let kvbackend = match self.backend {
                 BackendImpl::EtcdStore => {
                     let tls_config = self.tls_config();
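With the fields now `pub`, a `StoreConfig` can also be assembled programmatically rather than only parsed from flags. A minimal sketch, assuming the crate is consumed as `cli` and relying on the `Default` derive shown above:

```rust
use cli::StoreConfig;

fn example_store_config() -> StoreConfig {
    // Programmatic construction, possible now that the fields are `pub`.
    StoreConfig {
        store_addrs: vec!["127.0.0.1:2379".to_string()],
        max_txn_ops: 128,
        ..Default::default()
    }
}
```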
@@ -14,6 +14,7 @@

 #![allow(clippy::print_stdout)]
 mod bench;
+mod common;
 mod data;
 mod database;
 pub mod error;
@@ -21,6 +22,7 @@ mod metadata;

 use async_trait::async_trait;
 use clap::Parser;
+pub use common::{ObjectStoreConfig, StoreConfig};
 use common_error::ext::BoxedError;
 pub use database::DatabaseClient;
 use error::Result;
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-mod common;
 mod control;
 mod repair;
 mod snapshot;
@@ -20,7 +20,7 @@ use common_meta::kv_backend::KvBackendRef;
 use common_meta::rpc::store::RangeRequest;

 use crate::Tool;
-use crate::metadata::common::StoreConfig;
+use crate::common::StoreConfig;
 use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;

 /// Delete key-value pairs logically from the metadata store.
@@ -24,8 +24,8 @@ use common_meta::kv_backend::KvBackendRef;
 use store_api::storage::TableId;

 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
-use crate::metadata::common::StoreConfig;
 use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
 use crate::metadata::control::utils::get_table_id_by_name;
@@ -48,6 +48,7 @@ pub struct DelTableCommand {
     #[clap(long, default_value = DEFAULT_CATALOG_NAME)]
     catalog_name: String,

+    /// The store config.
     #[clap(flatten)]
     store: StoreConfig,
 }
@@ -28,8 +28,8 @@ use common_meta::rpc::store::RangeRequest;
 use futures::TryStreamExt;

 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::InvalidArgumentsSnafu;
-use crate::metadata::common::StoreConfig;
 use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};

 /// Getting metadata from metadata store.
@@ -38,10 +38,10 @@ use snafu::{ResultExt, ensure};
 use store_api::storage::TableId;

 use crate::Tool;
+use crate::common::StoreConfig;
 use crate::error::{
     InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
 };
-use crate::metadata::common::StoreConfig;
 use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};

 /// Repair metadata of logical tables.
@@ -16,16 +16,14 @@ use std::path::Path;

 use async_trait::async_trait;
 use clap::{Parser, Subcommand};
-use common_base::secrets::{ExposeSecret, SecretString};
 use common_error::ext::BoxedError;
 use common_meta::snapshot::MetadataSnapshotManager;
 use object_store::ObjectStore;
-use object_store::services::{Fs, S3};
-use snafu::{OptionExt, ResultExt};
+use snafu::OptionExt;

 use crate::Tool;
-use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
-use crate::metadata::common::StoreConfig;
+use crate::common::{ObjectStoreConfig, StoreConfig};
+use crate::error::UnexpectedSnafu;

 /// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
 #[derive(Subcommand)]
@@ -48,65 +46,6 @@ impl SnapshotCommand {
     }
 }

-// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
-#[derive(Debug, Default, Parser)]
-struct S3Config {
-    /// whether to use s3 as the output directory. default is false.
-    #[clap(long, default_value = "false")]
-    s3: bool,
-    /// The s3 bucket name.
-    #[clap(long)]
-    s3_bucket: Option<String>,
-    /// The s3 region.
-    #[clap(long)]
-    s3_region: Option<String>,
-    /// The s3 access key.
-    #[clap(long)]
-    s3_access_key: Option<SecretString>,
-    /// The s3 secret key.
-    #[clap(long)]
-    s3_secret_key: Option<SecretString>,
-    /// The s3 endpoint. we will automatically use the default s3 decided by the region if not set.
-    #[clap(long)]
-    s3_endpoint: Option<String>,
-}
-
-impl S3Config {
-    pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
-        if !self.s3 {
-            Ok(None)
-        } else {
-            if self.s3_region.is_none()
-                || self.s3_access_key.is_none()
-                || self.s3_secret_key.is_none()
-                || self.s3_bucket.is_none()
-            {
-                return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
-            }
-            // Safety, unwrap is safe because we have checked the options above.
-            let mut config = S3::default()
-                .bucket(self.s3_bucket.as_ref().unwrap())
-                .region(self.s3_region.as_ref().unwrap())
-                .access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
-                .secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
-
-            if !root.is_empty() && root != "." {
-                config = config.root(root);
-            }
-
-            if let Some(endpoint) = &self.s3_endpoint {
-                config = config.endpoint(endpoint);
-            }
-            Ok(Some(
-                ObjectStore::new(config)
-                    .context(OpenDalSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            ))
-        }
-    }
-}
-
 /// Export metadata snapshot tool.
 /// This tool is used to export metadata snapshot from etcd, pg or mysql.
 /// It will dump the metadata snapshot to local file or s3 bucket.
@@ -116,60 +55,41 @@ pub struct SaveCommand {
     /// The store configuration.
     #[clap(flatten)]
     store: StoreConfig,
-    /// The s3 config.
+    /// The object store configuration.
     #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file. we will add the file extension automatically.
     #[clap(long, default_value = "metadata_snapshot")]
     file_name: String,
     /// The directory to store the snapshot file.
-    /// if target output is s3 bucket, this is the root directory in the bucket.
-    /// if target output is local file, this is the local directory.
-    #[clap(long, default_value = "")]
-    output_dir: String,
-}
-
-fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
-    let root = if root.is_empty() { "." } else { root };
-    let object_store = ObjectStore::new(Fs::default().root(root))
-        .context(OpenDalSnafu)
-        .map_err(BoxedError::new)?
-        .finish();
-    Ok(object_store)
+    #[clap(long, default_value = "", alias = "output_dir")]
+    dir: String,
 }

 impl SaveCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kvbackend = self.store.build().await?;
-        let output_dir = &self.output_dir;
-        let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaSnapshotTool {
-                inner: MetadataSnapshotManager::new(kvbackend, store),
-                target_file: self.file_name.clone(),
-            };
-            Ok(Box::new(tool))
-        } else {
-            let object_store = create_local_file_object_store(output_dir)?;
-            let tool = MetaSnapshotTool {
-                inner: MetadataSnapshotManager::new(kvbackend, object_store),
-                target_file: self.file_name.clone(),
-            };
-            Ok(Box::new(tool))
-        }
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let tool = MetaSnapshotTool {
+            inner: MetadataSnapshotManager::new(kvbackend, object_store),
+            path: self.dir.clone(),
+            file_name: self.file_name.clone(),
+        };
+        Ok(Box::new(tool))
     }
 }

 struct MetaSnapshotTool {
     inner: MetadataSnapshotManager,
-    target_file: String,
+    path: String,
+    file_name: String,
 }

 #[async_trait]
 impl Tool for MetaSnapshotTool {
     async fn do_work(&self) -> std::result::Result<(), BoxedError> {
         self.inner
-            .dump("", &self.target_file)
+            .dump(&self.path, &self.file_name)
             .await
             .map_err(BoxedError::new)?;
         Ok(())
@@ -186,15 +106,15 @@ pub struct RestoreCommand {
     /// The store configuration.
     #[clap(flatten)]
     store: StoreConfig,
-    /// The s3 config.
+    /// The object store config.
    #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file.
     #[clap(long, default_value = "metadata_snapshot.metadata.fb")]
     file_name: String,
     /// The directory to store the snapshot file.
-    #[clap(long, default_value = ".")]
-    input_dir: String,
+    #[clap(long, default_value = ".", alias = "input_dir")]
+    dir: String,
     #[clap(long, default_value = "false")]
     force: bool,
 }
@@ -202,38 +122,39 @@ pub struct RestoreCommand {
 impl RestoreCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kvbackend = self.store.build().await?;
-        let input_dir = &self.input_dir;
-        let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaRestoreTool::new(
-                MetadataSnapshotManager::new(kvbackend, store),
-                self.file_name.clone(),
-                self.force,
-            );
-            Ok(Box::new(tool))
-        } else {
-            let object_store = create_local_file_object_store(input_dir)?;
-            let tool = MetaRestoreTool::new(
-                MetadataSnapshotManager::new(kvbackend, object_store),
-                self.file_name.clone(),
-                self.force,
-            );
-            Ok(Box::new(tool))
-        }
+        let input_dir = &self.dir;
+        let file_path = Path::new(input_dir).join(&self.file_name);
+        let file_path = file_path
+            .to_str()
+            .with_context(|| UnexpectedSnafu {
+                msg: format!(
+                    "Invalid file path, input dir: {}, file name: {}",
+                    input_dir, &self.file_name
+                ),
+            })
+            .map_err(BoxedError::new)?;
+
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let tool = MetaRestoreTool::new(
+            MetadataSnapshotManager::new(kvbackend, object_store),
+            file_path.to_string(),
+            self.force,
+        );
+        Ok(Box::new(tool))
     }
 }

 struct MetaRestoreTool {
     inner: MetadataSnapshotManager,
-    source_file: String,
+    file_path: String,
     force: bool,
 }

 impl MetaRestoreTool {
-    pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
+    pub fn new(inner: MetadataSnapshotManager, file_path: String, force: bool) -> Self {
         Self {
             inner,
-            source_file,
+            file_path,
             force,
         }
     }
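The restore path is now computed once by joining `dir` and `file_name`, failing cleanly when the result is not valid UTF-8. The same pattern isolated below, with a plain `Result<_, String>` standing in for the snafu error used above (Unix-style separators assumed in the assertion):

```rust
use std::path::Path;

/// Join a directory and file name, failing on non-UTF-8 paths.
fn join_snapshot_path(dir: &str, file_name: &str) -> Result<String, String> {
    let path = Path::new(dir).join(file_name);
    path.to_str()
        .map(str::to_string)
        .ok_or_else(|| format!("Invalid file path, input dir: {dir}, file name: {file_name}"))
}

fn main() {
    assert_eq!(
        join_snapshot_path(".", "metadata_snapshot.metadata.fb").unwrap(),
        "./metadata_snapshot.metadata.fb"
    );
}
```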
@@ -252,7 +173,7 @@ impl Tool for MetaRestoreTool {
                 "The target source is clean, we will restore the metadata snapshot."
             );
             self.inner
-                .restore(&self.source_file)
+                .restore(&self.file_path)
                 .await
                 .map_err(BoxedError::new)?;
             Ok(())
@@ -266,7 +187,7 @@ impl Tool for MetaRestoreTool {
                 "The target source is not clean, We will restore the metadata snapshot with --force."
             );
             self.inner
-                .restore(&self.source_file)
+                .restore(&self.file_path)
                 .await
                 .map_err(BoxedError::new)?;
             Ok(())
@@ -280,12 +201,15 @@ impl Tool for MetaRestoreTool {
 /// It prints the filtered metadata to the console.
 #[derive(Debug, Default, Parser)]
 pub struct InfoCommand {
-    /// The s3 config.
+    /// The object store config.
     #[clap(flatten)]
-    s3_config: S3Config,
+    object_store: ObjectStoreConfig,
     /// The name of the target snapshot file. we will add the file extension automatically.
     #[clap(long, default_value = "metadata_snapshot")]
     file_name: String,
+    /// The directory to store the snapshot file.
+    #[clap(long, default_value = ".", alias = "input_dir")]
+    dir: String,
     /// The query string to filter the metadata.
     #[clap(long, default_value = "*")]
     inspect_key: String,
@@ -296,7 +220,7 @@ pub struct InfoCommand {

 struct MetaInfoTool {
     inner: ObjectStore,
-    source_file: String,
+    file_path: String,
     inspect_key: String,
     limit: Option<usize>,
 }
@@ -306,7 +230,7 @@ impl Tool for MetaInfoTool {
     async fn do_work(&self) -> std::result::Result<(), BoxedError> {
         let result = MetadataSnapshotManager::info(
             &self.inner,
-            &self.source_file,
+            &self.file_path,
             &self.inspect_key,
             self.limit,
         )
@@ -320,45 +244,24 @@ impl Tool for MetaInfoTool {
 }

 impl InfoCommand {
-    fn decide_object_store_root_for_local_store(
-        file_path: &str,
-    ) -> Result<(&str, &str), BoxedError> {
-        let path = Path::new(file_path);
-        let parent = path
-            .parent()
-            .and_then(|p| p.to_str())
-            .context(InvalidFilePathSnafu { msg: file_path })
-            .map_err(BoxedError::new)?;
-        let file_name = path
-            .file_name()
-            .and_then(|f| f.to_str())
-            .context(InvalidFilePathSnafu { msg: file_path })
-            .map_err(BoxedError::new)?;
-        let root = if parent.is_empty() { "." } else { parent };
-        Ok((root, file_name))
-    }
-
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
-        let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
-        if let Some(store) = object_store {
-            let tool = MetaInfoTool {
-                inner: store,
-                source_file: self.file_name.clone(),
-                inspect_key: self.inspect_key.clone(),
-                limit: self.limit,
-            };
-            Ok(Box::new(tool))
-        } else {
-            let (root, file_name) =
-                Self::decide_object_store_root_for_local_store(&self.file_name)?;
-            let object_store = create_local_file_object_store(root)?;
-            let tool = MetaInfoTool {
-                inner: object_store,
-                source_file: file_name.to_string(),
-                inspect_key: self.inspect_key.clone(),
-                limit: self.limit,
-            };
-            Ok(Box::new(tool))
-        }
+        let object_store = self.object_store.build().map_err(BoxedError::new)?;
+        let file_path = Path::new(&self.dir).join(&self.file_name);
+        let file_path = file_path
+            .to_str()
+            .with_context(|| UnexpectedSnafu {
+                msg: format!(
+                    "Invalid file path, input dir: {}, file name: {}",
+                    &self.dir, &self.file_name
+                ),
+            })
+            .map_err(BoxedError::new)?;
+        let tool = MetaInfoTool {
+            inner: object_store,
+            file_path: file_path.to_string(),
+            inspect_key: self.inspect_key.clone(),
+            limit: self.limit,
+        };
+        Ok(Box::new(tool))
     }
 }
@@ -740,7 +740,7 @@ mod tests {
             object_store::config::ObjectStoreConfig::S3(s3_config) => {
                 assert_eq!(
                     "SecretBox<alloc::string::String>([REDACTED])".to_string(),
-                    format!("{:?}", s3_config.access_key_id)
+                    format!("{:?}", s3_config.connection.access_key_id)
                 );
             }
             _ => {
@@ -31,7 +31,7 @@
 //! types of `SecretBox<T>` to be serializable with `serde`, you will need to impl
 //! the [`SerializableSecret`] marker trait on `T`

-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
 use std::{any, fmt};

 use serde::{Deserialize, Serialize, de, ser};
@@ -46,6 +46,12 @@ impl From<String> for SecretString {
     }
 }

+impl Display for SecretString {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "SecretString([REDACTED])")
+    }
+}
+
 /// Wrapper type for values that contains secrets.
 ///
 /// It attempts to limit accidental exposure and ensure secrets are wiped from memory when dropped.
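This `Display` impl is what satisfies clap's `default_value_t` bound for the secret-typed CLI fields earlier in this commit, and it guarantees that formatting a `SecretString` never leaks the value. A quick illustration:

```rust
use common_base::secrets::{ExposeSecret, SecretString};

fn main() {
    let secret = SecretString::from("super-secret".to_string());
    // Display is redacted; this is what would end up in logs.
    assert_eq!(secret.to_string(), "SecretString([REDACTED])");
    // Only an explicit expose_secret() call yields the raw value.
    assert_eq!(secret.expose_secret(), "super-secret");
}
```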
@@ -165,6 +171,15 @@ impl<S: Zeroize> ExposeSecretMut<S> for SecretBox<S> {
     }
 }

+impl<S> PartialEq for SecretBox<S>
+where
+    S: PartialEq + Zeroize,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.inner_secret == other.inner_secret
+    }
+}
+
 /// Expose a reference to an inner secret
 pub trait ExposeSecret<S> {
     /// Expose secret: this is the only method providing access to a secret.
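This blanket impl compares the wrapped secrets without exposing them, which is what lets the storage config structs in this commit replace their hand-written `PartialEq` impls with a derive. A small sketch of the effect (illustrative struct, not from the commit):

```rust
use common_base::secrets::SecretString;

// With PartialEq on SecretBox, containers of secrets can derive equality.
#[derive(PartialEq)]
struct Credentials {
    access_key_id: SecretString,
    secret_access_key: SecretString,
}

fn main() {
    let a = Credentials {
        access_key_id: SecretString::from("id".to_string()),
        secret_access_key: SecretString::from("key".to_string()),
    };
    let b = Credentials {
        access_key_id: SecretString::from("id".to_string()),
        secret_access_key: SecretString::from("key".to_string()),
    };
    // Previously this required a manual impl comparing expose_secret().
    assert!(a == b);
}
```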
@@ -213,7 +213,7 @@ mod tests {
         // Check the configs from environment variables.
         match &opts.storage.store {
             object_store::config::ObjectStoreConfig::S3(s3_config) => {
-                assert_eq!(s3_config.bucket, "mybucket".to_string());
+                assert_eq!(s3_config.connection.bucket, "mybucket".to_string());
             }
             _ => panic!("unexpected store type"),
         }
@@ -244,9 +244,11 @@ impl MetadataSnapshotManager {
         let file_path_buf = [path, filename.to_string().as_str()]
             .iter()
             .collect::<PathBuf>();
-        let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
-            reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
-        })?;
+        let file_path = file_path_buf
+            .to_str()
+            .with_context(|| InvalidFileNameSnafu {
+                reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
+            })?;
         // Ensure the file does not exist
         ensure!(
             !self
@@ -188,9 +188,12 @@ mod tests {
             ObjectStoreConfig::S3(cfg) => {
                 assert_eq!(
                     "SecretBox<alloc::string::String>([REDACTED])".to_string(),
-                    format!("{:?}", cfg.access_key_id)
+                    format!("{:?}", cfg.connection.access_key_id)
                 );
-                assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
+                assert_eq!(
+                    "access_key_id",
+                    cfg.connection.access_key_id.expose_secret()
+                );
             }
             _ => unreachable!(),
         }
@@ -19,9 +19,9 @@ use std::sync::Arc;

 use common_telemetry::info;
 use object_store::factory::new_raw_object_store;
-use object_store::layers::{LruCacheLayer, RetryLayer};
+use object_store::layers::LruCacheLayer;
 use object_store::services::Fs;
-use object_store::util::{PrintDetailedError, clean_temp_dir, join_dir, with_instrument_layers};
+use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
 use object_store::{
     ATOMIC_WRITE_DIR, Access, OLD_ATOMIC_WRITE_DIR, ObjectStore, ObjectStoreBuilder,
 };
@@ -30,14 +30,6 @@ use snafu::prelude::*;
 use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig};
 use crate::error::{self, CreateDirSnafu, Result};

-fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
-    object_store.layer(
-        RetryLayer::new()
-            .with_jitter()
-            .with_notify(PrintDetailedError),
-    )
-}
-
 pub(crate) async fn new_object_store_without_cache(
     store: &ObjectStoreConfig,
     data_home: &str,
@@ -16,8 +16,11 @@ use std::time::Duration;

 use common_base::readable_size::ReadableSize;
 use common_base::secrets::{ExposeSecret, SecretString};
+use opendal::services::{Azblob, Gcs, Oss, S3};
 use serde::{Deserialize, Serialize};

+use crate::util;
+
 /// Object storage config
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 #[serde(tag = "type")]
@@ -75,10 +78,9 @@ impl ObjectStoreConfig {
 #[serde(default)]
 pub struct FileConfig {}

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct S3Config {
-    pub name: String,
+pub struct S3Connection {
     pub bucket: String,
     pub root: String,
     #[serde(skip_serializing)]
@@ -91,47 +93,46 @@ pub struct S3Config {
     /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
     /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
     pub enable_virtual_host_style: bool,
 }

+impl From<&S3Connection> for S3 {
+    fn from(connection: &S3Connection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+
+        let mut builder = S3::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .access_key_id(connection.access_key_id.expose_secret())
+            .secret_access_key(connection.secret_access_key.expose_secret());
+
+        if let Some(endpoint) = &connection.endpoint {
+            builder = builder.endpoint(endpoint);
+        }
+        if let Some(region) = &connection.region {
+            builder = builder.region(region);
+        }
+        if connection.enable_virtual_host_style {
+            builder = builder.enable_virtual_host_style();
+        }
+
+        builder
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct S3Config {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: S3Connection,
+    #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }

-impl Default for S3Config {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            bucket: String::default(),
-            root: String::default(),
-            access_key_id: SecretString::from(String::default()),
-            secret_access_key: SecretString::from(String::default()),
-            enable_virtual_host_style: false,
-            endpoint: Option::default(),
-            region: Option::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-impl PartialEq for S3Config {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.bucket == other.bucket
-            && self.root == other.root
-            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
-            && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.region == other.region
-            && self.enable_virtual_host_style == other.enable_virtual_host_style
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct OssConfig {
-    pub name: String,
+pub struct OssConnection {
     pub bucket: String,
     pub root: String,
     #[serde(skip_serializing)]
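Two things are worth noting about this split. The `From<&S3Connection> for S3` impl gives the factory functions and the new CLI config a single builder code path (`S3::from(&connection)`). And because `connection` is marked `#[serde(flatten)]`, the serialized layout is unchanged: `bucket`, `root`, and the other keys stay at the top level of the storage table. A standalone sketch of the flatten behavior (illustrative types, assuming the `toml` crate for the demonstration):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, Default)]
#[serde(default)]
struct Connection {
    bucket: String,
    root: String,
}

#[derive(Debug, Deserialize, Default)]
#[serde(default)]
struct Config {
    name: String,
    #[serde(flatten)]
    connection: Connection,
}

fn main() {
    // The TOML stays flat; `flatten` routes the keys into the nested struct.
    let cfg: Config = toml::from_str(
        r#"
        name = "default"
        bucket = "mybucket"
        root = "data"
        "#,
    )
    .unwrap();
    assert_eq!(cfg.connection.bucket, "mybucket");
}
```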
@@ -139,43 +140,34 @@ pub struct OssConfig {
     #[serde(skip_serializing)]
     pub access_key_secret: SecretString,
     pub endpoint: String,
 }

+impl From<&OssConnection> for Oss {
+    fn from(connection: &OssConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        Oss::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .endpoint(&connection.endpoint)
+            .access_key_id(connection.access_key_id.expose_secret())
+            .access_key_secret(connection.access_key_secret.expose_secret())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct OssConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: OssConnection,
+    #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }

-impl PartialEq for OssConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.bucket == other.bucket
-            && self.root == other.root
-            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
-            && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
-impl Default for OssConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            bucket: String::default(),
-            root: String::default(),
-            access_key_id: SecretString::from(String::default()),
-            access_key_secret: SecretString::from(String::default()),
-            endpoint: String::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct AzblobConfig {
-    pub name: String,
+pub struct AzblobConnection {
     pub container: String,
     pub root: String,
     #[serde(skip_serializing)]
@@ -184,44 +176,40 @@ pub struct AzblobConfig {
     pub account_key: SecretString,
     pub endpoint: String,
     pub sas_token: Option<String>,
 }

+impl From<&AzblobConnection> for Azblob {
+    fn from(connection: &AzblobConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        let mut builder = Azblob::default()
+            .root(&root)
+            .container(&connection.container)
+            .endpoint(&connection.endpoint)
+            .account_name(connection.account_name.expose_secret())
+            .account_key(connection.account_key.expose_secret());
+
+        if let Some(token) = &connection.sas_token {
+            builder = builder.sas_token(token);
+        };
+
+        builder
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct AzblobConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: AzblobConnection,
+    #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }

-impl PartialEq for AzblobConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.container == other.container
-            && self.root == other.root
-            && self.account_name.expose_secret() == other.account_name.expose_secret()
-            && self.account_key.expose_secret() == other.account_key.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.sas_token == other.sas_token
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
-
-impl Default for AzblobConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            container: String::default(),
-            root: String::default(),
-            account_name: SecretString::from(String::default()),
-            account_key: SecretString::from(String::default()),
-            endpoint: String::default(),
-            sas_token: Option::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
 #[serde(default)]
-pub struct GcsConfig {
-    pub name: String,
+pub struct GcsConnection {
     pub root: String,
     pub bucket: String,
     pub scope: String,
@@ -230,41 +218,31 @@ pub struct GcsConfig {
     #[serde(skip_serializing)]
     pub credential: SecretString,
     pub endpoint: String,
 }

+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct GcsConfig {
+    pub name: String,
+    #[serde(flatten)]
+    pub connection: GcsConnection,
+    #[serde(flatten)]
     pub cache: ObjectStorageCacheConfig,
     pub http_client: HttpClientConfig,
 }

-impl Default for GcsConfig {
-    fn default() -> Self {
-        Self {
-            name: String::default(),
-            root: String::default(),
-            bucket: String::default(),
-            scope: String::default(),
-            credential_path: SecretString::from(String::default()),
-            credential: SecretString::from(String::default()),
-            endpoint: String::default(),
-            cache: ObjectStorageCacheConfig::default(),
-            http_client: HttpClientConfig::default(),
-        }
-    }
-}
-
-impl PartialEq for GcsConfig {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-            && self.root == other.root
-            && self.bucket == other.bucket
-            && self.scope == other.scope
-            && self.credential_path.expose_secret() == other.credential_path.expose_secret()
-            && self.credential.expose_secret() == other.credential.expose_secret()
-            && self.endpoint == other.endpoint
-            && self.cache == other.cache
-            && self.http_client == other.http_client
-    }
-}
+impl From<&GcsConnection> for Gcs {
+    fn from(connection: &GcsConnection) -> Self {
+        let root = util::normalize_dir(&connection.root);
+        Gcs::default()
+            .root(&root)
+            .bucket(&connection.bucket)
+            .scope(&connection.scope)
+            .credential_path(connection.credential_path.expose_secret())
+            .credential(connection.credential.expose_secret())
+            .endpoint(&connection.endpoint)
+    }
+}

 /// The http client options to the storage.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 #[serde(default)]
@@ -14,7 +14,6 @@
 use std::{fs, path};

-use common_base::secrets::ExposeSecret;
 use common_telemetry::info;
 use opendal::layers::HttpClientLayer;
 use opendal::services::{Fs, Gcs, Oss, S3};
@@ -32,7 +31,7 @@ pub async fn new_raw_object_store(
 ) -> Result<ObjectStore> {
     let data_home = normalize_dir(data_home);
     match store {
-        ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config).await,
+        ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config),
         ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await,
         ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await,
         ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await,
@@ -41,10 +40,7 @@ pub async fn new_raw_object_store(
 }

 /// A helper function to create a file system object store.
-pub async fn new_fs_object_store(
-    data_home: &str,
-    _file_config: &FileConfig,
-) -> Result<ObjectStore> {
+pub fn new_fs_object_store(data_home: &str, _file_config: &FileConfig) -> Result<ObjectStore> {
     fs::create_dir_all(path::Path::new(&data_home))
         .context(error::CreateDirSnafu { dir: data_home })?;
     info!("The file storage home is: {}", data_home);
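Since the body never awaited anything, dropping `async` lets non-async callers (such as the CLI's `new_fs_object_store_in_current_dir` added earlier in this commit) use the factory directly, without an executor. A minimal sketch of a synchronous call site (path and error handling illustrative):

```rust
use object_store::ObjectStore;
use object_store::config::FileConfig;
use object_store::factory::new_fs_object_store;

fn open_local_store() -> ObjectStore {
    // No `.await` needed anymore; callable from non-async contexts.
    new_fs_object_store("./data", &FileConfig::default())
        .expect("failed to create fs object store")
}
```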
@@ -68,26 +64,14 @@ pub async fn new_fs_object_store(
 }

 pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&azblob_config.root);
-
+    let root = util::normalize_dir(&azblob_config.connection.root);
     info!(
         "The azure storage container is: {}, root is: {}",
-        azblob_config.container, &root
+        azblob_config.connection.container, &root
     );

     let client = build_http_client(&azblob_config.http_client)?;

-    let mut builder = Azblob::default()
-        .root(&root)
-        .container(&azblob_config.container)
-        .endpoint(&azblob_config.endpoint)
-        .account_name(azblob_config.account_name.expose_secret())
-        .account_key(azblob_config.account_key.expose_secret());
-
-    if let Some(token) = &azblob_config.sas_token {
-        builder = builder.sas_token(token);
-    };
-
+    let builder = Azblob::from(&azblob_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -97,22 +81,14 @@ pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<Obj
 }

 pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&gcs_config.root);
+    let root = util::normalize_dir(&gcs_config.connection.root);
     info!(
         "The gcs storage bucket is: {}, root is: {}",
-        gcs_config.bucket, &root
+        gcs_config.connection.bucket, &root
     );

     let client = build_http_client(&gcs_config.http_client)?;

-    let builder = Gcs::default()
-        .root(&root)
-        .bucket(&gcs_config.bucket)
-        .scope(&gcs_config.scope)
-        .credential_path(gcs_config.credential_path.expose_secret())
-        .credential(gcs_config.credential.expose_secret())
-        .endpoint(&gcs_config.endpoint);
-
+    let builder = Gcs::from(&gcs_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -122,21 +98,14 @@ pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore>
 }

 pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&oss_config.root);
+    let root = util::normalize_dir(&oss_config.connection.root);
     info!(
         "The oss storage bucket is: {}, root is: {}",
-        oss_config.bucket, &root
+        oss_config.connection.bucket, &root
     );

     let client = build_http_client(&oss_config.http_client)?;

-    let builder = Oss::default()
-        .root(&root)
-        .bucket(&oss_config.bucket)
-        .endpoint(&oss_config.endpoint)
-        .access_key_id(oss_config.access_key_id.expose_secret())
-        .access_key_secret(oss_config.access_key_secret.expose_secret());
-
+    let builder = Oss::from(&oss_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -146,31 +115,14 @@ pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore>
 }

 pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
-    let root = util::normalize_dir(&s3_config.root);
-
+    let root = util::normalize_dir(&s3_config.connection.root);
     info!(
         "The s3 storage bucket is: {}, root is: {}",
-        s3_config.bucket, &root
+        s3_config.connection.bucket, &root
     );

     let client = build_http_client(&s3_config.http_client)?;

-    let mut builder = S3::default()
-        .root(&root)
-        .bucket(&s3_config.bucket)
-        .access_key_id(s3_config.access_key_id.expose_secret())
-        .secret_access_key(s3_config.secret_access_key.expose_secret());
-
-    if s3_config.endpoint.is_some() {
-        builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-    }
-    if s3_config.region.is_some() {
-        builder = builder.region(s3_config.region.as_ref().unwrap());
-    }
-    if s3_config.enable_virtual_host_style {
-        builder = builder.enable_virtual_host_style();
-    }
-
+    let builder = S3::from(&s3_config.connection);
     let operator = ObjectStore::new(builder)
         .context(error::InitBackendSnafu)?
         .layer(HttpClientLayer::new(client))
@@ -27,6 +27,8 @@ mod metrics;
 pub mod test_util;
 pub mod util;

+pub use config::{AzblobConnection, GcsConnection, OssConnection, S3Connection};
+
 /// The default object cache directory name.
 pub const OBJECT_CACHE_DIR: &str = "object_cache";
@@ -17,7 +17,9 @@ use std::path;
 use std::time::Duration;

 use common_telemetry::{debug, error, info, warn};
-use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
+use opendal::layers::{
+    LoggingInterceptor, LoggingLayer, RetryInterceptor, RetryLayer, TracingLayer,
+};
 use opendal::raw::{AccessorInfo, HttpClient, Operation};
 use opendal::{Error, ErrorKind};
 use snafu::ResultExt;
@@ -132,6 +134,15 @@ pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> Ob
         .layer(crate::layers::build_prometheus_metrics_layer(path_label))
 }

+/// Adds retry layer to the object store.
+pub fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
+    object_store.layer(
+        RetryLayer::new()
+            .with_jitter()
+            .with_notify(PrintDetailedError),
+    )
+}
+
 static LOGGING_TARGET: &str = "opendal::services";

 struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);
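With the retry helper promoted into `object_store::util`, any crate can compose the same layer stack the datanode previously built privately. A short sketch mirroring the exact composition used by `ObjectStoreConfig::build` in the new CLI module:

```rust
use object_store::ObjectStore;
use object_store::util::{with_instrument_layers, with_retry_layers};

/// Wrap an operator with jittered retries, then metrics/tracing layers,
/// in the same order ObjectStoreConfig::build applies them.
fn harden(store: ObjectStore) -> ObjectStore {
    with_instrument_layers(with_retry_layers(store), /* path_label = */ false)
}
```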
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use auth::UserProviderRef;
 use axum::Router;
 use catalog::kvbackend::KvBackendCatalogManager;
-use common_base::secrets::ExposeSecret;
 use common_config::Configurable;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
@@ -32,12 +31,12 @@ use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, StorageConfig};
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
-use object_store::ObjectStore;
 use object_store::config::{
     AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
 };
 use object_store::services::{Azblob, Gcs, Oss, S3};
 use object_store::test_util::TempFolder;
+use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
 use servers::grpc::builder::GrpcServerBuilder;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
@@ -139,11 +138,14 @@ impl StorageType {

 fn s3_test_config() -> S3Config {
     S3Config {
-        root: uuid::Uuid::new_v4().to_string(),
-        access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
-        secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
-        bucket: env::var("GT_S3_BUCKET").unwrap(),
-        region: Some(env::var("GT_S3_REGION").unwrap()),
+        connection: S3Connection {
+            root: uuid::Uuid::new_v4().to_string(),
+            access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
+            secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
+            bucket: env::var("GT_S3_BUCKET").unwrap(),
+            region: Some(env::var("GT_S3_REGION").unwrap()),
+            ..Default::default()
+        },
         ..Default::default()
     }
 }
@@ -154,75 +156,55 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
     match store_type {
         StorageType::Gcs => {
             let gcs_config = GcsConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                bucket: env::var("GT_GCS_BUCKET").unwrap(),
-                scope: env::var("GT_GCS_SCOPE").unwrap(),
-                credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
-                credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
-                endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
+                connection: GcsConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    bucket: env::var("GT_GCS_BUCKET").unwrap(),
+                    scope: env::var("GT_GCS_SCOPE").unwrap(),
+                    credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
+                    credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
+                    endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
+                },
                 ..Default::default()
             };

-            let builder = Gcs::default()
-                .root(&gcs_config.root)
-                .bucket(&gcs_config.bucket)
-                .scope(&gcs_config.scope)
-                .credential_path(gcs_config.credential_path.expose_secret())
-                .credential(gcs_config.credential.expose_secret())
-                .endpoint(&gcs_config.endpoint);
-
+            let builder = Gcs::from(&gcs_config.connection);
             let config = ObjectStoreConfig::Gcs(gcs_config);
             let store = ObjectStore::new(builder).unwrap().finish();
             (config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
         }
         StorageType::Azblob => {
             let azblob_config = AzblobConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
-                account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
-                account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
-                endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
+                connection: AzblobConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
+                    account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
+                    account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
+                    endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
+                    ..Default::default()
+                },
                 ..Default::default()
             };

-            let mut builder = Azblob::default()
-                .root(&azblob_config.root)
-                .endpoint(&azblob_config.endpoint)
-                .account_name(azblob_config.account_name.expose_secret())
-                .account_key(azblob_config.account_key.expose_secret())
-                .container(&azblob_config.container);
-
-            if let Ok(sas_token) = env::var("GT_AZBLOB_SAS_TOKEN") {
-                builder = builder.sas_token(&sas_token);
-            };
-
+            let builder = Azblob::from(&azblob_config.connection);
             let config = ObjectStoreConfig::Azblob(azblob_config);

             let store = ObjectStore::new(builder).unwrap().finish();

             (config, TempDirGuard::Azblob(TempFolder::new(&store, "/")))
         }
         StorageType::Oss => {
             let oss_config = OssConfig {
-                root: uuid::Uuid::new_v4().to_string(),
-                access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
-                access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
-                bucket: env::var("GT_OSS_BUCKET").unwrap(),
-                endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+                connection: OssConnection {
+                    root: uuid::Uuid::new_v4().to_string(),
+                    access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
+                    access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
+                    bucket: env::var("GT_OSS_BUCKET").unwrap(),
+                    endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
+                },
                 ..Default::default()
             };

-            let builder = Oss::default()
-                .root(&oss_config.root)
-                .endpoint(&oss_config.endpoint)
-                .access_key_id(oss_config.access_key_id.expose_secret())
-                .access_key_secret(oss_config.access_key_secret.expose_secret())
-                .bucket(&oss_config.bucket);
-
+            let builder = Oss::from(&oss_config.connection);
             let config = ObjectStoreConfig::Oss(oss_config);

             let store = ObjectStore::new(builder).unwrap().finish();

             (config, TempDirGuard::Oss(TempFolder::new(&store, "/")))
         }
         StorageType::S3 | StorageType::S3WithCache => {
@@ -235,23 +217,9 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
                 s3_config.cache.cache_path = Some("".to_string());
             }

-            let mut builder = S3::default()
-                .root(&s3_config.root)
-                .access_key_id(s3_config.access_key_id.expose_secret())
-                .secret_access_key(s3_config.secret_access_key.expose_secret())
-                .bucket(&s3_config.bucket);
-
-            if s3_config.endpoint.is_some() {
-                builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-            };
-            if s3_config.region.is_some() {
-                builder = builder.region(s3_config.region.as_ref().unwrap());
-            };
-
+            let builder = S3::from(&s3_config.connection);
             let config = ObjectStoreConfig::S3(s3_config);
             let store = ObjectStore::new(builder).unwrap().finish();
             (config, TempDirGuard::S3(TempFolder::new(&store, "/")))
         }
         StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None),