mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: impl ObjectStoreManager for custom_storage (#2621)
* feat: impl ObjectStoreManager for custom_storage * fix: rename object_store_manager to manager * fix: rename global to default * chore: add document for ObjectStoreManager * refactor: simplify default_object_store * fix: address review
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -5965,6 +5965,8 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
@@ -5973,6 +5975,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"moka",
|
||||
"opendal",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -7,6 +7,8 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
bytes = "1.4"
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -17,6 +19,7 @@ opendal = { version = "0.40", features = [
|
||||
"layers-tracing",
|
||||
"layers-metrics",
|
||||
] }
|
||||
snafu.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
45
src/object-store/src/error.rs
Normal file
45
src/object-store/src/error.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Default storage not found: {}", default_object_store))]
|
||||
DefaultStorageNotFound {
|
||||
location: Location,
|
||||
default_object_store: String,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::DefaultStorageNotFound { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,9 @@ pub use opendal::{
|
||||
Operator as ObjectStore, Reader, Result, Writer,
|
||||
};
|
||||
|
||||
pub mod error;
|
||||
pub mod layers;
|
||||
pub mod manager;
|
||||
mod metrics;
|
||||
pub mod test_util;
|
||||
pub mod util;
|
||||
|
||||
107
src/object-store/src/manager.rs
Normal file
107
src/object-store/src/manager.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use snafu::OptionExt;
|
||||
|
||||
use crate::error::{DefaultStorageNotFoundSnafu, Result};
|
||||
use crate::ObjectStore;
|
||||
|
||||
/// Manages multiple object stores so that users can configure a storage for each table.
|
||||
/// This struct certainly have one default object store, and can have zero or more custom object stores.
|
||||
pub struct ObjectStoreManager {
|
||||
stores: HashMap<String, ObjectStore>,
|
||||
default_object_store: ObjectStore,
|
||||
}
|
||||
|
||||
impl ObjectStoreManager {
|
||||
/// Creates a new manager with specific object stores. Returns an error if `stores` doesn't contain the default object store.
|
||||
pub fn try_new(
|
||||
stores: HashMap<String, ObjectStore>,
|
||||
default_object_store: &str,
|
||||
) -> Result<Self> {
|
||||
let default_object_store = stores
|
||||
.get(default_object_store)
|
||||
.context(DefaultStorageNotFoundSnafu {
|
||||
default_object_store,
|
||||
})?
|
||||
.clone();
|
||||
Ok(ObjectStoreManager {
|
||||
stores,
|
||||
default_object_store,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
|
||||
self.stores.get(name)
|
||||
}
|
||||
|
||||
pub fn default_object_store(&self) -> &ObjectStore {
|
||||
&self.default_object_store
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_test_util::temp_dir::{create_temp_dir, TempDir};
|
||||
|
||||
use super::ObjectStoreManager;
|
||||
use crate::error::Error;
|
||||
use crate::services::Fs as Builder;
|
||||
use crate::ObjectStore;
|
||||
|
||||
fn new_object_store(dir: &TempDir) -> ObjectStore {
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut builder = Builder::default();
|
||||
let _ = builder.root(store_dir);
|
||||
ObjectStore::new(builder).unwrap().finish()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_returns_err_when_global_store_not_exist() {
|
||||
let dir = create_temp_dir("new");
|
||||
let object_store = new_object_store(&dir);
|
||||
let stores: HashMap<String, ObjectStore> = vec![
|
||||
("File".to_string(), object_store.clone()),
|
||||
("S3".to_string(), object_store.clone()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
assert!(matches!(
|
||||
ObjectStoreManager::try_new(stores, "Gcs"),
|
||||
Err(Error::DefaultStorageNotFound { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_returns_ok() {
|
||||
let dir = create_temp_dir("new");
|
||||
let object_store = new_object_store(&dir);
|
||||
let stores: HashMap<String, ObjectStore> = vec![
|
||||
("File".to_string(), object_store.clone()),
|
||||
("S3".to_string(), object_store.clone()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap();
|
||||
assert_eq!(object_store_manager.stores.len(), 2);
|
||||
assert!(object_store_manager.find("File").is_some());
|
||||
assert!(object_store_manager.find("S3").is_some());
|
||||
assert!(object_store_manager.find("Gcs").is_none());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user