diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 875a8c7dd0..8b078a9c69 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -14,19 +14,32 @@ //! Configurations. +use common_base::readable_size::ReadableSize; +use common_datasource::compression::CompressionType; use common_telemetry::logging; +/// Default region worker num. const DEFAULT_NUM_WORKERS: usize = 1; +/// Default region write buffer size. +pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32); /// Configuration for [MitoEngine](crate::engine::MitoEngine). #[derive(Debug)] pub struct MitoConfig { - /// Number of region workers. + // Worker configs: + /// Number of region workers (default 1). pub num_workers: usize, - /// Request channel size of each worker. + /// Request channel size of each worker (default 128). pub worker_channel_size: usize, - /// Max batch size for a worker to handle requests. + /// Max batch size for a worker to handle requests (default 64). pub worker_request_batch_size: usize, + + // Manifest configs: + /// Number of meta action updated to trigger a new checkpoint + /// for the manifest (default 10). + pub manifest_checkpoint_interval: u64, + /// Manifest compression type (default uncompressed). + pub manifest_compress_type: CompressionType, } impl Default for MitoConfig { @@ -35,6 +48,8 @@ impl Default for MitoConfig { num_workers: DEFAULT_NUM_WORKERS, worker_channel_size: 128, worker_request_batch_size: 64, + manifest_checkpoint_interval: 10, + manifest_compress_type: CompressionType::Uncompressed, } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c8801ea713..d18d02e7bc 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -12,11 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Mito region engine. + +#[cfg(test)] +mod tests; + use std::sync::Arc; use object_store::ObjectStore; use snafu::ResultExt; use store_api::logstore::LogStore; +use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; @@ -53,6 +59,11 @@ impl MitoEngine { pub async fn create_region(&self, request: CreateRequest) -> Result<()> { self.inner.create_region(request).await } + + /// Returns true if the specific region exists. + pub fn is_region_exists(&self, region_id: RegionId) -> bool { + self.inner.workers.is_region_exists(region_id) + } } /// Inner struct of [MitoEngine]. @@ -69,7 +80,7 @@ impl EngineInner { object_store: ObjectStore, ) -> EngineInner { EngineInner { - workers: WorkerGroup::start(&config, log_store, object_store), + workers: WorkerGroup::start(config, log_store, object_store), } } @@ -86,17 +97,3 @@ impl EngineInner { receiver.await.context(RecvSnafu)? } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_util::TestEnv; - - #[tokio::test] - async fn test_engine_new_stop() { - let env = TestEnv::new("engine-stop"); - let engine = env.create_engine(MitoConfig::default()).await; - - engine.stop().await.unwrap(); - } -} diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs new file mode 100644 index 0000000000..5845f749aa --- /dev/null +++ b/src/mito2/src/engine/tests.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for mito engine. + +use store_api::storage::RegionId; + +use super::*; +use crate::error::Error; +use crate::test_util::{CreateRequestBuilder, TestEnv}; + +#[tokio::test] +async fn test_engine_new_stop() { + let env = TestEnv::new("engine-stop"); + let engine = env.create_engine(MitoConfig::default()).await; + + engine.stop().await.unwrap(); + + let request = CreateRequestBuilder::new(RegionId::new(1, 1)).build(); + let err = engine.create_region(request).await.unwrap_err(); + assert!( + matches!(err, Error::WorkerStopped { .. }), + "unexpected err: {err}" + ); +} + +#[tokio::test] +async fn test_engine_create_new_region() { + let env = TestEnv::new("new-region"); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new(region_id).build(); + engine.create_region(request).await.unwrap(); + + assert!(engine.is_region_exists(region_id)); +} + +#[tokio::test] +async fn test_engine_create_region_if_not_exists() { + let env = TestEnv::new("create-not-exists"); + let engine = env.create_engine(MitoConfig::default()).await; + + let builder = CreateRequestBuilder::new(RegionId::new(1, 1)).create_if_not_exists(true); + engine.create_region(builder.build()).await.unwrap(); + + // Create the same region again. + engine.create_region(builder.build()).await.unwrap(); +} + +#[tokio::test] +async fn test_engine_create_existing_region() { + let env = TestEnv::new("create-existing"); + let engine = env.create_engine(MitoConfig::default()).await; + + let builder = CreateRequestBuilder::new(RegionId::new(1, 1)); + engine.create_region(builder.build()).await.unwrap(); + + // Create the same region again. + let err = engine.create_region(builder.build()).await.unwrap_err(); + assert!( + matches!(err, Error::RegionExists { .. }), + "unexpected err: {err}" + ); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 538d9ad2c9..16136ef377 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -19,6 +19,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; +use store_api::storage::RegionId; use crate::worker::WorkerId; @@ -110,6 +111,12 @@ pub enum Error { source: datatypes::error::Error, location: Location, }, + + #[snafu(display("Region {} already exists, location: {}", region_id, location))] + RegionExists { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -120,9 +127,11 @@ impl ErrorExt for Error { match self { OpenDal { .. } => StatusCode::StorageUnavailable, - CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => { - StatusCode::Unexpected - } + CompressObject { .. } + | DecompressObject { .. } + | SerdeJson { .. } + | Utf8 { .. } + | RegionExists { .. } => StatusCode::Unexpected, InvalidScanIndex { .. } | InitialMetadata { .. } | InvalidMeta { .. } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e1de79acb4..7a82312e98 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -292,7 +292,7 @@ mod test { async fn create_region_without_initial_metadata() { let env = TestEnv::new(""); let result = env - .create_manifest_manager(CompressionType::Uncompressed, None, None) + .create_manifest_manager(CompressionType::Uncompressed, 10, None) .await; assert!(matches!( result.err().unwrap(), @@ -305,7 +305,7 @@ mod test { let metadata = basic_region_metadata(); let env = TestEnv::new(""); let manager = env - .create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone())) + .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap(); @@ -318,7 +318,7 @@ mod test { let metadata = basic_region_metadata(); let env = TestEnv::new(""); let manager = env - .create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone())) + .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap(); diff --git a/src/mito2/src/manifest/options.rs b/src/mito2/src/manifest/options.rs index 357d3ab526..6f6d64cfd1 100644 --- a/src/mito2/src/manifest/options.rs +++ b/src/mito2/src/manifest/options.rs @@ -24,10 +24,10 @@ pub struct RegionManifestOptions { pub manifest_dir: String, pub object_store: ObjectStore, pub compress_type: CompressionType, - /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints - /// `None` means disable checkpoint. - pub checkpoint_interval: Option, + /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints. + pub checkpoint_interval: u64, /// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region. /// Only need to set when create a new region, otherwise it will be ignored. + // TODO(yingwen): Could we pass RegionMetadataRef? pub initial_metadata: Option, } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index afee0bd520..5d6a98e377 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -17,6 +17,7 @@ pub(crate) mod version; use std::fmt; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use crate::metadata::RegionMetadataRef; @@ -41,3 +42,39 @@ pub trait MemtableBuilder: Send + Sync + fmt::Debug { } pub type MemtableBuilderRef = Arc; + +// TODO(yingwen): Remove it once we port the memtable. +/// Empty memtable for test. +#[derive(Debug, Default)] +pub(crate) struct EmptyMemtable { + /// Id of this memtable. + id: MemtableId, +} + +impl EmptyMemtable { + /// Returns a new memtable with specific `id`. + pub(crate) fn new(id: MemtableId) -> EmptyMemtable { + EmptyMemtable { id } + } +} + +impl Memtable for EmptyMemtable { + fn id(&self) -> MemtableId { + self.id + } +} + +/// Default memtable builder. +#[derive(Debug, Default)] +pub(crate) struct DefaultMemtableBuilder { + /// Next memtable id. + next_id: AtomicU32, +} + +impl MemtableBuilder for DefaultMemtableBuilder { + fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { + Arc::new(EmptyMemtable::new( + self.next_id.fetch_add(1, Ordering::Relaxed), + )) + } +} diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index 84e196bc10..00c12c9f38 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -27,6 +27,9 @@ use store_api::storage::{ColumnId, RegionId}; use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result}; use crate::region::VersionNumber; +/// Initial version number of a new region. +pub(crate) const INIT_REGION_VERSION: VersionNumber = 0; + #[cfg_attr(doc, aquamarine::aquamarine)] /// Static metadata of a region. /// @@ -57,6 +60,9 @@ pub struct RegionMetadata { /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas). #[serde(skip)] pub schema: SchemaRef, + + // We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields] + // so we can assumes they are valid. /// Id of the time index column. #[serde(skip)] time_index: ColumnId, @@ -153,6 +159,22 @@ impl SkippedFields { } impl RegionMetadata { + /// Find column by id. + pub(crate) fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> { + self.id_to_index + .get(&column_id) + .map(|index| &self.column_metadatas[*index]) + } + + /// Returns the time index column + /// + /// # Panics + /// Panics if the time index column id is invalid. + pub(crate) fn time_index_column(&self) -> &ColumnMetadata { + let index = self.id_to_index[&self.time_index]; + &self.column_metadatas[index] + } + /// Checks whether the metadata is valid. fn validate(&self) -> Result<()> { // Id to name. @@ -188,6 +210,17 @@ impl RegionMetadata { } ); + // Checks the time index column is not nullable. + ensure!( + !self.time_index_column().column_schema.is_nullable(), + InvalidMetaSnafu { + reason: format!( + "time index column {} must be NOT NULL", + self.time_index_column().column_schema.name + ), + } + ); + if !self.primary_key.is_empty() { let mut pk_ids = HashSet::with_capacity(self.primary_key.len()); // Checks column ids in the primary key is valid. @@ -362,6 +395,17 @@ mod test { builder.build().unwrap() } + #[test] + fn test_region_metadata() { + let region_metadata = build_test_region_metadata(); + assert_eq!("c", region_metadata.time_index_column().column_schema.name); + assert_eq!( + "a", + region_metadata.column_by_id(1).unwrap().column_schema.name + ); + assert_eq!(None, region_metadata.column_by_id(10)); + } + #[test] fn test_region_metadata_serde() { let region_metadata = build_test_region_metadata(); @@ -384,8 +428,7 @@ mod test { let err = builder.build().unwrap_err(); assert!( err.to_string().contains("ts is not timestamp compatible"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -396,8 +439,7 @@ mod test { // A region must have a time index. assert!( err.to_string().contains("time index not found"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -423,8 +465,7 @@ mod test { assert!( err.to_string() .contains("column a and b have the same column id"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -453,8 +494,7 @@ mod test { let err = builder.build().unwrap_err(); assert!( err.to_string().contains("expect only one time index"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -480,8 +520,7 @@ mod test { let err = builder.build().unwrap_err(); assert!( err.to_string().contains("unknown column id 3"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -508,8 +547,7 @@ mod test { assert!( err.to_string() .contains("duplicate column a in primary key"), - "unexpected err: {}", - err + "unexpected err: {err}", ); } @@ -531,8 +569,27 @@ mod test { assert!( err.to_string() .contains("column ts is already a time index column"), - "unexpected err: {}", - err + "unexpected err: {err}", + ); + } + + #[test] + fn test_nullable_time_index() { + let mut builder = create_builder(); + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }); + let err = builder.build().unwrap_err(); + assert!( + err.to_string() + .contains("time index column ts must be NOT NULL"), + "unexpected err: {err}", ); } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c08a6c061b..22a235e787 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -14,6 +14,7 @@ //! Mito region. +pub(crate) mod opener; mod version; use std::collections::HashMap; @@ -21,9 +22,8 @@ use std::sync::{Arc, RwLock}; use store_api::storage::RegionId; -use crate::memtable::MemtableBuilderRef; -use crate::metadata::RegionMetadataRef; -use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; +use crate::manifest::manager::RegionManifestManager; +use crate::region::version::VersionControlRef; /// Type to store region version. pub type VersionNumber = u32; @@ -31,7 +31,16 @@ pub type VersionNumber = u32; /// Metadata and runtime status of a region. #[derive(Debug)] pub(crate) struct MitoRegion { + /// Id of this region. + /// + /// Accessing region id from the version control is inconvenient so + /// we also store it here. + pub(crate) region_id: RegionId, + + /// Version controller for this region. version_control: VersionControlRef, + /// Manager to maintain manifest for this region. + manifest_manager: RegionManifestManager, } pub(crate) type MitoRegionRef = Arc; @@ -42,33 +51,18 @@ pub(crate) struct RegionMap { regions: RwLock>, } +impl RegionMap { + /// Returns true if the region exists. + pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool { + let regions = self.regions.read().unwrap(); + regions.contains_key(®ion_id) + } + + /// Inserts a new region into the map. + pub(crate) fn insert_region(&self, region: MitoRegionRef) { + let mut regions = self.regions.write().unwrap(); + regions.insert(region.region_id, region); + } +} + pub(crate) type RegionMapRef = Arc; - -/// [MitoRegion] builder. -pub(crate) struct RegionBuilder { - metadata: RegionMetadataRef, - memtable_builder: MemtableBuilderRef, -} - -impl RegionBuilder { - /// Returns a new builder. - pub(crate) fn new( - metadata: RegionMetadataRef, - memtable_builder: MemtableBuilderRef, - ) -> RegionBuilder { - RegionBuilder { - metadata, - memtable_builder, - } - } - - /// Builds a new region. - pub(crate) fn build(self) -> MitoRegion { - let mutable = self.memtable_builder.build(&self.metadata); - - let version = VersionBuilder::new(self.metadata, mutable).build(); - let version_control = Arc::new(VersionControl::new(version)); - - MitoRegion { version_control } - } -} diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs new file mode 100644 index 0000000000..2ff78b696b --- /dev/null +++ b/src/mito2/src/region/opener.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Region opener. + +use std::sync::Arc; + +use object_store::util::join_dir; +use object_store::ObjectStore; + +use crate::config::MitoConfig; +use crate::error::Result; +use crate::manifest::manager::RegionManifestManager; +use crate::manifest::options::RegionManifestOptions; +use crate::memtable::MemtableBuilderRef; +use crate::metadata::RegionMetadata; +use crate::region::version::{VersionBuilder, VersionControl}; +use crate::region::MitoRegion; + +/// Builder to create a new [MitoRegion] or open an existing one. +pub(crate) struct RegionOpener { + metadata: RegionMetadata, + memtable_builder: MemtableBuilderRef, + object_store: ObjectStore, + region_dir: String, +} + +impl RegionOpener { + /// Returns a new opener. + pub(crate) fn new( + metadata: RegionMetadata, + memtable_builder: MemtableBuilderRef, + object_store: ObjectStore, + ) -> RegionOpener { + RegionOpener { + metadata, + memtable_builder, + object_store, + region_dir: String::new(), + } + } + + /// Sets the region dir. + pub(crate) fn region_dir(mut self, value: &str) -> Self { + self.region_dir = value.to_string(); + self + } + + /// Writes region manifest and creates a new region. + pub(crate) async fn create(self, config: &MitoConfig) -> Result { + let region_id = self.metadata.region_id; + // Create a manifest manager for this region. + let options = RegionManifestOptions { + manifest_dir: new_manifest_dir(&self.region_dir), + object_store: self.object_store, + compress_type: config.manifest_compress_type, + checkpoint_interval: config.manifest_checkpoint_interval, + // We are creating a new region, so we need to set this field. + initial_metadata: Some(self.metadata.clone()), + }; + // Writes regions to the manifest file. + let manifest_manager = RegionManifestManager::new(options).await?; + + let metadata = Arc::new(self.metadata); + let mutable = self.memtable_builder.build(&metadata); + + let version = VersionBuilder::new(metadata, mutable).build(); + let version_control = Arc::new(VersionControl::new(version)); + + Ok(MitoRegion { + region_id, + version_control, + manifest_manager, + }) + } +} + +/// Returns the directory to the manifest files. +fn new_manifest_dir(region_dir: &str) -> String { + join_dir(region_dir, "manifest") +} diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index aa4960a88c..d9c5e9ecec 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -68,8 +68,10 @@ pub(crate) struct Version { ssts: SstVersionRef, /// Inclusive max sequence of flushed data. flushed_sequence: SequenceNumber, + // TODO(yingwen): Remove this. /// Current version of region manifest. manifest_version: ManifestVersion, + // TODO(yingwen): RegionOptions. } /// Version builder. diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 9c309a4b36..6c85430b08 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -23,7 +23,7 @@ use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL}; #[derive(Debug)] pub(crate) struct SstVersion { /// SST metadata organized by levels. - levels: LevelMetaVec, + levels: LevelMetaArray, } pub(crate) type SstVersionRef = Arc; @@ -38,8 +38,8 @@ impl SstVersion { } // We only has fixed number of level, so we use array to hold elements. This implementation -// detail of LevelMetaVec should not be exposed to the user of [LevelMetas]. -type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize]; +// detail of LevelMetaArray should not be exposed to users of [LevelMetas]. +type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize]; /// Metadata of files in the same SST level. pub struct LevelMeta { @@ -68,10 +68,10 @@ impl fmt::Debug for LevelMeta { } } -fn new_level_meta_vec() -> LevelMetaVec { +fn new_level_meta_vec() -> LevelMetaArray { (0u8..MAX_LEVEL) .map(LevelMeta::new) .collect::>() .try_into() - .unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL + .unwrap() // safety: LevelMetaArray is a fixed length array with length MAX_LEVEL } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 3fe3e02d99..ba663bca4d 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -14,24 +14,26 @@ //! Utilities for testing. -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use object_store::services::Fs; use object_store::util::join_dir; use object_store::ObjectStore; +use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; use crate::manifest::options::RegionManifestOptions; -use crate::memtable::{Memtable, MemtableBuilder, MemtableId, MemtableRef}; -use crate::metadata::{RegionMetadata, RegionMetadataRef}; +use crate::metadata::{ColumnMetadata, RegionMetadata, SemanticType}; +use crate::worker::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; /// Env to test mito engine. @@ -56,7 +58,7 @@ impl TestEnv { } /// Creates a new [WorkerGroup] with specific config under this env. - pub(crate) async fn create_worker_group(&self, config: &MitoConfig) -> WorkerGroup { + pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store) = self.create_log_and_object_store().await; WorkerGroup::start(config, Arc::new(log_store), object_store) @@ -78,7 +80,7 @@ impl TestEnv { pub async fn create_manifest_manager( &self, compress_type: CompressionType, - checkpoint_interval: Option, + checkpoint_interval: u64, initial_metadata: Option, ) -> Result { let data_home = self.data_home.path().to_str().unwrap(); @@ -100,36 +102,101 @@ impl TestEnv { } } -/// Memtable that only for testing metadata. -#[derive(Debug, Default)] -pub struct MetaOnlyMemtable { - /// Id of this memtable. - id: MemtableId, +/// Builder to mock a [CreateRequest]. +pub struct CreateRequestBuilder { + region_id: RegionId, + region_dir: String, + tag_num: usize, + field_num: usize, + create_if_not_exists: bool, } -impl MetaOnlyMemtable { - /// Returns a new memtable with specific `id`. - pub fn new(id: MemtableId) -> MetaOnlyMemtable { - MetaOnlyMemtable { id } +impl Default for CreateRequestBuilder { + fn default() -> Self { + CreateRequestBuilder { + region_id: RegionId::default(), + region_dir: "test".to_string(), + tag_num: 1, + field_num: 1, + create_if_not_exists: false, + } } } -impl Memtable for MetaOnlyMemtable { - fn id(&self) -> MemtableId { - self.id - } -} - -#[derive(Debug, Default)] -pub struct MetaOnlyBuilder { - /// Next memtable id. - next_id: AtomicU32, -} - -impl MemtableBuilder for MetaOnlyBuilder { - fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef { - Arc::new(MetaOnlyMemtable::new( - self.next_id.fetch_add(1, Ordering::Relaxed), - )) +impl CreateRequestBuilder { + pub fn new(region_id: RegionId) -> CreateRequestBuilder { + CreateRequestBuilder { + region_id, + ..Default::default() + } + } + + pub fn region_dir(mut self, value: &str) -> Self { + self.region_dir = value.to_string(); + self + } + + pub fn tag_num(mut self, value: usize) -> Self { + self.tag_num = value; + self + } + + pub fn field_num(mut self, value: usize) -> Self { + self.tag_num = value; + self + } + + pub fn create_if_not_exists(mut self, value: bool) -> Self { + self.create_if_not_exists = value; + self + } + + pub fn build(&self) -> CreateRequest { + let mut column_id = 0; + let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); + let mut primary_key = Vec::with_capacity(self.tag_num); + for i in 0..self.tag_num { + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id, + }); + primary_key.push(column_id); + column_id += 1; + } + for i in 0..self.field_num { + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::float64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id, + }); + column_id += 1; + } + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id, + }); + + CreateRequest { + region_id: self.region_id, + region_dir: self.region_dir.clone(), + column_metadatas, + primary_key, + create_if_not_exists: self.create_if_not_exists, + options: RegionOptions::default(), + } } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 0f92c416f8..838994adf9 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -35,6 +35,7 @@ use tokio::sync::{mpsc, Mutex}; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; +use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{RegionMap, RegionMapRef}; use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest}; @@ -87,20 +88,18 @@ impl WorkerGroup { /// /// The number of workers should be power of two. pub(crate) fn start( - config: &MitoConfig, + config: MitoConfig, log_store: Arc, object_store: ObjectStore, ) -> WorkerGroup { assert!(config.num_workers.is_power_of_two()); + let config = Arc::new(config); let workers = (0..config.num_workers) .map(|id| { RegionWorker::start( - WorkerConfig { - id: id as WorkerId, - channel_size: config.worker_channel_size, - request_batch_size: config.worker_request_batch_size, - }, + id as WorkerId, + config.clone(), log_store.clone(), object_store.clone(), ) @@ -126,6 +125,11 @@ impl WorkerGroup { .await } + /// Returns true if the specific region exists. + pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool { + self.worker(region_id).is_region_exists(region_id) + } + /// Get worker for specific `region_id`. fn worker(&self, region_id: RegionId) -> &RegionWorker { let mut hasher = DefaultHasher::new(); @@ -141,17 +145,6 @@ fn value_to_index(value: usize, num_workers: usize) -> usize { value & (num_workers - 1) } -/// Config for region worker. -#[derive(Debug, Clone)] -struct WorkerConfig { - /// Id of the worker - id: WorkerId, - /// Capacity of the request channel. - channel_size: usize, - /// Batch size to process request. - request_batch_size: usize, -} - /// Worker to write and alter regions bound to it. #[derive(Debug)] pub(crate) struct RegionWorker { @@ -170,29 +163,31 @@ pub(crate) struct RegionWorker { impl RegionWorker { /// Start a region worker and its background thread. fn start( - config: WorkerConfig, + id: WorkerId, + config: Arc, log_store: Arc, object_store: ObjectStore, ) -> RegionWorker { let regions = Arc::new(RegionMap::default()); - let (sender, receiver) = mpsc::channel(config.channel_size); + let (sender, receiver) = mpsc::channel(config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); let mut worker_thread = RegionWorkerLoop { - id: config.id, + id, + config, regions: regions.clone(), receiver, log_store, object_store, running: running.clone(), - request_batch_size: config.request_batch_size, + memtable_builder: Arc::new(DefaultMemtableBuilder::default()), }; let handle = common_runtime::spawn_bg(async move { worker_thread.run().await; }); RegionWorker { - id: config.id, + id, regions, sender, handle: Mutex::new(Some(handle)), @@ -249,6 +244,11 @@ impl RegionWorker { fn set_running(&self, value: bool) { self.running.store(value, Ordering::Relaxed) } + + /// Returns true if the worker contains specific region. + fn is_region_exists(&self, region_id: RegionId) -> bool { + self.regions.is_region_exists(region_id) + } } impl Drop for RegionWorker { @@ -266,6 +266,8 @@ type RequestBuffer = Vec; struct RegionWorkerLoop { // Id of the worker. id: WorkerId, + /// Engine config. + config: Arc, /// Regions bound to the worker. regions: RegionMapRef, /// Request receiver. @@ -276,8 +278,8 @@ struct RegionWorkerLoop { object_store: ObjectStore, /// Whether the worker thread is still running. running: Arc, - /// Batch size to fetch requests from channel. - request_batch_size: usize, + /// Memtable builder for each region. + memtable_builder: MemtableBuilderRef, } impl RegionWorkerLoop { @@ -286,7 +288,7 @@ impl RegionWorkerLoop { logging::info!("Start region worker thread {}", self.id); // Buffer to retrieve requests from receiver. - let mut buffer = RequestBuffer::with_capacity(self.request_batch_size); + let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); while self.running.load(Ordering::Relaxed) { // Clear the buffer before handling next batch of requests. @@ -398,7 +400,7 @@ mod tests { async fn test_worker_group_start_stop() { let env = TestEnv::new("group-stop"); let group = env - .create_worker_group(&MitoConfig { + .create_worker_group(MitoConfig { num_workers: 4, ..Default::default() }) diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index b001cc295e..51da861624 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -14,17 +14,56 @@ //! Handling create request. -use crate::error::Result; +use std::sync::Arc; + +use common_telemetry::logging; +use snafu::ensure; + +use crate::error::{RegionExistsSnafu, Result}; +use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION}; +use crate::region::opener::RegionOpener; use crate::worker::request::CreateRequest; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_create_request(&mut self, _request: CreateRequest) -> Result<()> { - // 1. Checks whether the table exists. + pub(crate) async fn handle_create_request(&mut self, request: CreateRequest) -> Result<()> { + // Checks whether the table exists. + if self.regions.is_region_exists(request.region_id) { + ensure!( + request.create_if_not_exists, + RegionExistsSnafu { + region_id: request.region_id, + } + ); - // 2. Convert the request into RegionMetadata + // Region already exists. + return Ok(()); + } - // 3. Write manifest - unimplemented!() + // Convert the request into a RegionMetadata and validate it. + let mut builder = RegionMetadataBuilder::new(request.region_id, INIT_REGION_VERSION); + for column in request.column_metadatas { + builder.push_column_metadata(column); + } + builder.primary_key(request.primary_key); + let metadata = builder.build()?; + + // Create a MitoRegion from the RegionMetadata. + let region = RegionOpener::new( + metadata, + self.memtable_builder.clone(), + self.object_store.clone(), + ) + .region_dir(&request.region_dir) + .create(&self.config) + .await?; + + // TODO(yingwen): Custom the Debug format for the metadata and also print it. + logging::info!("A new region created, region_id: {}", region.region_id); + + // Insert the MitoRegion into the RegionMap. + self.regions.insert_region(Arc::new(region)); + + Ok(()) } } diff --git a/src/mito2/src/worker/request.rs b/src/mito2/src/worker/request.rs index eb0bf66a0c..bf46aaee68 100644 --- a/src/mito2/src/worker/request.rs +++ b/src/mito2/src/worker/request.rs @@ -20,6 +20,7 @@ use common_base::readable_size::ReadableSize; use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; +use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::Result; use crate::metadata::ColumnMetadata; @@ -36,11 +37,23 @@ pub struct RegionOptions { pub compaction_strategy: CompactionStrategy, } +impl Default for RegionOptions { + fn default() -> Self { + RegionOptions { + write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE), + ttl: None, + compaction_strategy: CompactionStrategy::LeveledTimeWindow, + } + } +} + /// Create region request. #[derive(Debug)] pub struct CreateRequest { /// Region to create. pub region_id: RegionId, + /// Data directory of the region. + pub region_dir: String, /// Columns in this region. pub column_metadatas: Vec, /// Columns in the primary key. @@ -51,13 +64,6 @@ pub struct CreateRequest { pub options: RegionOptions, } -impl CreateRequest { - /// Validate the request. - fn validate(&self) -> Result<()> { - unimplemented!() - } -} - /// Open region request. #[derive(Debug)] pub struct OpenRequest { diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index fdb15b4256..1994c1d74c 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -39,8 +39,7 @@ pub fn normalize_dir(dir: &str) -> String { pub fn join_dir(parent: &str, child: &str) -> String { // Always adds a `/` to the output path. let output = format!("{parent}/{child}/"); - // We call opendal's normalize_dir which doesn't push `/` to - // the end of path. + // We call opendal's normalize_root which keep the last `/`. opendal::raw::normalize_root(&output) }