From e9f7579091bee9e5b4fdf264219d2e58bc06149b Mon Sep 17 00:00:00 2001 From: Niwaka <61189782+NiwakaDev@users.noreply.github.com> Date: Mon, 6 Nov 2023 20:18:47 +0900 Subject: [PATCH] feat: support region ddl for custom_storage (#2679) * feat: support region ddl for custom_storage * fix: typo * fix: propagate error * refactor: have manifest_options accept RegionOptions * chore: improve document --- src/mito2/src/engine/create_test.rs | 32 +++++++++++ src/mito2/src/engine/drop_test.rs | 82 +++++++++++++++++++++++++++ src/mito2/src/engine/open_test.rs | 53 +++++++++++++++++ src/mito2/src/error.rs | 9 ++- src/mito2/src/region/opener.rs | 62 +++++++++++++------- src/mito2/src/region/options.rs | 22 ++++++- src/mito2/src/test_util.rs | 33 +++++++++++ src/mito2/src/worker/handle_create.rs | 3 +- src/mito2/src/worker/handle_open.rs | 26 +++++---- src/object-store/src/manager.rs | 12 ++-- 10 files changed, 293 insertions(+), 41 deletions(-) diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index bfe9af8cbb..eb1cb71690 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -166,3 +166,35 @@ async fn test_engine_create_with_options() { region.version().options.ttl.unwrap() ); } + +#[tokio::test] +async fn test_engine_create_with_custom_store() { + let mut env = TestEnv::new(); + let engine = env + .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) + .await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("storage", "Gcs") + .build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + assert!(engine.is_region_exists(region_id)); + let region = engine.get_region(region_id).unwrap(); + let region_dir = region.access_layer.region_dir(); + + let object_store_manager = env.get_object_store_manager().unwrap(); + assert!(object_store_manager + .find("Gcs") + .unwrap() + 
.is_exist(region_dir) + .await + .unwrap()); + assert!(!object_store_manager + .default_object_store() + .is_exist(region_dir) + .await + .unwrap()); +} diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 0ffbddca65..c4a4790cb6 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -23,6 +23,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::engine::listener::DropListener; +use crate::engine::MitoEngine; use crate::test_util::{ build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, }; @@ -82,3 +83,84 @@ async fn test_engine_drop_region() { let object_store = env.get_object_store().unwrap(); assert!(!object_store.is_exist(&region_dir).await.unwrap()); } + +#[tokio::test] +async fn test_engine_drop_region_for_custom_store() { + common_telemetry::init_default_ut_logging(); + async fn setup(engine: &MitoEngine, region_id: RegionId, storage_name: &str) { + let request = CreateRequestBuilder::new() + .insert_option("storage", storage_name) + .region_dir(storage_name) + .build(); + let column_schema = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + let rows = Rows { + schema: column_schema.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(engine, region_id, rows).await; + flush_region(engine, region_id, None).await; + } + let mut env = TestEnv::with_prefix("drop"); + let listener = Arc::new(DropListener::new(Duration::from_millis(100))); + let engine = env + .create_engine_with_multiple_object_stores( + MitoConfig::default(), + None, + Some(listener.clone()), + &["Gcs"], + ) + .await; + let object_store_manager = env.get_object_store_manager().unwrap(); + + let global_region_id = RegionId::new(1, 1); + setup(&engine, global_region_id, "default").await; + let custom_region_id = RegionId::new(2, 1); + setup(&engine, custom_region_id, "Gcs").await; + + let 
global_region = engine.get_region(global_region_id).unwrap(); + let global_region_dir = global_region.access_layer.region_dir().to_string(); + + let custom_region = engine.get_region(custom_region_id).unwrap(); + let custom_region_dir = custom_region.access_layer.region_dir().to_string(); + + // Both these regions should exist before dropping the custom region. + assert!(object_store_manager + .find("Gcs") + .unwrap() + .is_exist(&custom_region_dir) + .await + .unwrap()); + assert!(object_store_manager + .find("default") + .unwrap() + .is_exist(&global_region_dir) + .await + .unwrap()); + + // Drop the custom region. + engine + .handle_request(custom_region_id, RegionRequest::Drop(RegionDropRequest {})) + .await + .unwrap(); + assert!(!engine.is_region_exists(custom_region_id)); + + // Wait for drop task. + listener.wait().await; + + assert!(!object_store_manager + .find("Gcs") + .unwrap() + .is_exist(&custom_region_dir) + .await + .unwrap()); + assert!(object_store_manager + .find("default") + .unwrap() + .is_exist(&global_region_dir) + .await + .unwrap()); +} diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 74cc1e0df8..39c703c5c7 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -172,3 +172,56 @@ async fn test_engine_region_open_with_options() { region.version().options.ttl.unwrap() ); } + +#[tokio::test] +async fn test_engine_region_open_with_custom_store() { + let mut env = TestEnv::new(); + let engine = env + .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) + .await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("storage", "Gcs") + .build(); + let region_dir = request.region_dir.clone(); + + // Create a custom region. + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + // Close the custom region. 
+ engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // Open the custom region. + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::from([("storage".to_string(), "Gcs".to_string())]), + }), + ) + .await + .unwrap(); + + // The region should not be opened with the default object store. + let region = engine.get_region(region_id).unwrap(); + let object_store_manager = env.get_object_store_manager().unwrap(); + assert!(!object_store_manager + .default_object_store() + .is_exist(region.access_layer.region_dir()) + .await + .unwrap()); + assert!(object_store_manager + .find("Gcs") + .unwrap() + .is_exist(region.access_layer.region_dir()) + .await + .unwrap()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e6772244a0..5d76901a36 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -142,6 +142,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Object store not found: {}", object_store))] + ObjectStoreNotFound { + object_store: String, + location: Location, + }, + #[snafu(display("Region {} is corrupted, reason: {}", region_id, reason))] RegionCorrupted { region_id: RegionId, @@ -427,7 +433,8 @@ impl ErrorExt for Error { | CreateDefault { .. } | InvalidParquet { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, - InvalidScanIndex { .. } + ObjectStoreNotFound { .. } + | InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidRequest { .. } | FillDefault { .. 
} diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index a224ee1e19..0baa8dac50 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use futures::StreamExt; +use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; -use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; @@ -31,7 +31,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; -use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result}; +use crate::error::{EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::memtable::MemtableBuilderRef; use crate::region::options::RegionOptions; @@ -48,7 +48,7 @@ pub(crate) struct RegionOpener { region_id: RegionId, metadata: Option, memtable_builder: MemtableBuilderRef, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, region_dir: String, scheduler: SchedulerRef, options: HashMap, @@ -61,14 +61,14 @@ impl RegionOpener { region_id: RegionId, region_dir: &str, memtable_builder: MemtableBuilderRef, - object_store: ObjectStore, + object_store_manager: ObjectStoreManagerRef, scheduler: SchedulerRef, ) -> RegionOpener { RegionOpener { region_id, metadata: None, memtable_builder, - object_store, + object_store_manager, region_dir: normalize_dir(region_dir), scheduler, options: HashMap::new(), @@ -105,7 +105,6 @@ impl RegionOpener { wal: &Wal, ) -> Result { let region_id = self.region_id; - let options = self.manifest_options(config); // Tries to open the region. 
match self.maybe_open(config, wal).await { @@ -136,19 +135,22 @@ impl RegionOpener { ); } } + let options = RegionOptions::try_from(&self.options)?; + let object_store = self.object_store(&options.storage)?.clone(); - let metadata = Arc::new(self.metadata.unwrap()); // Create a manifest manager for this region and writes regions to the manifest file. - let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?; + let region_manifest_options = self.manifest_options(config, &options)?; + let metadata = Arc::new(self.metadata.unwrap()); + let manifest_manager = + RegionManifestManager::new(metadata.clone(), region_manifest_options).await?; let mutable = self.memtable_builder.build(&metadata); - let options = RegionOptions::try_from(&self.options)?; let version = VersionBuilder::new(metadata, mutable) .options(options) .build(); let version_control = Arc::new(VersionControl::new(version)); - let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); + let access_layer = Arc::new(AccessLayer::new(self.region_dir, object_store)); Ok(MitoRegion { region_id, @@ -203,8 +205,10 @@ impl RegionOpener { config: &MitoConfig, wal: &Wal, ) -> Result> { - let options = self.manifest_options(config); - let Some(manifest_manager) = RegionManifestManager::open(options).await? else { + let region_options = RegionOptions::try_from(&self.options)?; + let region_manifest_options = self.manifest_options(config, &region_options)?; + let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await? 
+ else { return Ok(None); }; @@ -212,24 +216,21 @@ impl RegionOpener { let metadata = manifest.metadata.clone(); let region_id = self.region_id; - let access_layer = Arc::new(AccessLayer::new( - self.region_dir.clone(), - self.object_store.clone(), - )); + let object_store = self.object_store(&region_options.storage)?.clone(); + let access_layer = Arc::new(AccessLayer::new(self.region_dir.clone(), object_store)); let file_purger = Arc::new(LocalFilePurger::new( self.scheduler.clone(), access_layer.clone(), self.cache_manager.clone(), )); let mutable = self.memtable_builder.build(&metadata); - let options = RegionOptions::try_from(&self.options)?; let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) .flushed_sequence(manifest.flushed_sequence) .truncated_entry_id(manifest.truncated_entry_id) .compaction_time_window(manifest.compaction_time_window) - .options(options) + .options(region_options) .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); @@ -249,12 +250,31 @@ impl RegionOpener { } /// Returns a new manifest options. - fn manifest_options(&self, config: &MitoConfig) -> RegionManifestOptions { - RegionManifestOptions { + fn manifest_options( + &self, + config: &MitoConfig, + options: &RegionOptions, + ) -> Result { + let object_store = self.object_store(&options.storage)?.clone(); + Ok(RegionManifestOptions { manifest_dir: new_manifest_dir(&self.region_dir), - object_store: self.object_store.clone(), + object_store, compress_type: config.manifest_compress_type, checkpoint_distance: config.manifest_checkpoint_distance, + }) + } + + /// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store. 
+ fn object_store(&self, name: &Option) -> Result<&object_store::ObjectStore> { + if let Some(name) = name { + Ok(self + .object_store_manager + .find(name) + .context(ObjectStoreNotFoundSnafu { + object_store: name.to_string(), + })?) + } else { + Ok(self.object_store_manager.default_object_store()) } } } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index c8ef80ddf5..98f863168b 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -35,6 +35,8 @@ pub struct RegionOptions { pub ttl: Option, /// Compaction options. pub compaction: CompactionOptions, + /// Custom storage. + pub storage: Option, } impl TryFrom<&HashMap> for RegionOptions { @@ -54,6 +56,7 @@ impl TryFrom<&HashMap> for RegionOptions { Ok(RegionOptions { ttl: options.ttl, compaction, + storage: options.storage, }) } } @@ -124,12 +127,16 @@ struct RegionOptionsWithoutEnum { /// Region SST files TTL. #[serde(with = "humantime_serde")] ttl: Option, + storage: Option, } impl Default for RegionOptionsWithoutEnum { fn default() -> Self { let options = RegionOptions::default(); - RegionOptionsWithoutEnum { ttl: options.ttl } + RegionOptionsWithoutEnum { + ttl: options.ttl, + storage: options.storage, + } } } @@ -181,6 +188,17 @@ mod tests { assert_eq!(expect, options); } + #[test] + fn test_with_storage() { + let map = make_map(&[("storage", "S3")]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + storage: Some("s3".to_string()), + ..Default::default() + }; + assert_eq!(expect, options); + } + #[test] fn test_without_compaction_type() { // If `compaction.type` is not provided, we ignore all compaction @@ -222,6 +240,7 @@ mod tests { ("compaction.twcs.max_inactive_window_files", "2"), ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), + ("storage", "S3"), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { @@ -231,6 +250,7 @@ mod tests { 
max_inactive_window_files: 2, time_window: Some(Duration::from_secs(3600 * 2)), }), + storage: Some("s3".to_string()), }; assert_eq!(expect, options); } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 7d49bb2348..69bd22d26e 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -124,6 +124,10 @@ impl TestEnv { self.data_home.path() } + pub fn get_object_store_manager(&self) -> Option> { + self.object_store_manager.clone() + } + /// Creates a new engine with specific config under this env. pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; @@ -151,6 +155,35 @@ impl TestEnv { MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) } + pub async fn create_engine_with_multiple_object_stores( + &mut self, + config: MitoConfig, + manager: Option, + listener: Option, + custom_storage_names: &[&str], + ) -> MitoEngine { + let (logstore, mut object_store_manager) = self.create_log_and_object_store_manager().await; + for storage_name in custom_storage_names { + let data_path = self + .data_home + .path() + .join("data") + .join(storage_name) + .as_path() + .display() + .to_string(); + let mut builder = Fs::default(); + builder.root(&data_path); + let object_store = ObjectStore::new(builder).unwrap().finish(); + object_store_manager.add(storage_name, object_store); + } + let logstore = Arc::new(logstore); + let object_store_manager = Arc::new(object_store_manager); + self.logstore = Some(logstore.clone()); + self.object_store_manager = Some(object_store_manager.clone()); + MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) + } + /// Reopen the engine. 
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine { engine.stop().await.unwrap(); diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index a44c82153f..7647866494 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -55,13 +55,12 @@ impl RegionWorkerLoop { } builder.primary_key(request.primary_key); let metadata = builder.build().context(InvalidMetadataSnafu)?; - // Create a MitoRegion from the RegionMetadata. let region = RegionOpener::new( region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store_manager.default_object_store().clone(), + self.object_store_manager.clone(), self.scheduler.clone(), ) .metadata(metadata) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index e902c78968..a90a64e395 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -19,12 +19,12 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::info; use object_store::util::join_path; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::region_request::RegionOpenRequest; use store_api::storage::RegionId; -use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{ObjectStoreNotFoundSnafu, OpenDalSnafu, RegionNotFoundSnafu, Result}; use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; use crate::worker::handle_drop::remove_region_dir_once; @@ -39,21 +39,23 @@ impl RegionWorkerLoop { if self.regions.is_region_exists(region_id) { return Ok(Output::AffectedRows(0)); } - + let object_store = if let Some(storage_name) = request.options.get("storage") { + self.object_store_manager + .find(storage_name) + .context(ObjectStoreNotFoundSnafu { + object_store: storage_name.to_string(), + })? 
+ } else { + self.object_store_manager.default_object_store() + }; // Check if this region is pending drop. And clean the entire dir if so. if !self.dropping_regions.is_region_exists(region_id) - && self - .object_store_manager - .default_object_store() + && object_store .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) .await .context(OpenDalSnafu)? { - let result = remove_region_dir_once( - &request.region_dir, - self.object_store_manager.default_object_store(), - ) - .await; + let result = remove_region_dir_once(&request.region_dir, object_store).await; info!("Region {} is dropped, result: {:?}", region_id, result); return RegionNotFoundSnafu { region_id }.fail(); } @@ -65,7 +67,7 @@ impl RegionWorkerLoop { region_id, &request.region_dir, self.memtable_builder.clone(), - self.object_store_manager.default_object_store().clone(), + self.object_store_manager.clone(), self.scheduler.clone(), ) .options(request.options) diff --git a/src/object-store/src/manager.rs b/src/object-store/src/manager.rs index 138e56b1d5..fb6d733219 100644 --- a/src/object-store/src/manager.rs +++ b/src/object-store/src/manager.rs @@ -21,6 +21,7 @@ pub type ObjectStoreManagerRef = Arc; /// Manages multiple object stores so that users can configure a storage for each table. /// This struct certainly have one default object store, and can have zero or more custom object stores. +#[derive(Debug)] pub struct ObjectStoreManager { stores: HashMap, default_object_store: ObjectStore, @@ -30,19 +31,19 @@ impl ObjectStoreManager { /// Creates a new manager from the object store used as a default one. pub fn new(name: &str, object_store: ObjectStore) -> Self { ObjectStoreManager { - stores: [(name.to_string(), object_store.clone())].into(), + stores: [(name.to_lowercase(), object_store.clone())].into(), default_object_store: object_store, } } /// Adds an object store to the manager. 
pub fn add(&mut self, name: &str, object_store: ObjectStore) { - self.stores.insert(name.to_string(), object_store); + self.stores.insert(name.to_lowercase(), object_store); } /// Finds an object store corresponding to the name. pub fn find(&self, name: &str) -> Option<&ObjectStore> { - self.stores.get(name) + self.stores.get(&name.to_lowercase()) } pub fn default_object_store(&self) -> &ObjectStore { @@ -68,10 +69,12 @@ mod tests { #[test] fn test_manager_behavior() { let dir = create_temp_dir("default"); - let mut manager = ObjectStoreManager::new("default", new_object_store(&dir)); + let mut manager = ObjectStoreManager::new("Default", new_object_store(&dir)); assert!(manager.find("default").is_some()); + assert!(manager.find("Default").is_some()); assert!(manager.find("Gcs").is_none()); + assert!(manager.find("gcs").is_none()); let dir = create_temp_dir("default"); manager.add("Gcs", new_object_store(&dir)); @@ -79,5 +82,6 @@ mod tests { // Should not overwrite the default object store with the new one. assert!(manager.find("default").is_some()); assert!(manager.find("Gcs").is_some()); + assert!(manager.find("gcs").is_some()); } }