feat: make mito2 have ObjectStoreManager(initial) (#2643)

* feat: make mito2 have object_store_manager(initial)

* chore: address review

* refactor: Arc<ObjectStoreManager> to ObjectStoreManagerRef and replace Vec with tuple

* fix: add ObjectStoreManager::from_default

* fix: remove cfg(test)

* fix: remove try_new from ObjectStoreManager
This commit is contained in:
Niwaka
2023-10-30 22:16:04 +09:00
committed by GitHub
parent d0ff8ab191
commit 000e1471eb
11 changed files with 96 additions and 141 deletions

View File

@@ -33,6 +33,7 @@ use futures_util::StreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::Mode;
@@ -354,7 +355,12 @@ impl DatanodeBuilder {
let mut region_server =
RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
object_store,
);
let engines =
Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -392,7 +398,7 @@ impl DatanodeBuilder {
async fn build_store_engines<S>(
opts: &DatanodeOptions,
log_store: Arc<S>,
object_store: object_store::ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> Result<Vec<RegionEngineRef>>
where
S: LogStore,
@@ -401,12 +407,18 @@ impl DatanodeBuilder {
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
let engine: MitoEngine =
MitoEngine::new(config.clone(), log_store.clone(), object_store.clone());
let engine: MitoEngine = MitoEngine::new(
config.clone(),
log_store.clone(),
object_store_manager.clone(),
);
engines.push(Arc::new(engine) as _);
}
RegionEngineConfig::File(config) => {
let engine = FileRegionEngine::new(config.clone(), object_store.clone());
let engine = FileRegionEngine::new(
config.clone(),
object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
);
engines.push(Arc::new(engine) as _);
}
}

View File

@@ -463,7 +463,6 @@ impl ErrorExt for Error {
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
ParseSql { source, .. } => source.status_code(),
DeleteExprToRequest { source, .. } | InsertData { source, .. } => source.status_code(),
ColumnValuesNumberMismatch { .. }

View File

@@ -46,7 +46,7 @@ use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::timer;
use object_store::ObjectStore;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
@@ -72,12 +72,12 @@ impl MitoEngine {
pub fn new<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> MitoEngine {
config.sanitize();
MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store)),
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)),
}
}
@@ -108,10 +108,10 @@ impl EngineInner {
fn new<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store),
workers: WorkerGroup::start(config, log_store, object_store_manager),
}
}
@@ -235,7 +235,7 @@ impl MitoEngine {
pub fn new_for_test<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
@@ -246,7 +246,7 @@ impl MitoEngine {
workers: WorkerGroup::start_for_test(
config,
log_store,
object_store,
object_store_manager,
write_buffer_manager,
listener,
),

View File

@@ -35,6 +35,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
@@ -71,7 +72,7 @@ pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
logstore: Option<Arc<RaftEngineLogStore>>,
object_store: Option<ObjectStore>,
object_store_manager: Option<ObjectStoreManagerRef>,
}
impl Default for TestEnv {
@@ -86,7 +87,7 @@ impl TestEnv {
TestEnv {
data_home: create_temp_dir(""),
logstore: None,
object_store: None,
object_store_manager: None,
}
}
@@ -95,7 +96,7 @@ impl TestEnv {
TestEnv {
data_home: create_temp_dir(prefix),
logstore: None,
object_store: None,
object_store_manager: None,
}
}
@@ -104,7 +105,7 @@ impl TestEnv {
TestEnv {
data_home,
logstore: None,
object_store: None,
object_store_manager: None,
}
}
@@ -113,17 +114,20 @@ impl TestEnv {
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.object_store.clone()
self.object_store_manager
.as_ref()
.map(|manager| manager.default_object_store().clone())
}
/// Creates a new engine with specific config under this env.
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store = Some(object_store.clone());
MitoEngine::new(config, logstore, object_store)
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new(config, logstore, object_store_manager)
}
/// Creates a new engine with specific config and manager/listener under this env.
@@ -133,12 +137,13 @@ impl TestEnv {
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
) -> MitoEngine {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store = Some(object_store.clone());
MitoEngine::new_for_test(config, logstore, object_store, manager, listener)
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
}
/// Reopen the engine.
@@ -148,18 +153,20 @@ impl TestEnv {
MitoEngine::new(
config,
self.logstore.clone().unwrap(),
self.object_store.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
)
}
/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
let (log_store, object_store) = self.create_log_and_object_store().await;
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
WorkerGroup::start(config, Arc::new(log_store), object_store)
WorkerGroup::start(config, Arc::new(log_store), Arc::new(object_store_manager))
}
async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
async fn create_log_and_object_store_manager(
&self,
) -> (RaftEngineLogStore, ObjectStoreManager) {
let data_home = self.data_home.path();
let wal_path = data_home.join("wal");
let data_path = data_home.join("data").as_path().display().to_string();
@@ -168,8 +175,8 @@ impl TestEnv {
let mut builder = Fs::default();
builder.root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
(log_store, object_store)
let object_store_manager = ObjectStoreManager::new("default", object_store);
(log_store, object_store_manager)
}
/// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`

View File

@@ -33,7 +33,7 @@ use std::time::Duration;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::ObjectStore;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
@@ -112,7 +112,7 @@ impl WorkerGroup {
pub(crate) fn start<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
) -> WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
@@ -131,7 +131,7 @@ impl WorkerGroup {
id: id as WorkerId,
config: config.clone(),
log_store: log_store.clone(),
object_store: object_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::default(),
@@ -206,7 +206,7 @@ impl WorkerGroup {
pub(crate) fn start_for_test<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
@@ -229,7 +229,7 @@ impl WorkerGroup {
id: id as WorkerId,
config: config.clone(),
log_store: log_store.clone(),
object_store: object_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
@@ -256,7 +256,7 @@ struct WorkerStarter<S> {
id: WorkerId,
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
listener: WorkerListener,
@@ -278,7 +278,7 @@ impl<S: LogStore> WorkerStarter<S> {
sender: sender.clone(),
receiver,
wal: Wal::new(self.log_store),
object_store: self.object_store,
object_store_manager: self.object_store_manager.clone(),
running: running.clone(),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
@@ -426,8 +426,8 @@ struct RegionWorkerLoop<S> {
receiver: Receiver<WorkerRequest>,
/// WAL of the engine.
wal: Wal<S>,
/// Object store for manifest and SSTs.
object_store: ObjectStore,
/// Manages object stores for manifest and SSTs.
object_store_manager: ObjectStoreManagerRef,
/// Whether the worker thread is still running.
running: Arc<AtomicBool>,
/// Memtable builder for each region.

View File

@@ -62,7 +62,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store.clone(),
self.object_store_manager.default_object_store().clone(),
self.scheduler.clone(),
)
.metadata(metadata)

View File

@@ -43,7 +43,9 @@ impl<S> RegionWorkerLoop<S> {
// write dropping marker
let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE);
self.object_store
region
.access_layer
.object_store()
.write(&marker_path, vec![])
.await
.context(OpenDalSnafu)?;
@@ -68,7 +70,7 @@ impl<S> RegionWorkerLoop<S> {
// detach a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
let object_store = self.object_store.clone();
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
common_runtime::spawn_bg(async move {

View File

@@ -44,12 +44,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& self
.object_store
.object_store_manager
.default_object_store()
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{
let result = remove_region_dir_once(&request.region_dir, &self.object_store).await;
let result = remove_region_dir_once(
&request.region_dir,
self.object_store_manager.default_object_store(),
)
.await;
info!("Region {} is dropped, result: {:?}", region_id, result);
return RegionNotFoundSnafu { region_id }.fail();
}
@@ -61,7 +66,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id,
&request.region_dir,
self.memtable_builder.clone(),
self.object_store.clone(),
self.object_store_manager.default_object_store().clone(),
self.scheduler.clone(),
)
.options(request.options)

View File

@@ -1,45 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Default storage not found: {}", default_object_store))]
DefaultStorageNotFound {
location: Location,
default_object_store: String,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::DefaultStorageNotFound { .. } => StatusCode::InvalidArguments,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -19,7 +19,6 @@ pub use opendal::{
Operator as ObjectStore, Reader, Result, Writer,
};
pub mod error;
pub mod layers;
pub mod manager;
mod metrics;

View File

@@ -13,12 +13,12 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use snafu::OptionExt;
use crate::error::{DefaultStorageNotFoundSnafu, Result};
use crate::ObjectStore;
pub type ObjectStoreManagerRef = Arc<ObjectStoreManager>;
/// Manages multiple object stores so that users can configure a storage for each table.
/// This struct certainly have one default object store, and can have zero or more custom object stores.
pub struct ObjectStoreManager {
@@ -27,23 +27,20 @@ pub struct ObjectStoreManager {
}
impl ObjectStoreManager {
/// Creates a new manager with specific object stores. Returns an error if `stores` doesn't contain the default object store.
pub fn try_new(
stores: HashMap<String, ObjectStore>,
default_object_store: &str,
) -> Result<Self> {
let default_object_store = stores
.get(default_object_store)
.context(DefaultStorageNotFoundSnafu {
default_object_store,
})?
.clone();
Ok(ObjectStoreManager {
stores,
default_object_store,
})
/// Creates a new manager from the object store used as a default one.
pub fn new(name: &str, object_store: ObjectStore) -> Self {
ObjectStoreManager {
stores: [(name.to_string(), object_store.clone())].into(),
default_object_store: object_store,
}
}
/// Adds an object store to the manager.
pub fn add(&mut self, name: &str, object_store: ObjectStore) {
self.stores.insert(name.to_string(), object_store);
}
/// Finds an object store corresponding to the name.
pub fn find(&self, name: &str) -> Option<&ObjectStore> {
self.stores.get(name)
}
@@ -55,12 +52,9 @@ impl ObjectStoreManager {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use super::ObjectStoreManager;
use crate::error::Error;
use crate::services::Fs as Builder;
use crate::ObjectStore;
@@ -72,36 +66,18 @@ mod tests {
}
#[test]
fn test_new_returns_err_when_global_store_not_exist() {
let dir = create_temp_dir("new");
let object_store = new_object_store(&dir);
let stores: HashMap<String, ObjectStore> = vec![
("File".to_string(), object_store.clone()),
("S3".to_string(), object_store.clone()),
]
.into_iter()
.collect();
fn test_manager_behavior() {
let dir = create_temp_dir("default");
let mut manager = ObjectStoreManager::new("default", new_object_store(&dir));
assert!(matches!(
ObjectStoreManager::try_new(stores, "Gcs"),
Err(Error::DefaultStorageNotFound { .. })
));
}
assert!(manager.find("default").is_some());
assert!(manager.find("Gcs").is_none());
#[test]
fn test_new_returns_ok() {
let dir = create_temp_dir("new");
let object_store = new_object_store(&dir);
let stores: HashMap<String, ObjectStore> = vec![
("File".to_string(), object_store.clone()),
("S3".to_string(), object_store.clone()),
]
.into_iter()
.collect();
let object_store_manager = ObjectStoreManager::try_new(stores, "File").unwrap();
assert_eq!(object_store_manager.stores.len(), 2);
assert!(object_store_manager.find("File").is_some());
assert!(object_store_manager.find("S3").is_some());
assert!(object_store_manager.find("Gcs").is_none());
let dir = create_temp_dir("default");
manager.add("Gcs", new_object_store(&dir));
// Should not overwrite the default object store with the new one.
assert!(manager.find("default").is_some());
assert!(manager.find("Gcs").is_some());
}
}