From c529c8a41ba2a8dbfc7e84560ddc23ee930c02ce Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 1 Aug 2023 19:49:07 +0900 Subject: [PATCH] feat(mito): Implement open and close for mito2 regions (#2052) * feat: add close request * feat: handle close and open request * feat: Implement open * test: add TestEnv::new * feat: close region/engine and test * style: fix clippy * style: import log macros * docs: update docs * docs: add mermaid for manifest manager --- src/mito2/src/config.rs | 9 +- src/mito2/src/engine.rs | 32 ++++++-- src/mito2/src/engine/tests.rs | 114 ++++++++++++++++++++++++-- src/mito2/src/error.rs | 22 ++++- src/mito2/src/lib.rs | 4 + src/mito2/src/manifest/action.rs | 2 +- src/mito2/src/manifest/manager.rs | 65 ++++++++++++++- src/mito2/src/manifest/storage.rs | 30 +++---- src/mito2/src/region.rs | 36 ++++++++ src/mito2/src/region/opener.rs | 66 +++++++++++++-- src/mito2/src/sst/parquet/writer.rs | 4 +- src/mito2/src/test_util.rs | 15 +++- src/mito2/src/worker.rs | 34 ++++++-- src/mito2/src/worker/handle_close.rs | 38 +++++++++ src/mito2/src/worker/handle_create.rs | 9 +- src/mito2/src/worker/handle_open.rs | 30 ++++++- src/mito2/src/worker/request.rs | 16 ++++ 17 files changed, 465 insertions(+), 61 deletions(-) create mode 100644 src/mito2/src/worker/handle_close.rs diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 8b078a9c69..5462c9bac6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -16,7 +16,7 @@ use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; -use common_telemetry::logging; +use common_telemetry::warn; /// Default region worker num. const DEFAULT_NUM_WORKERS: usize = 1; @@ -64,16 +64,15 @@ impl MitoConfig { } self.num_workers = self.num_workers.next_power_of_two(); if num_workers_before != self.num_workers { - logging::warn!( + warn!( "Sanitize worker num {} to {}", - num_workers_before, - self.num_workers + num_workers_before, self.num_workers ); } // Sanitize channel size. if self.worker_channel_size == 0 { - logging::warn!("Sanitize channel size 0 to 1"); + warn!("Sanitize channel size 0 to 1"); self.worker_channel_size = 1; } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d18d02e7bc..312e25dc41 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -27,7 +27,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; pub use crate::worker::request::CreateRequest; -use crate::worker::request::{RegionRequest, RequestBody}; +use crate::worker::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -56,8 +56,28 @@ impl MitoEngine { } /// Creates a new region. - pub async fn create_region(&self, request: CreateRequest) -> Result<()> { - self.inner.create_region(request).await + pub async fn create_region(&self, create_request: CreateRequest) -> Result<()> { + self.inner + .handle_request_body(RequestBody::Create(create_request)) + .await + } + + /// Opens an existing region. + /// + /// Returns error if the region does not exist. + pub async fn open_region(&self, open_request: OpenRequest) -> Result<()> { + self.inner + .handle_request_body(RequestBody::Open(open_request)) + .await + } + + /// Closes a region. + /// + /// Does nothing if the region is already closed. + pub async fn close_region(&self, close_request: CloseRequest) -> Result<()> { + self.inner + .handle_request_body(RequestBody::Close(close_request)) + .await } /// Returns true if the specific region exists. @@ -89,9 +109,9 @@ impl EngineInner { self.workers.stop().await } - /// Creates a new region. - async fn create_region(&self, create_request: CreateRequest) -> Result<()> { - let (request, receiver) = RegionRequest::from_body(RequestBody::Create(create_request)); + /// Handles [RequestBody] and return its executed result. + async fn handle_request_body(&self, body: RequestBody) -> Result<()> { + let (request, receiver) = RegionRequest::from_body(body); self.workers.submit_to_worker(request).await?; receiver.await.context(RecvSnafu)? diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 5845f749aa..87dd27e190 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -19,15 +19,22 @@ use store_api::storage::RegionId; use super::*; use crate::error::Error; use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::worker::request::RegionOptions; #[tokio::test] async fn test_engine_new_stop() { - let env = TestEnv::new("engine-stop"); + let env = TestEnv::with_prefix("engine-stop"); let engine = env.create_engine(MitoConfig::default()).await; - engine.stop().await.unwrap(); + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + engine.create_region(request).await.unwrap(); - let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build(); + // Stop the engine to reject further requests. + engine.stop().await.unwrap(); + assert!(!engine.is_region_exists(region_id)); + + let request = CreateRequestBuilder::new(RegionId::new(1, 2)).build(); let err = engine.create_region(request).await.unwrap_err(); assert!( matches!(err, Error::WorkerStopped { .. }), @@ -37,7 +44,7 @@ async fn test_engine_new_stop() { #[tokio::test] async fn test_engine_create_new_region() { - let env = TestEnv::new("new-region"); + let env = TestEnv::with_prefix("new-region"); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -49,7 +56,7 @@ async fn test_engine_create_new_region() { #[tokio::test] async fn test_engine_create_region_if_not_exists() { - let env = TestEnv::new("create-not-exists"); + let env = TestEnv::with_prefix("create-not-exists"); let engine = env.create_engine(MitoConfig::default()).await; let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true); @@ -61,7 +68,7 @@ async fn test_engine_create_region_if_not_exists() { #[tokio::test] async fn test_engine_create_existing_region() { - let env = TestEnv::new("create-existing"); + let env = TestEnv::with_prefix("create-existing"); let engine = env.create_engine(MitoConfig::default()).await; let builder = CreateRequestBuilder::new(RegionId::new(1, 1)); @@ -74,3 +81,98 @@ async fn test_engine_create_existing_region() { "unexpected err: {err}" ); } + +#[tokio::test] +async fn test_engine_open_empty() { + let env = TestEnv::with_prefix("open-empty"); + let engine = env.create_engine(MitoConfig::default()).await; + + let err = engine + .open_region(OpenRequest { + region_id: RegionId::new(1, 1), + region_dir: "empty".to_string(), + options: RegionOptions::default(), + }) + .await + .unwrap_err(); + assert!( + matches!(err, Error::RegionNotFound { .. }), + "unexpected err: {err}" + ); +} + +#[tokio::test] +async fn test_engine_open_existing() { + let env = TestEnv::with_prefix("open-exiting"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + let region_dir = request.region_dir.clone(); + engine.create_region(request).await.unwrap(); + + engine + .open_region(OpenRequest { + region_id, + region_dir, + options: RegionOptions::default(), + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_close_region() { + let env = TestEnv::with_prefix("close"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + // It's okay to close a region doesn't exist. + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); + + let request = CreateRequestBuilder::new(region_id).build(); + engine.create_region(request).await.unwrap(); + + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); + assert!(!engine.is_region_exists(region_id)); + + // It's okay to close this region again. + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_engine_reopen_region() { + let env = TestEnv::with_prefix("reopen-region"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + let region_dir = request.region_dir.clone(); + engine.create_region(request).await.unwrap(); + + // Close the region. + engine + .close_region(CloseRequest { region_id }) + .await + .unwrap(); + + // Open the region again. + engine + .open_region(OpenRequest { + region_id, + region_dir, + options: RegionOptions::default(), + }) + .await + .unwrap(); + assert!(engine.is_region_exists(region_id)); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b44c5c3020..5087969456 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -156,6 +156,24 @@ pub enum Error { source: parquet::errors::ParquetError, location: Location, }, + + #[snafu(display("Region {} not found, location: {}", region_id, location))] + RegionNotFound { + region_id: RegionId, + location: Location, + }, + + #[snafu(display( + "Region {} is corrupted, reason: {}, location: {}", + region_id, + reason, + location + ))] + RegionCorrupted { + region_id: RegionId, + reason: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -173,7 +191,9 @@ impl ErrorExt for Error { | SerdeJson { .. } | Utf8 { .. } | RegionExists { .. } - | NewRecordBatch { .. } => StatusCode::Unexpected, + | NewRecordBatch { .. } + | RegionNotFound { .. } + | RegionCorrupted { .. } => StatusCode::Unexpected, InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => { StatusCode::InvalidArguments } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 0d378a323c..05aa1b49a5 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -162,4 +162,8 @@ mod worker; /// /// The engine handles DMLs and DDLs in dedicated [workers](crate::worker::WorkerGroup). /// +/// ## Region manifest +/// +/// The [RegionManifestManager](crate::manifest::manager::RegionManifestManager) manages metadata of the engine. +/// mod docs {} diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index a996ec0c2d..c7ac514361 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -105,7 +105,7 @@ impl RegionManifestBuilder { } } - /// Check if the builder keeps a [RegionMetadata] + /// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata). pub fn contains_metadata(&self) -> bool { self.metadata.is_some() } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 40d90ed0f4..8247298d6d 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -33,8 +33,67 @@ use crate::metadata::RegionMetadataRef; // trait MetaAction -> struct RegionMetaActionList // trait MetaActionIterator -> struct MetaActionIteratorImpl +#[cfg_attr(doc, aquamarine::aquamarine)] /// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted /// metadata. +/// +/// ```mermaid +/// classDiagram +/// class RegionManifestManager { +/// -ManifestObjectStore store +/// -RegionManifestOptions options +/// -RegionManifest manifest +/// +new() RegionManifestManager +/// +open() Option~RegionManifestManager~ +/// +stop() +/// +update(RegionMetaActionList action_list) ManifestVersion +/// +manifest() RegionManifest +/// } +/// class ManifestObjectStore { +/// -ObjectStore object_store +/// } +/// class RegionChange { +/// -RegionMetadataRef metadata +/// } +/// class RegionEdit { +/// -VersionNumber regoin_version +/// -Vec~FileMeta~ files_to_add +/// -Vec~FileMeta~ files_to_remove +/// -SequenceNumber flushed_sequence +/// } +/// class RegionRemove { +/// -RegionId region_id +/// } +/// RegionManifestManager o-- ManifestObjectStore +/// RegionManifestManager o-- RegionManifest +/// RegionManifestManager o-- RegionManifestOptions +/// RegionManifestManager -- RegionMetaActionList +/// RegionManifestManager -- RegionCheckpoint +/// ManifestObjectStore o-- ObjectStore +/// RegionMetaActionList o-- RegionMetaAction +/// RegionMetaAction o-- ProtocolAction +/// RegionMetaAction o-- RegionChange +/// RegionMetaAction o-- RegionEdit +/// RegionMetaAction o-- RegionRemove +/// RegionChange o-- RegionMetadata +/// RegionEdit o-- FileMeta +/// +/// class RegionManifest { +/// -RegionMetadataRef metadata +/// -HashMap<FileId, FileMeta> files +/// -ManifestVersion manifest_version +/// } +/// class RegionMetadata +/// class FileMeta +/// RegionManifest o-- RegionMetadata +/// RegionManifest o-- FileMeta +/// +/// class RegionCheckpoint { +/// -ManifestVersion last_version +/// -Option~RegionManifest~ checkpoint +/// } +/// RegionCheckpoint o-- RegionManifest +/// ``` #[derive(Debug)] pub struct RegionManifestManager { inner: RwLock, @@ -377,7 +436,7 @@ mod test { #[tokio::test] async fn create_manifest_manager() { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(""); + let env = TestEnv::new(); let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await @@ -389,7 +448,7 @@ mod test { #[tokio::test] async fn open_manifest_manager() { - let env = TestEnv::new(""); + let env = TestEnv::new(); // Try to opens an empty manifest. assert!(env .create_manifest_manager(CompressionType::Uncompressed, 10, None) @@ -420,7 +479,7 @@ mod test { #[tokio::test] async fn region_change_add_column() { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(""); + let env = TestEnv::new(); let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 7d9f56abf8..db270056d1 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -17,7 +17,7 @@ use std::iter::Iterator; use std::str::FromStr; use common_datasource::compression::CompressionType; -use common_telemetry::logging; +use common_telemetry::{debug, info}; use futures::TryStreamExt; use lazy_static::lazy_static; use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore}; @@ -267,7 +267,7 @@ impl ManifestObjectStore { .collect(); let ret = paths.len(); - logging::debug!( + debug!( "Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}", ret, self.path, @@ -300,7 +300,7 @@ impl ManifestObjectStore { .map(|e| e.path().to_string()) .collect(); - logging::info!( + info!( "Deleting {} from manifest storage path {} paths: {:?}", paths.len(), self.path, @@ -318,14 +318,14 @@ impl ManifestObjectStore { .remove_all(&self.path) .await .context(OpenDalSnafu)?; - logging::info!("Deleted manifest storage path {}", self.path); + info!("Deleted manifest storage path {}", self.path); Ok(()) } pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); - logging::debug!("Save log to manifest storage, version: {}", version); + debug!("Save log to manifest storage, version: {}", version); let data = self .compress_type .encode(bytes) @@ -357,10 +357,9 @@ impl ManifestObjectStore { } } - logging::debug!( + debug!( "Deleting logs from manifest storage, start: {}, end: {}", - start, - end + start, end ); self.object_store @@ -396,10 +395,9 @@ impl ManifestObjectStore { extend_metadata: HashMap::new(), }; - logging::debug!( + debug!( "Save checkpoint in path: {}, metadata: {:?}", - last_checkpoint_path, - checkpoint_metadata + last_checkpoint_path, checkpoint_metadata ); let bytes = checkpoint_metadata.encode()?; @@ -437,10 +435,9 @@ impl ManifestObjectStore { &checkpoint_file(version), FALL_BACK_COMPRESS_TYPE, ); - logging::debug!( + debug!( "Failed to load checkpoint from path: {}, fall back to path: {}", - path, - fall_back_path + path, fall_back_path ); match self.object_store.read(&fall_back_path).await { Ok(checkpoint) => { @@ -506,10 +503,9 @@ impl ManifestObjectStore { let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; - logging::debug!( + debug!( "Load checkpoint in path: {}, metadata: {:?}", - last_checkpoint_path, - checkpoint_metadata + last_checkpoint_path, checkpoint_metadata ); self.load_checkpoint(checkpoint_metadata.version).await diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 22a235e787..c0266ff0df 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -20,8 +20,10 @@ mod version; use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use common_telemetry::info; use store_api::storage::RegionId; +use crate::error::Result; use crate::manifest::manager::RegionManifestManager; use crate::region::version::VersionControlRef; @@ -45,6 +47,17 @@ pub(crate) struct MitoRegion { pub(crate) type MitoRegionRef = Arc; +impl MitoRegion { + /// Stop background tasks for this region. + pub(crate) async fn stop(&self) -> Result<()> { + self.manifest_manager.stop().await?; + + info!("Stopped region, region_id: {}", self.region_id); + + Ok(()) + } +} + /// Regions indexed by ids. #[derive(Debug, Default)] pub(crate) struct RegionMap { @@ -63,6 +76,29 @@ impl RegionMap { let mut regions = self.regions.write().unwrap(); regions.insert(region.region_id, region); } + + /// Get region by region id. + pub(crate) fn get_region(&self, region_id: RegionId) -> Option { + let regions = self.regions.read().unwrap(); + regions.get(®ion_id).cloned() + } + + /// Remove region by id. + pub(crate) fn remove_region(&self, region_id: RegionId) { + let mut regions = self.regions.write().unwrap(); + regions.remove(®ion_id); + } + + /// List all regions. + pub(crate) fn list_regions(&self) -> Vec { + let regions = self.regions.read().unwrap(); + regions.values().cloned().collect() + } + + /// Clear the map. + pub(crate) fn clear(&self) { + self.regions.write().unwrap().clear(); + } } pub(crate) type RegionMapRef = Arc; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 2b2f8e1f4f..a4e477952e 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use object_store::util::join_dir; use object_store::ObjectStore; +use snafu::{ensure, OptionExt}; +use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::Result; +use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result}; use crate::manifest::manager::RegionManifestManager; use crate::manifest::options::RegionManifestOptions; use crate::memtable::MemtableBuilderRef; @@ -30,7 +32,8 @@ use crate::region::MitoRegion; /// Builder to create a new [MitoRegion] or open an existing one. pub(crate) struct RegionOpener { - metadata: RegionMetadata, + region_id: RegionId, + metadata: Option, memtable_builder: MemtableBuilderRef, object_store: ObjectStore, region_dir: String, @@ -39,18 +42,25 @@ pub(crate) struct RegionOpener { impl RegionOpener { /// Returns a new opener. pub(crate) fn new( - metadata: RegionMetadata, + region_id: RegionId, memtable_builder: MemtableBuilderRef, object_store: ObjectStore, ) -> RegionOpener { RegionOpener { - metadata, + region_id, + metadata: None, memtable_builder, object_store, region_dir: String::new(), } } + /// Sets metadata of the region to create. + pub(crate) fn metadata(mut self, metadata: RegionMetadata) -> Self { + self.metadata = Some(metadata); + self + } + /// Sets the region dir. pub(crate) fn region_dir(mut self, value: &str) -> Self { self.region_dir = value.to_string(); @@ -58,9 +68,12 @@ impl RegionOpener { } /// Writes region manifest and creates a new region. + /// + /// # Panics + /// Panics if metadata is not set. pub(crate) async fn create(self, config: &MitoConfig) -> Result { - let region_id = self.metadata.region_id; - let metadata = Arc::new(self.metadata); + let region_id = self.region_id; + let metadata = Arc::new(self.metadata.unwrap()); // Create a manifest manager for this region. let options = RegionManifestOptions { @@ -83,6 +96,47 @@ impl RegionOpener { manifest_manager, }) } + + /// Opens an existing region. + /// + /// Returns error if the region doesn't exist. + pub(crate) async fn open(self, config: &MitoConfig) -> Result { + let options = RegionManifestOptions { + manifest_dir: new_manifest_dir(&self.region_dir), + object_store: self.object_store, + compress_type: config.manifest_compress_type, + checkpoint_interval: config.manifest_checkpoint_interval, + }; + let manifest_manager = + RegionManifestManager::open(options) + .await? + .context(RegionNotFoundSnafu { + region_id: self.region_id, + })?; + + let manifest = manifest_manager.manifest().await; + let metadata = manifest.metadata.clone(); + + ensure!( + metadata.region_id == self.region_id, + RegionCorruptedSnafu { + region_id: self.region_id, + reason: format!("region id in metadata is {}", metadata.region_id), + } + ); + + let mutable = self.memtable_builder.build(&metadata); + let version = VersionBuilder::new(metadata, mutable).build(); + let version_control = Arc::new(VersionControl::new(version)); + + // TODO(yingwen): Replay. + + Ok(MitoRegion { + region_id: self.region_id, + version_control, + manifest_manager, + }) + } } /// Returns the directory to the manifest files. diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 577f998293..4b50a57534 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,7 +14,7 @@ //! Parquet writer. -use common_telemetry::logging; +use common_telemetry::debug; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; @@ -80,7 +80,7 @@ impl<'a> ParquetWriter<'a> { let stats = self.source.stats(); if stats.num_rows == 0 { - logging::debug!( + debug!( "No data written, try to stop the writer: {}", self.file_path ); diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 71244f6a19..4f72059cc6 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -42,9 +42,22 @@ pub struct TestEnv { data_home: TempDir, } +impl Default for TestEnv { + fn default() -> Self { + TestEnv::new() + } +} + impl TestEnv { + /// Returns a new env with empty prefix for test. + pub fn new() -> TestEnv { + TestEnv { + data_home: create_temp_dir(""), + } + } + /// Returns a new env with specific `prefix` for test. - pub fn new(prefix: &str) -> TestEnv { + pub fn with_prefix(prefix: &str) -> TestEnv { TestEnv { data_home: create_temp_dir(prefix), } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index c096575623..c6ecfb1243 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -14,6 +14,7 @@ //! Structs and utilities for writing regions. +mod handle_close; mod handle_create; mod handle_open; pub(crate) mod request; @@ -24,7 +25,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use common_runtime::JoinHandle; -use common_telemetry::logging; +use common_telemetry::{error, info, warn}; use futures::future::try_join_all; use object_store::ObjectStore; use snafu::{ensure, ResultExt}; @@ -42,6 +43,7 @@ use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest}; /// Identifier for a worker. pub(crate) type WorkerId = u32; +#[cfg_attr(doc, aquamarine::aquamarine)] /// A fixed size group of [RegionWorkers](RegionWorker). /// /// A worker group binds each region to a specific [RegionWorker] and sends @@ -111,7 +113,7 @@ impl WorkerGroup { /// Stop the worker group. pub(crate) async fn stop(&self) -> Result<()> { - logging::info!("Stop region worker group"); + info!("Stop region worker group"); try_join_all(self.workers.iter().map(|worker| worker.stop())).await?; @@ -204,7 +206,7 @@ impl RegionWorker { .await .is_err() { - logging::warn!( + warn!( "Worker {} is already exited but the running flag is still true", self.id ); @@ -222,11 +224,11 @@ impl RegionWorker { async fn stop(&self) -> Result<()> { let handle = self.handle.lock().await.take(); if let Some(handle) = handle { - logging::info!("Stop region worker {}", self.id); + info!("Stop region worker {}", self.id); self.set_running(false); if self.sender.send(WorkerRequest::Stop).await.is_err() { - logging::warn!("Worker {} is already exited before stop", self.id); + warn!("Worker {} is already exited before stop", self.id); } handle.await.context(JoinSnafu)?; @@ -285,7 +287,7 @@ struct RegionWorkerLoop { impl RegionWorkerLoop { /// Starts the worker loop. async fn run(&mut self) { - logging::info!("Start region worker thread {}", self.id); + info!("Start region worker thread {}", self.id); // Buffer to retrieve requests from receiver. let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); @@ -312,7 +314,9 @@ impl RegionWorkerLoop { self.handle_requests(&mut buffer).await; } - logging::info!("Exit region worker thread {}", self.id); + self.clean().await; + + info!("Exit region worker thread {}", self.id); } /// Dispatches and processes requests. @@ -367,6 +371,7 @@ impl RegionWorkerLoop { let res = match request.body { RequestBody::Create(req) => self.handle_create_request(req).await, RequestBody::Open(req) => self.handle_open_request(req).await, + RequestBody::Close(req) => self.handle_close_request(req).await, RequestBody::Write(_) => unreachable!(), }; @@ -376,6 +381,19 @@ impl RegionWorkerLoop { } } } + + // Clean up the worker. + async fn clean(&self) { + // Closes remaining regions. + let regions = self.regions.list_regions(); + for region in regions { + if let Err(e) = region.stop().await { + error!(e; "Failed to stop region {}", region.region_id); + } + } + + self.regions.clear(); + } } #[cfg(test)] @@ -398,7 +416,7 @@ mod tests { #[tokio::test] async fn test_worker_group_start_stop() { - let env = TestEnv::new("group-stop"); + let env = TestEnv::with_prefix("group-stop"); let group = env .create_worker_group(MitoConfig { num_workers: 4, diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs new file mode 100644 index 0000000000..d3f6c45851 --- /dev/null +++ b/src/mito2/src/worker/handle_close.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling close request. + +use common_telemetry::info; + +use crate::error::Result; +use crate::worker::request::CloseRequest; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + pub(crate) async fn handle_close_request(&mut self, request: CloseRequest) -> Result<()> { + let Some(region) = self.regions.get_region(request.region_id) else { + return Ok(()); + }; + + info!("Try to close region {}", request.region_id); + + region.stop().await?; + self.regions.remove_region(request.region_id); + + info!("Region {} closed", request.region_id); + + Ok(()) + } +} diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 51da861624..62f0dcb1e5 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use common_telemetry::logging; +use common_telemetry::info; use snafu::ensure; use crate::error::{RegionExistsSnafu, Result}; @@ -50,16 +50,19 @@ impl RegionWorkerLoop { // Create a MitoRegion from the RegionMetadata. let region = RegionOpener::new( - metadata, + request.region_id, self.memtable_builder.clone(), self.object_store.clone(), ) + .metadata(metadata) .region_dir(&request.region_dir) .create(&self.config) .await?; // TODO(yingwen): Custom the Debug format for the metadata and also print it. - logging::info!("A new region created, region_id: {}", region.region_id); + info!("A new region created, region_id: {}", region.region_id); + + // TODO(yingwen): Metrics. // Insert the MitoRegion into the RegionMap. self.regions.insert_region(Arc::new(region)); diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 9b3ab082fa..3128f5fa96 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -14,12 +14,38 @@ //! Handling open request. +use std::sync::Arc; + +use common_telemetry::info; + use crate::error::Result; +use crate::region::opener::RegionOpener; use crate::worker::request::OpenRequest; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_open_request(&mut self, _request: OpenRequest) -> Result<()> { - unimplemented!() + pub(crate) async fn handle_open_request(&mut self, request: OpenRequest) -> Result<()> { + if self.regions.is_region_exists(request.region_id) { + return Ok(()); + } + + info!("Try to open region {}", request.region_id); + + // Open region from specific region dir. + let region = RegionOpener::new( + request.region_id, + self.memtable_builder.clone(), + self.object_store.clone(), + ) + .region_dir(&request.region_dir) + .open(&self.config) + .await?; + + info!("Region {} is opened", request.region_id); + + // Insert the MitoRegion into the RegionMap. + self.regions.insert_region(Arc::new(region)); + + Ok(()) } } diff --git a/src/mito2/src/worker/request.rs b/src/mito2/src/worker/request.rs index bf46aaee68..a18105a754 100644 --- a/src/mito2/src/worker/request.rs +++ b/src/mito2/src/worker/request.rs @@ -69,10 +69,19 @@ pub struct CreateRequest { pub struct OpenRequest { /// Region to open. pub region_id: RegionId, + /// Data directory of the region. + pub region_dir: String, /// Options of the created region. pub options: RegionOptions, } +/// Close region request. +#[derive(Debug)] +pub struct CloseRequest { + /// Region to close. + pub region_id: RegionId, +} + /// Request to write a region. #[derive(Debug)] pub(crate) struct WriteRequest { @@ -93,6 +102,9 @@ pub(crate) enum WorkerRequest { #[derive(Debug)] pub(crate) struct RegionRequest { /// Sender to send result. + /// + /// Now the result is a `Result<()>`, but we could replace the empty tuple + /// with an enum if we need to carry more information. pub(crate) sender: Option>>, /// Request body. pub(crate) body: RequestBody, @@ -124,6 +136,8 @@ pub(crate) enum RequestBody { Create(CreateRequest), /// Opens an existing region. Open(OpenRequest), + /// Closes a region. + Close(CloseRequest), } impl RequestBody { @@ -133,6 +147,7 @@ impl RequestBody { RequestBody::Write(req) => req.region_id, RequestBody::Create(req) => req.region_id, RequestBody::Open(req) => req.region_id, + RequestBody::Close(req) => req.region_id, } } @@ -142,6 +157,7 @@ impl RequestBody { RequestBody::Write(_) => false, RequestBody::Create(_) => true, RequestBody::Open(_) => true, + RequestBody::Close(_) => true, } } }