diff --git a/Cargo.lock b/Cargo.lock index 89639ff117..cf2c394263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4765,6 +4765,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "datafusion", "datatypes", "mito2", "object-store", diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 9730b4041f..0e34dc4cff 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -43,7 +43,7 @@ pub trait RecordBatchStream: Stream> { pub type SendableRecordBatchStream = Pin>; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct OrderOption { pub name: String, pub options: SortOptions, diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 46c76fd79b..65b1bdd786 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -14,6 +14,7 @@ common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true common-time.workspace = true +datafusion.workspace = true datatypes.workspace = true mito2.workspace = true object-store.workspace = true diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 0262c32558..86c41aeced 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -26,13 +26,14 @@ use datatypes::schema::ColumnSchema; use datatypes::value::Value; use mito2::engine::{MitoEngine, MITO_ENGINE_NAME}; use object_store::util::join_dir; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::storage::consts::ReservedColumnId; use store_api::storage::{RegionGroup, RegionId, ScanRequest}; -use crate::error::{CreateMitoRegionSnafu, Result}; +use crate::error::{CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, Result}; use crate::utils; /// region group value for data region inside a metric region @@ -41,12 +42,20 @@ pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0; /// region group value for metadata region inside a metric region pub const METRIC_METADATA_REGION_GROUP: RegionGroup = 1; -const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts"; -const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k"; -const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "val"; +pub const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts"; +pub const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k"; +pub const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "v"; -const METADATA_REGION_SUBDIR: &str = "metadata"; -const DATA_REGION_SUBDIR: &str = "data"; +pub const METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX: usize = 0; +pub const METADATA_SCHEMA_KEY_COLUMN_INDEX: usize = 1; +pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2; + +/// Column name of internal column `__metric` that stores the original metric name +pub const DATA_SCHEMA_METRIC_NAME_COLUMN_NAME: &str = "__metric"; +pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid"; + +pub const METADATA_REGION_SUBDIR: &str = "metadata"; +pub const DATA_REGION_SUBDIR: &str = "data"; pub const METRIC_ENGINE_NAME: &str = "metric"; @@ -129,32 +138,32 @@ impl RegionEngine for MetricEngine { } } +impl MetricEngine { + pub fn new(mito: MitoEngine) -> Self { + Self { + inner: Arc::new(MetricEngineInner { mito }), + } + } +} + struct MetricEngineInner { mito: MitoEngine, } impl MetricEngineInner { + /// Initialize a metric region at given region id. pub async fn create_region( &self, region_id: RegionId, request: RegionCreateRequest, ) -> Result<()> { - self.verify_region_create_request(&request)?; + Self::verify_region_create_request(&request)?; let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id); - let create_data_region_request = self.create_request_for_data_region(&request); + + // create metadata region let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); - - // self.mito - // .handle_request( - // data_region_id, - // RegionRequest::Create(create_data_region_request), - // ) - // .await - // .with_context(|_| CreateMitoRegionSnafu { - // region_type: DATA_REGION_SUBDIR, - // })?; self.mito .handle_request( metadata_region_id, @@ -165,12 +174,24 @@ impl MetricEngineInner { region_type: METADATA_REGION_SUBDIR, })?; + // create data region + let create_data_region_request = self.create_request_for_data_region(&request); + self.mito + .handle_request( + data_region_id, + RegionRequest::Create(create_data_region_request), + ) + .await + .with_context(|_| CreateMitoRegionSnafu { + region_type: DATA_REGION_SUBDIR, + })?; + Ok(()) } /// Check if - /// - internal columns are present - fn verify_region_create_request(&self, request: &RegionCreateRequest) -> Result<()> { + /// - internal columns are not occupied + fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> { let name_to_index = request .column_metadatas .iter() @@ -178,6 +199,20 @@ impl MetricEngineInner { .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx)) .collect::>(); + // check if internal columns are not occupied + ensure!( + !name_to_index.contains_key(DATA_SCHEMA_METRIC_NAME_COLUMN_NAME), + InternalColumnOccupiedSnafu { + column: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME, + } + ); + ensure!( + !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME), + InternalColumnOccupiedSnafu { + column: DATA_SCHEMA_TSID_COLUMN_NAME, + } + ); + Ok(()) } @@ -197,7 +232,7 @@ impl MetricEngineInner { pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest { // ts TIME INDEX DEFAULT 0 let timestamp_column_metadata = ColumnMetadata { - column_id: 0, + column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _, semantic_type: SemanticType::Timestamp, column_schema: ColumnSchema::new( METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, @@ -211,7 +246,7 @@ impl MetricEngineInner { }; // key STRING PRIMARY KEY let key_column_metadata = ColumnMetadata { - column_id: 1, + column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _, semantic_type: SemanticType::Tag, column_schema: ColumnSchema::new( METADATA_SCHEMA_KEY_COLUMN_NAME, @@ -221,7 +256,7 @@ impl MetricEngineInner { }; // val STRING let value_column_metadata = ColumnMetadata { - column_id: 2, + column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _, semantic_type: SemanticType::Field, column_schema: ColumnSchema::new( METADATA_SCHEMA_VALUE_COLUMN_NAME, @@ -230,6 +265,7 @@ impl MetricEngineInner { ), }; + // concat region dir let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR); RegionCreateRequest { @@ -239,13 +275,18 @@ impl MetricEngineInner { key_column_metadata, value_column_metadata, ], - primary_key: vec![1], + primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _], options: HashMap::new(), region_dir: metadata_region_dir, } } - // todo: register "tag columns" to metadata + /// Convert [RegionCreateRequest] for data region. + /// + /// All tag columns in the original request will be converted to value columns. + /// Those columns real semantic type is stored in metadata region. + /// + /// This will also add internal columns to the request. pub fn create_request_for_data_region( &self, request: &RegionCreateRequest, @@ -255,57 +296,157 @@ impl MetricEngineInner { // concat region dir data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR); - // todo: change semantic type and primary key + // convert semantic type + data_region_request + .column_metadatas + .iter_mut() + .for_each(|metadata| { + if metadata.semantic_type == SemanticType::Tag { + metadata.semantic_type = SemanticType::Field; + } + }); - // todo: add internal column + // add internal columns + let metric_name_col = ColumnMetadata { + column_id: ReservedColumnId::metric_name(), + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + DATA_SCHEMA_METRIC_NAME_COLUMN_NAME, + ConcreteDataType::string_datatype(), + false, + ), + }; + let tsid_col = ColumnMetadata { + column_id: ReservedColumnId::tsid(), + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + ConcreteDataType::int64_datatype(), + false, + ), + }; + data_region_request.column_metadatas.push(metric_name_col); + data_region_request.column_metadatas.push(tsid_col); + data_region_request.primary_key = + vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]; data_region_request } } #[cfg(test)] -mod test { - use std::time::Duration; - - use common_telemetry::info; - +mod tests { use super::*; use crate::test_util::TestEnv; - #[tokio::test] - async fn create_metadata_region() { - common_telemetry::init_default_ut_logging(); - - let env = TestEnv::new().await; - let mito = env.mito(); - let engine = MetricEngine { - inner: Arc::new(MetricEngineInner { mito }), - }; - let engine_dir = env.data_home(); - let region_dir = join_dir(&engine_dir, "test_metric_region"); - - let region_id = RegionId::new(1, 2); - let region_create_request = RegionCreateRequest { + #[test] + fn test_verify_region_create_request() { + // internal column is occupied + let request = RegionCreateRequest { + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + DATA_SCHEMA_METRIC_NAME_COLUMN_NAME, + ConcreteDataType::string_datatype(), + false, + ), + }, + ], + region_dir: "test_dir".to_string(), engine: METRIC_ENGINE_NAME.to_string(), - column_metadatas: vec![], primary_key: vec![], options: HashMap::new(), - region_dir: "test_metric_region".to_string(), + }; + let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Internal column __metric is reserved".to_string() + ); + + // valid request + let request = RegionCreateRequest { + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "column1".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + }, + ], + region_dir: "test_dir".to_string(), + engine: METRIC_ENGINE_NAME.to_string(), + primary_key: vec![], + options: HashMap::new(), + }; + let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_create_request_for_data_region() { + let request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag", + ConcreteDataType::string_datatype(), + false, + ), + }, + ], + primary_key: vec![0], + options: HashMap::new(), + region_dir: "test_dir".to_string(), }; - // create the region - engine - .handle_request(region_id, RegionRequest::Create(region_create_request)) - .await - .unwrap(); + let env = TestEnv::new().await; + let engine = MetricEngineInner { mito: env.mito() }; + let data_region_request = engine.create_request_for_data_region(&request); - // assert metadata region's dir - let metadata_region_dir = join_dir(®ion_dir, METADATA_REGION_SUBDIR); - let exist = tokio::fs::try_exists(region_dir).await.unwrap(); - assert!(exist); - - // check mito engine - let metadata_region_id = utils::to_metadata_region_id(region_id); - let result = env.mito().get_metadata(metadata_region_id).await.unwrap(); + assert_eq!( + data_region_request.region_dir, + "/test_dir/data/".to_string() + ); + assert_eq!(data_region_request.column_metadatas.len(), 4); + assert_eq!( + data_region_request.primary_key, + vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()] + ); } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 7bbb38afac..6ec8c95b53 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -53,6 +53,27 @@ pub enum Error { error: base64::DecodeError, location: Location, }, + + #[snafu(display("Mito read operation fails"))] + MitoReadOperation { + source: BoxedError, + location: Location, + }, + + #[snafu(display("Mito write operation fails"))] + MitoWriteOperation { + source: BoxedError, + location: Location, + }, + + #[snafu(display("Failed to collect record batch stream"))] + CollectRecordBatchStream { + source: common_recordbatch::error::Error, + location: Location, + }, + + #[snafu(display("Internal column {} is reserved", column))] + InternalColumnOccupied { column: String, location: Location }, } pub type Result = std::result::Result; @@ -62,11 +83,17 @@ impl ErrorExt for Error { use Error::*; match self { + InternalColumnOccupied { .. } => StatusCode::InvalidArguments, + MissingInternalColumn { .. } | DeserializeSemanticType { .. } | DecodeColumnValue { .. } => StatusCode::Unexpected, - CreateMitoRegion { source, .. } => source.status_code(), + CreateMitoRegion { source, .. } + | MitoReadOperation { source, .. } + | MitoWriteOperation { source, .. } => source.status_code(), + + CollectRecordBatchStream { source, .. } => source.status_code(), TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 09bdc7cd19..0493553ed8 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -12,15 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::SemanticType; +use api::v1::value::ValueData; +use api::v1::{self, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use base64::engine::general_purpose::STANDARD_NO_PAD; use base64::Engine; +use common_recordbatch::util::collect; +use datafusion::prelude::{col, lit, Expr}; +use datatypes::vectors::StringVector; use mito2::engine::MitoEngine; use snafu::ResultExt; -use store_api::storage::RegionId; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionPutRequest, RegionReadRequest}; +use store_api::storage::{RegionId, ScanRequest}; +use crate::engine::{ + METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, + METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, +}; use crate::error::{ - DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, Result, TableAlreadyExistsSnafu, + CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, + MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, TableAlreadyExistsSnafu, }; use crate::utils; @@ -41,15 +52,21 @@ pub struct MetadataRegion { } impl MetadataRegion { + pub fn new(mito: MitoEngine) -> Self { + Self { mito } + } + /// Add a new table key to metadata. /// /// This method will check if the table key already exists, if so, it will return /// a [TableAlreadyExistsSnafu] error. - pub fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> { + pub async fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> { let region_id = utils::to_metadata_region_id(region_id); let table_key = Self::concat_table_key(table_name); - let put_success = self.put_conditionally(region_id, table_key, String::new())?; + let put_success = self + .put_conditionally(region_id, table_key, String::new()) + .await?; if !put_success { TableAlreadyExistsSnafu { table_name }.fail() @@ -60,14 +77,15 @@ impl MetadataRegion { /// Add a new column key to metadata. /// - /// This method won't check if the column already exists. - pub fn add_column( + /// This method won't check if the column already exists. But + /// will return if the column is successfully added. + pub async fn add_column( &self, region_id: RegionId, table_name: &str, column_name: &str, semantic_type: SemanticType, - ) -> Result<()> { + ) -> Result { let region_id = utils::to_metadata_region_id(region_id); let column_key = Self::concat_column_key(table_name, column_name); @@ -75,8 +93,30 @@ impl MetadataRegion { region_id, column_key, Self::serialize_semantic_type(semantic_type), - )?; - Ok(()) + ) + .await + } + + /// Check if the given table exists. + pub async fn is_table_exist(&self, region_id: RegionId, table_name: &str) -> Result { + let region_id = utils::to_metadata_region_id(region_id); + let table_key = Self::concat_table_key(table_name); + self.exist(region_id, &table_key).await + } + + /// Check if the given column exists. Return the semantic type if exists. + pub async fn column_semantic_type( + &self, + region_id: RegionId, + table_name: &str, + column_name: &str, + ) -> Result> { + let region_id = utils::to_metadata_region_id(region_id); + let column_key = Self::concat_column_key(table_name, column_name); + let semantic_type = self.get(region_id, &column_key).await?; + semantic_type + .map(|s| Self::deserialize_semantic_type(&s)) + .transpose() } } @@ -136,24 +176,134 @@ impl MetadataRegion { impl MetadataRegion { /// Put if not exist, return if this put operation is successful (error other /// than "key already exist" will be wrapped in [Err]). - pub fn put_conditionally( + pub async fn put_conditionally( &self, region_id: RegionId, key: String, value: String, ) -> Result { - todo!() + if self.exist(region_id, &key).await? { + return Ok(false); + } + + let put_request = Self::build_put_request(&key, &value); + self.mito + .handle_request( + region_id, + store_api::region_request::RegionRequest::Put(put_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + Ok(true) } /// Check if the given key exists. - pub fn exist(&self, region_id: RegionId, key: &str) -> Result { - todo!() + /// + /// Notice that due to mito doesn't support transaction, TOCTTOU is possible. + pub async fn exist(&self, region_id: RegionId, key: &str) -> Result { + let scan_req = Self::build_read_request(key); + let record_batch_stream = self + .mito + .handle_query(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + let scan_result = collect(record_batch_stream) + .await + .context(CollectRecordBatchStreamSnafu)?; + + let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0; + Ok(exist) + } + + /// Retrieves the value associated with the given key in the specified region. + /// Returns `Ok(None)` if the key is not found. + pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { + let scan_req = Self::build_read_request(key); + let record_batch_stream = self + .mito + .handle_query(region_id, scan_req) + .await + .context(MitoReadOperationSnafu)?; + let mut scan_result = collect(record_batch_stream) + .await + .context(CollectRecordBatchStreamSnafu)?; + + let Some(first_batch) = scan_result.first() else { + return Ok(None); + }; + + let val = first_batch + .column(0) + .get_ref(0) + .as_string() + .unwrap() + .map(|s| s.to_string()); + + Ok(val) + } + + /// Builds a [ScanRequest] to read metadata for a given key. + /// The request will contains a EQ filter on the key column. + /// + /// Only the value column is projected. + fn build_read_request(key: &str) -> ScanRequest { + let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key)); + + ScanRequest { + sequence: None, + projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), + filters: vec![filter_expr.into()], + output_ordering: None, + limit: None, + } + } + + fn build_put_request(key: &str, value: &str) -> RegionPutRequest { + let cols = vec![ + ColumnSchema { + column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + }, + ColumnSchema { + column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Tag as _, + }, + ColumnSchema { + column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Field as _, + }, + ]; + let rows = Rows { + schema: cols, + rows: vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + Value { + value_data: Some(ValueData::StringValue(value.to_string())), + }, + ], + }], + }; + + RegionPutRequest { rows } } } #[cfg(test)] mod test { + use store_api::region_request::RegionRequest; + use super::*; + use crate::test_util::TestEnv; + use crate::utils::to_metadata_region_id; #[test] fn test_concat_table_key() { @@ -226,4 +376,163 @@ mod test { let semantic_type = "\"InvalidType\""; assert!(MetadataRegion::deserialize_semantic_type(semantic_type).is_err()); } + + #[test] + fn test_build_read_request() { + let key = "test_key"; + let expected_filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key)); + let expected_scan_request = ScanRequest { + sequence: None, + projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]), + filters: vec![expected_filter_expr.into()], + output_ordering: None, + limit: None, + }; + let actual_scan_request = MetadataRegion::build_read_request(key); + assert_eq!(actual_scan_request, expected_scan_request); + } + + #[tokio::test] + async fn test_put_conditionally() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let region_id = to_metadata_region_id(env.default_region_id()); + + // Test inserting a new key-value pair + let key = "test_key".to_string(); + let value = "test_value".to_string(); + let result = metadata_region + .put_conditionally(region_id, key.clone(), value.clone()) + .await; + assert!(result.is_ok()); + assert!(result.unwrap()); + + // Verify that the key-value pair was actually inserted + let scan_req = MetadataRegion::build_read_request("test_key"); + let record_batch_stream = metadata_region + .mito + .handle_query(region_id, scan_req) + .await + .unwrap(); + let scan_result = collect(record_batch_stream).await.unwrap(); + assert_eq!(scan_result.len(), 1); + + // Test inserting the same key-value pair again + let result = metadata_region + .put_conditionally(region_id, key.clone(), value.clone()) + .await; + assert!(result.is_ok()); + assert!(!result.unwrap(),); + } + + #[tokio::test] + async fn test_exist() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let region_id = to_metadata_region_id(env.default_region_id()); + + // Test checking for a non-existent key + let key = "test_key".to_string(); + let result = metadata_region.exist(region_id, &key).await; + assert!(result.is_ok()); + assert!(!result.unwrap()); + + // Test inserting a key and then checking for its existence + let value = "test_value".to_string(); + let put_request = MetadataRegion::build_put_request(&key, &value); + metadata_region + .mito + .handle_request(region_id, RegionRequest::Put(put_request)) + .await + .unwrap(); + let result = metadata_region.exist(region_id, &key).await; + assert!(result.is_ok()); + assert!(result.unwrap(),); + } + + #[tokio::test] + async fn test_get() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let region_id = to_metadata_region_id(env.default_region_id()); + + // Test getting a non-existent key + let key = "test_key".to_string(); + let result = metadata_region.get(region_id, &key).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), None); + + // Test inserting a key and then getting its value + let value = "test_value".to_string(); + let put_request = MetadataRegion::build_put_request(&key, &value); + metadata_region + .mito + .handle_request(region_id, RegionRequest::Put(put_request)) + .await + .unwrap(); + let result = metadata_region.get(region_id, &key).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Some(value)); + } + + #[tokio::test] + async fn test_add_table() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let region_id = to_metadata_region_id(env.default_region_id()); + + // add one table + let table_name = "table1"; + metadata_region + .add_table(region_id, table_name) + .await + .unwrap(); + assert!(metadata_region + .is_table_exist(region_id, table_name) + .await + .unwrap()); + + // add it again + assert!(metadata_region + .add_table(region_id, table_name) + .await + .is_err()); + } + + #[tokio::test] + async fn test_add_column() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let region_id = to_metadata_region_id(env.default_region_id()); + + let table_name = "table1"; + let column_name = "column1"; + let semantic_type = SemanticType::Tag; + metadata_region + .add_column(region_id, table_name, column_name, semantic_type) + .await + .unwrap(); + let actual_semantic_type = metadata_region + .column_semantic_type(region_id, table_name, column_name) + .await + .unwrap(); + assert_eq!(actual_semantic_type, Some(semantic_type)); + + // duplicate column won't be updated + let is_updated = metadata_region + .add_column(region_id, table_name, column_name, SemanticType::Field) + .await + .unwrap(); + assert!(!is_updated); + let actual_semantic_type = metadata_region + .column_semantic_type(region_id, table_name, column_name) + .await + .unwrap(); + assert_eq!(actual_semantic_type, Some(semantic_type)); + } } diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 313a2ceba8..7426aadb49 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -14,10 +14,22 @@ //! Utilities for testing. +use std::collections::HashMap; + +use api::v1::SemanticType; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use mito2::test_util::TestEnv as MitoTestEnv; use object_store::util::join_dir; +use store_api::metadata::ColumnMetadata; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::engine::{MetricEngine, METRIC_ENGINE_NAME}; +use crate::metadata_region::MetadataRegion; /// Env to test metric engine. pub struct TestEnv { @@ -47,4 +59,78 @@ impl TestEnv { pub fn mito(&self) -> MitoEngine { self.mito.clone() } + + pub fn metric(&self) -> MetricEngine { + MetricEngine::new(self.mito()) + } + + /// Create regions in [MetricEngine] under [`default_region_id`](TestEnv::default_region_id) + /// and region dir `"test_metric_region"`. + pub async fn init_metric_region(&self) { + let region_id = self.default_region_id(); + let region_create_request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }], + primary_key: vec![], + options: HashMap::new(), + region_dir: "test_metric_region".to_string(), + }; + + // create regions + self.metric() + .handle_request(region_id, RegionRequest::Create(region_create_request)) + .await + .unwrap(); + } + + pub fn metadata_region(&self) -> MetadataRegion { + MetadataRegion::new(self.mito()) + } + + /// `RegionId::new(1, 2)` + pub fn default_region_id(&self) -> RegionId { + RegionId::new(1, 2) + } +} + +#[cfg(test)] +mod test { + + use super::*; + use crate::engine::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; + use crate::utils::{self, to_metadata_region_id}; + + #[tokio::test] + async fn create_metadata_region() { + common_telemetry::init_default_ut_logging(); + + let env = TestEnv::new().await; + env.init_metric_region().await; + let region_id = to_metadata_region_id(env.default_region_id()); + let region_dir = join_dir(&env.data_home(), "test_metric_region"); + + // assert metadata region's dir + let metadata_region_dir = join_dir(®ion_dir, METADATA_REGION_SUBDIR); + let exist = tokio::fs::try_exists(metadata_region_dir).await.unwrap(); + assert!(exist); + + // assert data region's dir + let data_region_dir = join_dir(®ion_dir, DATA_REGION_SUBDIR); + let exist = tokio::fs::try_exists(data_region_dir).await.unwrap(); + assert!(exist); + + // check mito engine + let metadata_region_id = utils::to_metadata_region_id(region_id); + let _ = env.mito().get_metadata(metadata_region_id).await.unwrap(); + let data_region_id = utils::to_data_region_id(region_id); + let _ = env.mito().get_metadata(data_region_id).await.unwrap(); + } } diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs index 284d98177a..6743500353 100644 --- a/src/store-api/src/storage/consts.rs +++ b/src/store-api/src/storage/consts.rs @@ -37,6 +37,8 @@ enum ReservedColumnType { Version = 0, Sequence, OpType, + Tsid, + MetricName, } /// Column id reserved by the engine. @@ -66,6 +68,20 @@ impl ReservedColumnId { pub const fn op_type() -> ColumnId { Self::BASE | ReservedColumnType::OpType as ColumnId } + + /// Id for storing TSID column. + /// + /// Used by: metric engine + pub const fn tsid() -> ColumnId { + Self::BASE | ReservedColumnType::Tsid as ColumnId + } + + /// Id for storing metric name column. + /// + /// Used by: metric engine + pub const fn metric_name() -> ColumnId { + Self::BASE | ReservedColumnType::MetricName as ColumnId + } } // ----------------------------------------------------------------------------- diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 2597b76614..89e68687aa 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -46,7 +46,7 @@ pub trait WriteRequest: Send { fn delete(&mut self, keys: HashMap) -> Result<(), Self::Error>; } -#[derive(Default, Clone, Debug)] +#[derive(Default, Clone, Debug, PartialEq, Eq)] pub struct ScanRequest { /// Max sequence number to read, None for latest sequence. ///