From bb97f1bf16a4b08c3390fb4d11caf8b409fc7be3 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 13 Feb 2025 16:39:04 +0900 Subject: [PATCH] perf: optimize table creation speed in metric engine (#5503) * feat(metric-engine): introduce batch create request handling * chore: remove unused code * test: add more tests * chore: remove unused error * chore: apply suggestions from CR --- src/metric-engine/src/engine.rs | 59 +++- src/metric-engine/src/engine/create.rs | 266 +++++++++--------- .../src/engine/create/add_columns.rs | 42 +++ .../src/engine/create/extract_new_columns.rs | 174 ++++++++++++ .../src/engine/create/validate.rs | 60 ++++ src/metric-engine/src/engine/state.rs | 28 ++ src/metric-engine/src/error.rs | 26 +- src/metric-engine/src/lib.rs | 1 + src/metric-engine/src/metadata_region.rs | 245 +++++++++++----- 9 files changed, 681 insertions(+), 220 deletions(-) create mode 100644 src/metric-engine/src/engine/create/add_columns.rs create mode 100644 src/metric-engine/src/engine/create/extract_new_columns.rs create mode 100644 src/metric-engine/src/engine/create/validate.rs diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index fa054e3f8b..8e69458c12 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -42,7 +42,7 @@ use store_api::region_engine::{ RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, }; -use store_api::region_request::RegionRequest; +use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use self::state::MetricEngineState; @@ -127,6 +127,43 @@ impl RegionEngine for MetricEngine { METRIC_ENGINE_NAME } + async fn handle_batch_ddl_requests( + &self, + batch_request: BatchRegionDdlRequest, + ) -> Result { + match batch_request { + BatchRegionDdlRequest::Create(requests) => { + let mut extension_return_value = HashMap::new(); + let rows = self + .inner + .create_regions(requests, &mut extension_return_value) + .await + .map_err(BoxedError::new)?; + + Ok(RegionResponse { + affected_rows: rows, + extensions: extension_return_value, + }) + } + BatchRegionDdlRequest::Alter(requests) => { + self.handle_requests( + requests + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), + ) + .await + } + BatchRegionDdlRequest::Drop(requests) => { + self.handle_requests( + requests + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))), + ) + .await + } + } + } + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self, @@ -139,7 +176,7 @@ impl RegionEngine for MetricEngine { RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Create(create) => { self.inner - .create_region(region_id, create, &mut extension_return_value) + .create_regions(vec![(region_id, create)], &mut extension_return_value) .await } RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await, @@ -301,6 +338,24 @@ impl MetricEngine { .await .map_err(BoxedError::new) } + + async fn handle_requests( + &self, + requests: impl IntoIterator, + ) -> Result { + let mut affected_rows = 0; + let mut extensions = HashMap::new(); + for (region_id, request) in requests { + let response = self.handle_request(region_id, request).await?; + affected_rows += response.affected_rows; + extensions.extend(response.extensions); + } + + Ok(RegionResponse { + affected_rows, + extensions, + }) + } } #[cfg(test)] diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 2e77a49e60..b76f22586e 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +mod add_columns; +mod extract_new_columns; +mod validate; +use std::collections::{HashMap, HashSet}; + +use add_columns::add_columns_to_physical_data_region; use api::v1::SemanticType; -use common_error::ext::BoxedError; use common_telemetry::{info, warn}; use common_time::{Timestamp, FOREVER}; use datatypes::data_type::ConcreteDataType; @@ -39,36 +43,51 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; +use validate::validate_create_logical_regions; +use crate::engine::create::extract_new_columns::extract_new_columns; use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions}; use crate::engine::MetricEngineInner; use crate::error::{ - AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, - ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, - InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu, - MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, - Result, SerializeColumnMetadataSnafu, + ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu, + EmptyRequestSnafu, InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, + MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result, + SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, }; -use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; -use crate::utils::{to_data_region_id, to_metadata_region_id}; +use crate::metrics::{PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; +use crate::utils::{self, to_data_region_id, to_metadata_region_id}; impl MetricEngineInner { - /// Dispatch region creation request to physical region creation or logical - pub async fn create_region( + pub async fn create_regions( &self, - region_id: RegionId, - request: RegionCreateRequest, + mut requests: Vec<(RegionId, RegionCreateRequest)>, extension_return_value: &mut HashMap>, ) -> Result { - Self::verify_region_create_request(&request)?; + if requests.is_empty() { + return Ok(0); + } - let result = if request.is_physical_table() { - self.create_physical_region(region_id, request).await - } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) { - let physical_region_id = self.create_logical_region(region_id, request).await?; + for (_, request) in requests.iter() { + Self::verify_region_create_request(request)?; + } - // Add physical table's column to extension map. - // It's ok to overwrite existing key, as the latter come schema is more up-to-date + let first_request = &requests.first().unwrap().1; + if first_request.is_physical_table() { + ensure!( + requests.len() == 1, + UnexpectedRequestSnafu { + reason: "Physical table must be created with single request".to_string(), + } + ); + let (region_id, request) = requests.pop().unwrap(); + self.create_physical_region(region_id, request).await?; + + return Ok(0); + } else if first_request + .options + .contains_key(LOGICAL_TABLE_METADATA_KEY) + { + let physical_region_id = self.create_logical_regions(requests).await?; let physical_columns = self .data_region .physical_columns(physical_region_id) @@ -78,13 +97,11 @@ impl MetricEngineInner { ColumnMetadata::encode_list(&physical_columns) .context(SerializeColumnMetadataSnafu)?, ); - - Ok(()) } else { - MissingRegionOptionSnafu {}.fail() - }; + return MissingRegionOptionSnafu {}.fail(); + } - result.map(|_| 0) + Ok(0) } /// Initialize a physical metric region at given region id. @@ -144,52 +161,33 @@ impl MetricEngineInner { Ok(()) } - /// Create a logical region. + /// Create multiple logical regions on the same physical region. /// - /// Physical table and logical table can have multiple regions, and their - /// region number should be the same. Thus we can infer the physical region - /// id by simply replace the table id part in the given region id, which - /// represent the "logical region" to request. - /// - /// This method will alter the data region to add columns if necessary. - /// - /// If the logical region to create already exists, this method will do nothing. - /// - /// `alter_request` is a hashmap that stores the alter requests that were executed - /// to the physical region. - /// - /// Return the physical region id of this logical region - async fn create_logical_region( + /// Returns the physical region id of the created logical regions. + async fn create_logical_regions( &self, - logical_region_id: RegionId, - request: RegionCreateRequest, + requests: Vec<(RegionId, RegionCreateRequest)>, ) -> Result { - // transform IDs - let physical_region_id_raw = request - .options - .get(LOGICAL_TABLE_METADATA_KEY) - .ok_or(MissingRegionOptionSnafu {}.build())?; - let physical_region_id: RegionId = physical_region_id_raw - .parse::() - .with_context(|_| ParseRegionIdSnafu { - raw: physical_region_id_raw, - })? - .into(); - let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id); + ensure!(!requests.is_empty(), EmptyRequestSnafu {}); - // check if the logical region already exist - if self - .metadata_region - .is_logical_region_exists(metadata_region_id, logical_region_id) - .await? - { - info!("Create a existing logical region {logical_region_id}. Skipped"); - return Ok(data_region_id); - } + let physical_region_id = validate_create_logical_regions(&requests)?; + let data_region_id = utils::to_data_region_id(physical_region_id); + + // Filters out the requests that the logical region already exists + let requests = { + let state = self.state.read().unwrap(); + let logical_region_exists = state.logical_region_exists_filter(data_region_id); + // TODO(weny): log the skipped logical regions + requests + .into_iter() + .filter(|(region_id, _)| !logical_region_exists(region_id)) + .collect::>() + }; + + // Finds new columns to add to physical region + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); - // find new columns to add - let mut new_columns = vec![]; - let mut existing_columns = vec![]; let index_option = { let state = &self.state.read().unwrap(); let region_state = state @@ -200,81 +198,69 @@ impl MetricEngineInner { })?; let physical_columns = region_state.physical_columns(); - for col in &request.column_metadatas { - if !physical_columns.contains_key(&col.column_schema.name) { - // Multi-field on physical table is explicit forbidden at present - // TODO(ruihang): support multi-field on both logical and physical column - ensure!( - col.semantic_type != SemanticType::Field, - AddingFieldColumnSnafu { - name: col.column_schema.name.clone() - } - ); - new_columns.push(col.clone()); - } else { - existing_columns.push(col.column_schema.name.clone()); - } - } - + extract_new_columns( + &requests, + physical_columns, + &mut new_column_names, + &mut new_columns, + )?; region_state.options().index }; - if !new_columns.is_empty() { - info!("Found new columns {new_columns:?} to add to physical region {data_region_id}"); + // TODO(weny): we dont need to pass a mutable new_columns here. + add_columns_to_physical_data_region( + data_region_id, + index_option, + &mut new_columns, + &self.data_region, + ) + .await?; - self.add_columns_to_physical_data_region( - data_region_id, - logical_region_id, - &mut new_columns, - index_option, - ) - .await?; - - // register columns to metadata region - for col in &new_columns { - self.metadata_region - .add_column(metadata_region_id, logical_region_id, col) - .await?; - } - } - - // register logical region to metadata region - self.metadata_region - .add_logical_region(metadata_region_id, logical_region_id) - .await?; - - // register existing physical column to this new logical region. - let physical_schema = self - .data_region - .physical_columns(data_region_id) - .await - .map_err(BoxedError::new) - .context(MitoReadOperationSnafu)?; - let physical_schema_map = physical_schema - .into_iter() - .map(|metadata| (metadata.column_schema.name.clone(), metadata)) + let physical_columns = self.data_region.physical_columns(data_region_id).await?; + let physical_schema_map = physical_columns + .iter() + .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) .collect::>(); - for col in &existing_columns { - let column_metadata = physical_schema_map - .get(col) - .with_context(|| ColumnNotFoundSnafu { - name: col, - region_id: physical_region_id, - })? - .clone(); - self.metadata_region - .add_column(metadata_region_id, logical_region_id, &column_metadata) - .await?; - } + let logical_regions = requests + .iter() + .map(|(region_id, _)| (*region_id)) + .collect::>(); + let logical_regions_column_names = requests.iter().map(|(region_id, request)| { + ( + *region_id, + request + .column_metadatas + .iter() + .map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + (metadata.column_schema.name.as_str(), column_metadata) + }) + .collect::>(), + ) + }); - // update the mapping - // Safety: previous steps ensure the physical region exist - self.state - .write() - .unwrap() - .add_logical_region(physical_region_id, logical_region_id); - info!("Created new logical region {logical_region_id} on physical region {data_region_id}"); - LOGICAL_REGION_COUNT.inc(); + let new_add_columns = new_columns.iter().map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + ( + metadata.column_schema.name.to_string(), + column_metadata.column_id, + ) + }); + + // Writes logical regions metadata to metadata region + self.metadata_region + .add_logical_regions(physical_region_id, logical_regions_column_names) + .await?; + + let mut state = self.state.write().unwrap(); + state.add_physical_columns(data_region_id, new_add_columns); + state.add_logical_regions(physical_region_id, logical_regions); Ok(data_region_id) } @@ -293,16 +279,16 @@ impl MetricEngineInner { new_columns: &mut [ColumnMetadata], index_options: IndexOptions, ) -> Result<()> { - // alter data region - self.data_region - .add_columns(data_region_id, new_columns, index_options) - .await?; - // Return early if no new columns are added. if new_columns.is_empty() { return Ok(()); } + // alter data region + self.data_region + .add_columns(data_region_id, new_columns, index_options) + .await?; + // correct the column id let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?; let after_alter_physical_schema_map = after_alter_physical_schema diff --git a/src/metric-engine/src/engine/create/add_columns.rs b/src/metric-engine/src/engine/create/add_columns.rs new file mode 100644 index 0000000000..78c66ac9ee --- /dev/null +++ b/src/metric-engine/src/engine/create/add_columns.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::metadata::ColumnMetadata; +use store_api::storage::RegionId; + +use crate::data_region::DataRegion; +use crate::engine::IndexOptions; +use crate::error::Result; +use crate::metrics::PHYSICAL_COLUMN_COUNT; + +/// Add new columns to the physical data region. +pub(crate) async fn add_columns_to_physical_data_region( + data_region_id: RegionId, + index_options: IndexOptions, + new_columns: &mut [ColumnMetadata], + data_region: &DataRegion, +) -> Result<()> { + // Return early if no new columns are added. + if new_columns.is_empty() { + return Ok(()); + } + + data_region + .add_columns(data_region_id, new_columns, index_options) + .await?; + + PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create/extract_new_columns.rs b/src/metric-engine/src/engine/create/extract_new_columns.rs new file mode 100644 index 0000000000..0803c7cdc2 --- /dev/null +++ b/src/metric-engine/src/engine/create/extract_new_columns.rs @@ -0,0 +1,174 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use api::v1::SemanticType; +use snafu::ensure; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::RegionCreateRequest; +use store_api::storage::{ColumnId, RegionId}; + +use crate::error::{AddingFieldColumnSnafu, Result}; + +/// Extract new columns from the create requests. +pub fn extract_new_columns<'a>( + requests: &'a [(RegionId, RegionCreateRequest)], + physical_columns: &HashMap, + new_column_names: &mut HashSet<&'a str>, + new_columns: &mut Vec, +) -> Result<()> { + for (_, request) in requests { + for col in &request.column_metadatas { + if !physical_columns.contains_key(&col.column_schema.name) + && !new_column_names.contains(&col.column_schema.name.as_str()) + { + ensure!( + col.semantic_type != SemanticType::Field, + AddingFieldColumnSnafu { + name: col.column_schema.name.to_string(), + } + ); + new_column_names.insert(&col.column_schema.name); + // TODO(weny): avoid clone + new_columns.push(col.clone()); + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::{HashMap, HashSet}; + + use api::v1::SemanticType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; + use store_api::region_request::RegionCreateRequest; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + + #[test] + fn test_extract_new_columns() { + let requests = vec![ + ( + RegionId::new(1, 1), + RegionCreateRequest { + column_metadatas: vec![ + ColumnMetadata { + column_schema: ColumnSchema::new( + "existing_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "new_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }, + ], + engine: "test".to_string(), + primary_key: vec![], + options: HashMap::new(), + region_dir: "test".to_string(), + }, + ), + ( + RegionId::new(1, 2), + RegionCreateRequest { + column_metadatas: vec![ColumnMetadata { + // Duplicate column name + column_schema: ColumnSchema::new( + "new_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }], + engine: "test".to_string(), + primary_key: vec![], + options: HashMap::new(), + region_dir: "test".to_string(), + }, + ), + ]; + + let mut physical_columns = HashMap::new(); + physical_columns.insert("existing_column".to_string(), 0); + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); + + let result = extract_new_columns( + &requests, + &physical_columns, + &mut new_column_names, + &mut new_columns, + ); + + assert!(result.is_ok()); + assert!(new_column_names.contains("new_column")); + assert_eq!(new_columns.len(), 1); + assert_eq!(new_columns[0].column_schema.name, "new_column"); + } + + #[test] + fn test_extract_new_columns_with_field_type() { + let requests = vec![( + RegionId::new(1, 1), + RegionCreateRequest { + column_metadatas: vec![ColumnMetadata { + column_schema: ColumnSchema::new( + "new_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 0, + }], + engine: "test".to_string(), + primary_key: vec![], + options: HashMap::new(), + region_dir: "test".to_string(), + }, + )]; + + let physical_columns = HashMap::new(); + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); + + let err = extract_new_columns( + &requests, + &physical_columns, + &mut new_column_names, + &mut new_columns, + ) + .unwrap_err(); + + assert_matches!(err, Error::AddingFieldColumn { .. }); + } +} diff --git a/src/metric-engine/src/engine/create/validate.rs b/src/metric-engine/src/engine/create/validate.rs new file mode 100644 index 0000000000..df98294480 --- /dev/null +++ b/src/metric-engine/src/engine/create/validate.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::{ensure, ResultExt}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use store_api::region_request::RegionCreateRequest; +use store_api::storage::RegionId; + +use crate::error::{ + ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, + Result, +}; + +/// Validate the create logical regions request. +/// +/// Returns extracted physical region id from the first request. +pub fn validate_create_logical_regions( + requests: &[(RegionId, RegionCreateRequest)], +) -> Result { + ensure!(!requests.is_empty(), EmptyRequestSnafu {}); + + let (_, request) = requests.first().unwrap(); + let first_physical_region_id_raw = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + + let physical_region_id: RegionId = first_physical_region_id_raw + .parse::() + .with_context(|_| ParseRegionIdSnafu { + raw: first_physical_region_id_raw, + })? + .into(); + + // TODO(weny): Can we remove the check? + for (_, request) in requests.iter().skip(1) { + let physical_region_id_raw = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + + ensure!( + physical_region_id_raw == first_physical_region_id_raw, + ConflictRegionOptionSnafu {} + ); + } + + Ok(physical_region_id) +} diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 0e22d436fa..3c273372fe 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -83,6 +83,18 @@ pub(crate) struct MetricEngineState { } impl MetricEngineState { + pub fn logical_region_exists_filter( + &self, + physical_region_id: RegionId, + ) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> { + let state = self + .physical_region_states() + .get(&physical_region_id) + .unwrap(); + + move |logical_region_id| state.logical_regions().contains(logical_region_id) + } + pub fn add_physical_region( &mut self, physical_region_id: RegionId, @@ -111,6 +123,22 @@ impl MetricEngineState { } } + /// # Panic + /// if the physical region does not exist + pub fn add_logical_regions( + &mut self, + physical_region_id: RegionId, + logical_region_ids: impl IntoIterator, + ) { + let physical_region_id = to_data_region_id(physical_region_id); + let state = self.physical_regions.get_mut(&physical_region_id).unwrap(); + for logical_region_id in logical_region_ids { + state.logical_regions.insert(logical_region_id); + self.logical_regions + .insert(logical_region_id, physical_region_id); + } + } + /// # Panic /// if the physical region does not exist pub fn add_logical_region( diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 13e318cfc5..4e082da305 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -50,13 +50,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Region `{}` already exists", region_id))] - RegionAlreadyExists { - region_id: RegionId, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to deserialize column metadata from {}", raw))] DeserializeColumnMetadata { raw: String, @@ -252,6 +245,19 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Empty request"))] + EmptyRequest { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unexpected request: {}", reason))] + UnexpectedRequest { + reason: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -269,7 +275,9 @@ impl ErrorExt for Error { | MultipleFieldColumn { .. } | NoFieldColumn { .. } | AddingFieldColumn { .. } - | ParseRegionOptions { .. } => StatusCode::InvalidArguments, + | ParseRegionOptions { .. } + | EmptyRequest { .. } + | UnexpectedRequest { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { StatusCode::Unsupported @@ -300,8 +308,6 @@ impl ErrorExt for Error { EncodePrimaryKey { source, .. } => source.status_code(), CollectRecordBatchStream { source, .. } => source.status_code(), - - RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists, } } diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index b6acab47a2..7c6fe5c9f4 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -51,6 +51,7 @@ //! ``` #![feature(let_chains)] +#![feature(assert_matches)] pub mod config; mod data_region; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 70709a84ae..71a3a0e3ce 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -37,7 +37,7 @@ use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, - ParseRegionIdSnafu, RegionAlreadyExistsSnafu, Result, + ParseRegionIdSnafu, Result, }; use crate::utils; @@ -57,7 +57,7 @@ const COLUMN_PREFIX: &str = "__column_"; /// table id + region sequence. This handler will transform the region group by /// itself. pub struct MetadataRegion { - mito: MitoEngine, + pub(crate) mito: MitoEngine, /// Logical lock for operations that need to be serialized. Like update & read region columns. /// /// Region entry will be registered on creating and opening logical region, and deregistered on @@ -73,36 +73,6 @@ impl MetadataRegion { } } - /// Add a new table key to metadata. - /// - /// This method will check if the table key already exists, if so, it will return - /// a [TableAlreadyExistsSnafu] error. - pub async fn add_logical_region( - &self, - physical_region_id: RegionId, - logical_region_id: RegionId, - ) -> Result<()> { - let region_id = utils::to_metadata_region_id(physical_region_id); - let region_key = Self::concat_region_key(logical_region_id); - - let put_success = self - .put_if_absent(region_id, region_key, String::new()) - .await?; - - if !put_success { - RegionAlreadyExistsSnafu { - region_id: logical_region_id, - } - .fail() - } else { - self.logical_region_lock - .write() - .await - .insert(logical_region_id, Arc::new(RwLock::new(()))); - Ok(()) - } - } - pub async fn open_logical_region(&self, logical_region_id: RegionId) { self.logical_region_lock .write() @@ -199,17 +169,6 @@ impl MetadataRegion { Ok(()) } - /// Check if the given logical region exists. - pub async fn is_logical_region_exists( - &self, - physical_region_id: RegionId, - logical_region_id: RegionId, - ) -> Result { - let region_id = utils::to_metadata_region_id(physical_region_id); - let region_key = Self::concat_region_key(logical_region_id); - self.exists(region_id, ®ion_key).await - } - /// Check if the given column exists. Return the semantic type if exists. #[cfg(test)] pub async fn column_semantic_type( @@ -474,6 +433,52 @@ impl MetadataRegion { } } + pub(crate) fn build_put_request_from_iter( + kv: impl Iterator, + ) -> RegionPutRequest { + let cols = vec![ + ColumnSchema { + column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Tag as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Field as _, + ..Default::default() + }, + ]; + let rows = Rows { + schema: cols, + rows: kv + .into_iter() + .map(|(key, value)| Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue(key)), + }, + Value { + value_data: Some(ValueData::StringValue(value)), + }, + ], + }) + .collect(), + }; + + RegionPutRequest { rows, hint: None } + } + fn build_put_request(key: &str, value: &str) -> RegionPutRequest { let cols = vec![ ColumnSchema { @@ -547,6 +552,44 @@ impl MetadataRegion { RegionDeleteRequest { rows } } + + /// Add logical regions to the metadata region. + pub async fn add_logical_regions( + &self, + physical_region_id: RegionId, + logical_regions: impl Iterator)>, + ) -> Result<()> { + let region_id = utils::to_metadata_region_id(physical_region_id); + let iter = logical_regions + .into_iter() + .flat_map(|(logical_region_id, column_metadatas)| { + Some(( + MetadataRegion::concat_region_key(logical_region_id), + String::new(), + )) + .into_iter() + .chain(column_metadatas.into_iter().map( + move |(name, column_metadata)| { + ( + MetadataRegion::concat_column_key(logical_region_id, name), + MetadataRegion::serialize_column_metadata(column_metadata), + ) + }, + )) + }) + .collect::>(); + + let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter()); + self.mito + .handle_request( + region_id, + store_api::region_request::RegionRequest::Put(put_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + + Ok(()) + } } #[cfg(test)] @@ -724,31 +767,6 @@ mod test { assert_eq!(result.unwrap(), Some(value)); } - #[tokio::test] - async fn test_add_logical_region() { - let env = TestEnv::new().await; - env.init_metric_region().await; - let metadata_region = env.metadata_region(); - let physical_region_id = to_metadata_region_id(env.default_physical_region_id()); - - // add one table - let logical_region_id = RegionId::new(196, 2333); - metadata_region - .add_logical_region(physical_region_id, logical_region_id) - .await - .unwrap(); - assert!(metadata_region - .is_logical_region_exists(physical_region_id, logical_region_id) - .await - .unwrap()); - - // add it again - assert!(metadata_region - .add_logical_region(physical_region_id, logical_region_id) - .await - .is_err()); - } - #[tokio::test] async fn test_add_column() { let env = TestEnv::new().await; @@ -790,4 +808,95 @@ mod test { .unwrap(); assert_eq!(actual_semantic_type, Some(semantic_type)); } + + fn test_column_metadatas() -> HashMap { + HashMap::from([ + ( + "label1".to_string(), + ColumnMetadata { + column_schema: ColumnSchema::new( + "label1".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 5, + }, + ), + ( + "label2".to_string(), + ColumnMetadata { + column_schema: ColumnSchema::new( + "label2".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 5, + }, + ), + ]) + } + + #[tokio::test] + async fn add_logical_regions_to_meta_region() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let metadata_region = env.metadata_region(); + let physical_region_id = to_metadata_region_id(env.default_physical_region_id()); + let column_metadatas = test_column_metadatas(); + let logical_region_id = RegionId::new(1024, 1); + + let iter = vec![( + logical_region_id, + column_metadatas + .iter() + .map(|(k, v)| (k.as_str(), v)) + .collect::>(), + )]; + metadata_region + .add_logical_regions(physical_region_id, iter.into_iter()) + .await + .unwrap(); + // Add logical region again. + let iter = vec![( + logical_region_id, + column_metadatas + .iter() + .map(|(k, v)| (k.as_str(), v)) + .collect::>(), + )]; + metadata_region + .add_logical_regions(physical_region_id, iter.into_iter()) + .await + .unwrap(); + + // Check if the logical region is added. + let logical_regions = metadata_region + .logical_regions(physical_region_id) + .await + .unwrap(); + assert_eq!(logical_regions.len(), 2); + assert_eq!(logical_regions[1], logical_region_id); + + // Check if the logical region exists. + let result = metadata_region + .exists( + physical_region_id, + &MetadataRegion::concat_region_key(logical_region_id), + ) + .await + .unwrap(); + assert!(result); + + // Check if the logical region columns are added. + let logical_columns = metadata_region + .logical_columns(physical_region_id, logical_region_id) + .await + .unwrap() + .into_iter() + .collect::>(); + assert_eq!(logical_columns.len(), 2); + assert_eq!(column_metadatas, logical_columns); + } }