diff --git a/Cargo.lock b/Cargo.lock index 9b824efe8d..836d461a8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4749,6 +4749,29 @@ dependencies = [ "meter-core", ] +[[package]] +name = "metric-engine" +version = "0.4.2" +dependencies = [ + "api", + "async-trait", + "base64 0.21.5", + "common-error", + "common-macro", + "common-query", + "common-recordbatch", + "common-telemetry", + "common-test-util", + "common-time", + "datatypes", + "mito2", + "object-store", + "serde_json", + "snafu", + "store-api", + "tokio", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/Cargo.toml b/Cargo.toml index b9c0845057..4843f15713 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "src/log-store", "src/meta-client", "src/meta-srv", + "src/metric-engine", "src/mito2", "src/object-store", "src/operator", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml new file mode 100644 index 0000000000..46c76fd79b --- /dev/null +++ b/src/metric-engine/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "metric-engine" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +api.workspace = true +async-trait.workspace = true +base64 = "0.21" +common-error.workspace = true +common-macro.workspace = true +common-query.workspace = true +common-recordbatch.workspace = true +common-telemetry.workspace = true +common-time.workspace = true +datatypes.workspace = true +mito2.workspace = true +object-store.workspace = true +serde_json.workspace = true +snafu.workspace = true +store-api.workspace = true +tokio.workspace = true + +[dev-dependencies] +common-test-util.workspace = true +mito2 = { workspace = true, features = ["test"] } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs new file mode 100644 index 0000000000..0262c32558 --- /dev/null +++ b/src/metric-engine/src/engine.rs @@ -0,0 +1,311 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use async_trait::async_trait; +use common_error::ext::BoxedError; +use common_query::Output; +use common_recordbatch::SendableRecordBatchStream; +use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use datatypes::value::Value; +use mito2::engine::{MitoEngine, MITO_ENGINE_NAME}; +use object_store::util::join_dir; +use snafu::ResultExt; +use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; +use store_api::region_engine::{RegionEngine, RegionRole}; +use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::storage::{RegionGroup, RegionId, ScanRequest}; + +use crate::error::{CreateMitoRegionSnafu, Result}; +use crate::utils; + +/// region group value for data region inside a metric region +pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0; + +/// region group value for metadata region inside a metric region +pub const METRIC_METADATA_REGION_GROUP: RegionGroup = 1; + +const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts"; +const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k"; +const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "val"; + +const METADATA_REGION_SUBDIR: &str = "metadata"; +const DATA_REGION_SUBDIR: &str = "data"; + +pub const METRIC_ENGINE_NAME: &str = "metric"; + +pub struct MetricEngine { + inner: Arc, +} + +#[async_trait] +impl RegionEngine for MetricEngine { + /// Name of this engine + fn name(&self) -> &str { + METRIC_ENGINE_NAME + } + + /// Handles request to the region. + /// + /// Only query is not included, which is handled in `handle_query` + async fn handle_request( + &self, + region_id: RegionId, + request: RegionRequest, + ) -> std::result::Result { + let result = match request { + RegionRequest::Put(_) => todo!(), + RegionRequest::Delete(_) => todo!(), + RegionRequest::Create(create) => self + .inner + .create_region(region_id, create) + .await + .map(|_| Output::AffectedRows(0)), + RegionRequest::Drop(_) => todo!(), + RegionRequest::Open(_) => todo!(), + RegionRequest::Close(_) => todo!(), + RegionRequest::Alter(_) => todo!(), + RegionRequest::Flush(_) => todo!(), + RegionRequest::Compact(_) => todo!(), + RegionRequest::Truncate(_) => todo!(), + }; + + result.map_err(BoxedError::new) + } + + /// Handles substrait query and return a stream of record batches + async fn handle_query( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> std::result::Result { + todo!() + } + + /// Retrieves region's metadata. + async fn get_metadata( + &self, + region_id: RegionId, + ) -> std::result::Result { + todo!() + } + + /// Retrieves region's disk usage. + async fn region_disk_usage(&self, region_id: RegionId) -> Option { + todo!() + } + + /// Stops the engine + async fn stop(&self) -> std::result::Result<(), BoxedError> { + todo!() + } + + fn set_writable( + &self, + region_id: RegionId, + writable: bool, + ) -> std::result::Result<(), BoxedError> { + todo!() + } + + fn role(&self, region_id: RegionId) -> Option { + todo!() + } +} + +struct MetricEngineInner { + mito: MitoEngine, +} + +impl MetricEngineInner { + pub async fn create_region( + &self, + region_id: RegionId, + request: RegionCreateRequest, + ) -> Result<()> { + self.verify_region_create_request(&request)?; + + let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id); + let create_data_region_request = self.create_request_for_data_region(&request); + let create_metadata_region_request = + self.create_request_for_metadata_region(&request.region_dir); + + // self.mito + // .handle_request( + // data_region_id, + // RegionRequest::Create(create_data_region_request), + // ) + // .await + // .with_context(|_| CreateMitoRegionSnafu { + // region_type: DATA_REGION_SUBDIR, + // })?; + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Create(create_metadata_region_request), + ) + .await + .with_context(|_| CreateMitoRegionSnafu { + region_type: METADATA_REGION_SUBDIR, + })?; + + Ok(()) + } + + /// Check if + /// - internal columns are present + fn verify_region_create_request(&self, request: &RegionCreateRequest) -> Result<()> { + let name_to_index = request + .column_metadatas + .iter() + .enumerate() + .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx)) + .collect::>(); + + Ok(()) + } + + /// Build data region id and metadata region id from the given region id. + /// + /// Return value: (data_region_id, metadata_region_id) + fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) { + ( + utils::to_data_region_id(region_id), + utils::to_metadata_region_id(region_id), + ) + } + + /// Build [RegionCreateRequest] for metadata region + /// + /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`. + pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest { + // ts TIME INDEX DEFAULT 0 + let timestamp_column_metadata = ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value( + Value::Timestamp(Timestamp::new_millisecond(0)), + ))) + .unwrap(), + }; + // key STRING PRIMARY KEY + let key_column_metadata = ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_KEY_COLUMN_NAME, + ConcreteDataType::string_datatype(), + false, + ), + }; + // val STRING + let value_column_metadata = ColumnMetadata { + column_id: 2, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_VALUE_COLUMN_NAME, + ConcreteDataType::string_datatype(), + true, + ), + }; + + let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR); + + RegionCreateRequest { + engine: MITO_ENGINE_NAME.to_string(), + column_metadatas: vec![ + timestamp_column_metadata, + key_column_metadata, + value_column_metadata, + ], + primary_key: vec![1], + options: HashMap::new(), + region_dir: metadata_region_dir, + } + } + + // todo: register "tag columns" to metadata + pub fn create_request_for_data_region( + &self, + request: &RegionCreateRequest, + ) -> RegionCreateRequest { + let mut data_region_request = request.clone(); + + // concat region dir + data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR); + + // todo: change semantic type and primary key + + // todo: add internal column + + data_region_request + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use common_telemetry::info; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn create_metadata_region() { + common_telemetry::init_default_ut_logging(); + + let env = TestEnv::new().await; + let mito = env.mito(); + let engine = MetricEngine { + inner: Arc::new(MetricEngineInner { mito }), + }; + let engine_dir = env.data_home(); + let region_dir = join_dir(&engine_dir, "test_metric_region"); + + let region_id = RegionId::new(1, 2); + let region_create_request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![], + primary_key: vec![], + options: HashMap::new(), + region_dir: "test_metric_region".to_string(), + }; + + // create the region + engine + .handle_request(region_id, RegionRequest::Create(region_create_request)) + .await + .unwrap(); + + // assert metadata region's dir + let metadata_region_dir = join_dir(®ion_dir, METADATA_REGION_SUBDIR); + let exist = tokio::fs::try_exists(region_dir).await.unwrap(); + assert!(exist); + + // check mito engine + let metadata_region_id = utils::to_metadata_region_id(region_id); + let result = env.mito().get_metadata(metadata_region_id).await.unwrap(); + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs new file mode 100644 index 0000000000..7bbb38afac --- /dev/null +++ b/src/metric-engine/src/error.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Missing internal column {} in physical metric table", column))] + MissingInternalColumn { column: String, location: Location }, + + #[snafu(display("Failed to create mito region, region type: {}", region_type))] + CreateMitoRegion { + region_type: String, + source: BoxedError, + location: Location, + }, + + #[snafu(display("Table `{}` already exists", table_name))] + TableAlreadyExists { + table_name: String, + location: Location, + }, + + #[snafu(display("Failed to deserialize semantic type from {}", raw))] + DeserializeSemanticType { + raw: String, + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, + + #[snafu(display("Failed to decode base64 column value"))] + DecodeColumnValue { + #[snafu(source)] + error: base64::DecodeError, + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + MissingInternalColumn { .. } + | DeserializeSemanticType { .. } + | DecodeColumnValue { .. } => StatusCode::Unexpected, + + CreateMitoRegion { source, .. } => source.status_code(), + + TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs new file mode 100644 index 0000000000..14d9aa22bb --- /dev/null +++ b/src/metric-engine/src/lib.rs @@ -0,0 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[allow(unused)] +pub mod engine; +pub mod error; +#[allow(unused)] +mod metadata_region; +#[cfg(test)] +mod test_util; +mod utils; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs new file mode 100644 index 0000000000..09bdc7cd19 --- /dev/null +++ b/src/metric-engine/src/metadata_region.rs @@ -0,0 +1,229 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::SemanticType; +use base64::engine::general_purpose::STANDARD_NO_PAD; +use base64::Engine; +use mito2::engine::MitoEngine; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use crate::error::{ + DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, Result, TableAlreadyExistsSnafu, +}; +use crate::utils; + +/// The other two fields key and value will be used as a k-v storage. +/// It contains two group of key: +/// - `__table_` is used for marking table existence. It doesn't have value. +/// - `__column__` is used for marking column existence, +/// the value is column's semantic type. To avoid the key conflict, this column key +/// will be encoded by base64([STANDARD_NO_PAD]). +/// +/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It +/// will handle all the metadata related operations across physical tables. Thus +/// every operation should be associated to a [RegionId], which is the physical +/// table id + region sequence. This handler will transform the region group by +/// itself. +pub struct MetadataRegion { + mito: MitoEngine, +} + +impl MetadataRegion { + /// Add a new table key to metadata. + /// + /// This method will check if the table key already exists, if so, it will return + /// a [TableAlreadyExistsSnafu] error. + pub fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> { + let region_id = utils::to_metadata_region_id(region_id); + let table_key = Self::concat_table_key(table_name); + + let put_success = self.put_conditionally(region_id, table_key, String::new())?; + + if !put_success { + TableAlreadyExistsSnafu { table_name }.fail() + } else { + Ok(()) + } + } + + /// Add a new column key to metadata. + /// + /// This method won't check if the column already exists. + pub fn add_column( + &self, + region_id: RegionId, + table_name: &str, + column_name: &str, + semantic_type: SemanticType, + ) -> Result<()> { + let region_id = utils::to_metadata_region_id(region_id); + let column_key = Self::concat_column_key(table_name, column_name); + + self.put_conditionally( + region_id, + column_key, + Self::serialize_semantic_type(semantic_type), + )?; + Ok(()) + } +} + +// utils to concat and parse key/value +impl MetadataRegion { + pub fn concat_table_key(table_name: &str) -> String { + format!("__table_{}", table_name) + } + + pub fn concat_column_key(table_name: &str, column_name: &str) -> String { + let encoded_table_name = STANDARD_NO_PAD.encode(table_name); + let encoded_column_name = STANDARD_NO_PAD.encode(column_name); + format!("__column_{}_{}", encoded_table_name, encoded_column_name) + } + + pub fn parse_table_key(key: &str) -> Option<&str> { + key.strip_prefix("__table_") + } + + /// Parse column key to (table_name, column_name) + pub fn parse_column_key(key: &str) -> Result> { + if let Some(stripped) = key.strip_prefix("__column_") { + let mut iter = stripped.split('_'); + let encoded_table_name = iter.next().unwrap(); + let encoded_column_name = iter.next().unwrap(); + + let table_name = STANDARD_NO_PAD + .decode(encoded_table_name) + .context(DecodeColumnValueSnafu)?; + let column_name = STANDARD_NO_PAD + .decode(encoded_column_name) + .context(DecodeColumnValueSnafu)?; + + Ok(Some(( + String::from_utf8(table_name).unwrap(), + String::from_utf8(column_name).unwrap(), + ))) + } else { + Ok(None) + } + } + + pub fn serialize_semantic_type(semantic_type: SemanticType) -> String { + serde_json::to_string(&semantic_type).unwrap() + } + + pub fn deserialize_semantic_type(semantic_type: &str) -> Result { + serde_json::from_str(semantic_type) + .with_context(|_| DeserializeSemanticTypeSnafu { raw: semantic_type }) + } +} + +// simulate to `KvBackend` +// +// methods in this block assume the given region id is transformed. +#[allow(unused_variables)] +impl MetadataRegion { + /// Put if not exist, return if this put operation is successful (error other + /// than "key already exist" will be wrapped in [Err]). + pub fn put_conditionally( + &self, + region_id: RegionId, + key: String, + value: String, + ) -> Result { + todo!() + } + + /// Check if the given key exists. + pub fn exist(&self, region_id: RegionId, key: &str) -> Result { + todo!() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_concat_table_key() { + let table_name = "my_table"; + let expected = "__table_my_table".to_string(); + assert_eq!(MetadataRegion::concat_table_key(table_name), expected); + } + + #[test] + fn test_concat_column_key() { + let table_name = "my_table"; + let column_name = "my_column"; + let expected = "__column_bXlfdGFibGU_bXlfY29sdW1u".to_string(); + assert_eq!( + MetadataRegion::concat_column_key(table_name, column_name), + expected + ); + } + + #[test] + fn test_parse_table_key() { + let encoded = MetadataRegion::concat_column_key("my_table", "my_column"); + assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u"); + + let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); + assert_eq!( + decoded, + Some(("my_table".to_string(), "my_column".to_string())) + ); + } + + #[test] + fn test_parse_valid_column_key() { + let encoded = MetadataRegion::concat_column_key("my_table", "my_column"); + assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u"); + + let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); + assert_eq!( + decoded, + Some(("my_table".to_string(), "my_column".to_string())) + ); + } + + #[test] + fn test_parse_invalid_column_key() { + let key = "__column_asdfasd_????"; + let result = MetadataRegion::parse_column_key(key); + assert!(result.is_err()); + } + + #[test] + fn test_serialize_semantic_type() { + let semantic_type = SemanticType::Tag; + let expected = "\"Tag\"".to_string(); + assert_eq!( + MetadataRegion::serialize_semantic_type(semantic_type), + expected + ); + } + + #[test] + fn test_deserialize_semantic_type() { + let semantic_type = "\"Tag\""; + let expected = SemanticType::Tag; + assert_eq!( + MetadataRegion::deserialize_semantic_type(semantic_type).unwrap(), + expected + ); + + let semantic_type = "\"InvalidType\""; + assert!(MetadataRegion::deserialize_semantic_type(semantic_type).is_err()); + } +} diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs new file mode 100644 index 0000000000..313a2ceba8 --- /dev/null +++ b/src/metric-engine/src/test_util.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing. + +use mito2::config::MitoConfig; +use mito2::engine::MitoEngine; +use mito2::test_util::TestEnv as MitoTestEnv; +use object_store::util::join_dir; + +/// Env to test metric engine. +pub struct TestEnv { + mito_env: MitoTestEnv, + mito: MitoEngine, +} + +impl TestEnv { + /// Returns a new env with empty prefix for test. + pub async fn new() -> Self { + Self::with_prefix("").await + } + + /// Returns a new env with specific `prefix` for test. + pub async fn with_prefix(prefix: &str) -> Self { + let mut mito_env = MitoTestEnv::with_prefix(prefix); + let mito = mito_env.create_engine(MitoConfig::default()).await; + Self { mito_env, mito } + } + + pub fn data_home(&self) -> String { + let env_root = self.mito_env.data_home().to_string_lossy().to_string(); + join_dir(&env_root, "data") + } + + /// Returns a reference to the engine. + pub fn mito(&self) -> MitoEngine { + self.mito.clone() + } +} diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs new file mode 100644 index 0000000000..a2c9719a1f --- /dev/null +++ b/src/metric-engine/src/utils.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::storage::RegionId; + +use crate::engine::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP}; + +/// Change the given [RegionId]'s region group to [METRIC_METADATA_REGION_GROUP]. +pub fn to_metadata_region_id(region_id: RegionId) -> RegionId { + let table_id = region_id.table_id(); + let region_sequence = region_id.region_sequence(); + RegionId::with_group_and_seq(table_id, METRIC_METADATA_REGION_GROUP, region_sequence) +} + +/// Change the given [RegionId]'s region group to [METRIC_DATA_REGION_GROUP]. +pub fn to_data_region_id(region_id: RegionId) -> RegionId { + let table_id = region_id.table_id(); + let region_sequence = region_id.region_sequence(); + RegionId::with_group_and_seq(table_id, METRIC_DATA_REGION_GROUP, region_sequence) +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_to_metadata_region_id() { + let region_id = RegionId::new(1, 2); + let expected_region_id = RegionId::with_group_and_seq(1, METRIC_METADATA_REGION_GROUP, 2); + assert_eq!(to_metadata_region_id(region_id), expected_region_id); + + let region_id = RegionId::with_group_and_seq(1, 243, 2); + let expected_region_id = RegionId::with_group_and_seq(1, METRIC_METADATA_REGION_GROUP, 2); + assert_eq!(to_metadata_region_id(region_id), expected_region_id); + } + + #[test] + fn test_to_data_region_id() { + let region_id = RegionId::new(1, 2); + let expected_region_id = RegionId::with_group_and_seq(1, METRIC_DATA_REGION_GROUP, 2); + assert_eq!(to_data_region_id(region_id), expected_region_id); + + let region_id = RegionId::with_group_and_seq(1, 243, 2); + let expected_region_id = RegionId::with_group_and_seq(1, METRIC_DATA_REGION_GROUP, 2); + assert_eq!(to_data_region_id(region_id), expected_region_id); + } +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index ff48b5818b..4703d3f6dc 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] default = [] -test = ["common-test-util"] +test = ["common-test-util", "log-store"] [dependencies] anymap = "1.0.0-beta.2" @@ -39,6 +39,7 @@ datatypes = { workspace = true } futures.workspace = true humantime-serde = { workspace = true } lazy_static = "1.4" +log-store = { workspace = true, optional = true } memcomparable = "0.2" moka = { workspace = true, features = ["sync"] } object-store = { workspace = true } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index daaff6626c..3115cb180d 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -28,8 +28,8 @@ mod create_test; mod drop_test; #[cfg(test)] mod flush_test; -#[cfg(test)] -pub(crate) mod listener; +#[cfg(any(test, feature = "test"))] +pub mod listener; #[cfg(test)] mod open_test; #[cfg(test)] @@ -61,6 +61,8 @@ use crate::region::RegionUsage; use crate::request::WorkerRequest; use crate::worker::WorkerGroup; +pub const MITO_ENGINE_NAME: &str = "mito"; + /// Region engine implementation for timeseries data. #[derive(Clone)] pub struct MitoEngine { @@ -200,7 +202,7 @@ impl EngineInner { #[async_trait] impl RegionEngine for MitoEngine { fn name(&self) -> &str { - "mito" + MITO_ENGINE_NAME } async fn handle_request( @@ -265,7 +267,7 @@ impl RegionEngine for MitoEngine { } // Tests methods. -#[cfg(test)] +#[cfg(any(test, feature = "test"))] impl MitoEngine { /// Returns a new [MitoEngine] for tests. pub fn new_for_test( diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 63876149e8..efd8a727d2 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -72,7 +72,7 @@ async fn test_manual_flush() { async fn test_flush_engine() { let mut env = TestEnv::new(); let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); - let listener = Arc::new(FlushListener::new()); + let listener = Arc::new(FlushListener::default()); let engine = env .create_engine_with( MitoConfig::default(), @@ -131,7 +131,7 @@ async fn test_flush_engine() { async fn test_write_stall() { let mut env = TestEnv::new(); let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); - let listener = Arc::new(StallListener::new()); + let listener = Arc::new(StallListener::default()); let engine = env .create_engine_with( MitoConfig::default(), diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index f0b5def366..0629b0e92d 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -51,18 +51,12 @@ pub trait EventListener: Send + Sync { pub type EventListenerRef = Arc; /// Listener to watch flush events. +#[derive(Default)] pub struct FlushListener { notify: Notify, } impl FlushListener { - /// Creates a new listener. - pub fn new() -> FlushListener { - FlushListener { - notify: Notify::new(), - } - } - /// Wait until one flush job is done. pub async fn wait(&self) { self.notify.notified().await; @@ -83,18 +77,12 @@ impl EventListener for FlushListener { } /// Listener to watch stall events. +#[derive(Default)] pub struct StallListener { notify: Notify, } impl StallListener { - /// Creates a new listener. - pub fn new() -> StallListener { - StallListener { - notify: Notify::new(), - } - } - /// Wait for a stall event. pub async fn wait(&self) { self.notify.notified().await; @@ -120,6 +108,7 @@ impl EventListener for StallListener { /// to block and wait for `on_flush_region()`. /// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate /// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing. +#[derive(Default)] pub struct FlushTruncateListener { /// Notify flush operation. notify_flush: Notify, @@ -128,14 +117,6 @@ pub struct FlushTruncateListener { } impl FlushTruncateListener { - /// Creates a new listener. - pub fn new() -> FlushTruncateListener { - FlushTruncateListener { - notify_flush: Notify::new(), - notify_truncate: Notify::new(), - } - } - /// Notify flush region to proceed. pub fn notify_flush(&self) { self.notify_flush.notify_one(); diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 5903eb42c0..8bf2ddb7e7 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -271,7 +271,7 @@ async fn test_engine_truncate_during_flush() { init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-during-flush"); let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); - let listener = Arc::new(FlushTruncateListener::new()); + let listener = Arc::new(FlushTruncateListener::default()); let engine = env .create_engine_with( MitoConfig::default(), diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index dd404f2013..1a32351547 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -19,6 +19,7 @@ #![feature(let_chains)] #[cfg(any(test, feature = "test"))] +#[cfg_attr(feature = "test", allow(unused))] pub mod test_util; mod access_layer; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index b87fd12039..7d49bb2348 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -20,6 +20,7 @@ pub mod scheduler_util; pub mod version_util; use std::collections::HashMap; +use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -119,6 +120,10 @@ impl TestEnv { .map(|manager| manager.default_object_store().clone()) } + pub fn data_home(&self) -> &Path { + self.data_home.path() + } + /// Creates a new engine with specific config under this env. pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index b6a906a41e..bc2463f224 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -198,7 +198,7 @@ impl WorkerGroup { } // Tests methods. -#[cfg(test)] +#[cfg(any(test, feature = "test"))] impl WorkerGroup { /// Starts a worker group with `write_buffer_manager` and `listener` for tests. /// @@ -583,12 +583,12 @@ impl RegionWorkerLoop { /// Wrapper that only calls event listener in tests. #[derive(Default, Clone)] pub(crate) struct WorkerListener { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] listener: Option, } impl WorkerListener { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] pub(crate) fn new( listener: Option, ) -> WorkerListener { @@ -597,7 +597,7 @@ impl WorkerListener { /// Flush is finished successfully. pub(crate) fn on_flush_success(&self, region_id: RegionId) { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_flush_success(region_id); } @@ -607,14 +607,14 @@ impl WorkerListener { /// Engine is stalled. pub(crate) fn on_write_stall(&self) { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_write_stall(); } } pub(crate) async fn on_flush_begin(&self, region_id: RegionId) { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_flush_begin(region_id).await; } @@ -623,7 +623,7 @@ impl WorkerListener { } pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { return listener.on_later_drop_begin(region_id); } @@ -634,7 +634,7 @@ impl WorkerListener { /// On later drop task is finished. pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) { - #[cfg(test)] + #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_later_drop_end(region_id, removed); } diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index de0ce1739f..f0f4ef0f97 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -88,7 +88,7 @@ impl RegionId { RegionId(id) } - #[cfg(test)] + /// Construct a new [RegionId] from table id, region group and region sequence. pub const fn with_group_and_seq( table_id: TableId, group: RegionGroup,