From 76d69901ea79ddbb916c8bccd8b29ad96aecdfdd Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sat, 8 Feb 2025 11:16:13 +0000 Subject: [PATCH] feat(metric-engine): introduce batch create request handling --- src/datanode/src/region_server.rs | 52 ++++- src/metric-engine/src/engine.rs | 17 +- src/metric-engine/src/engine/create.rs | 178 ++++++++++++++++-- .../src/engine/create/add_columns.rs | 42 +++++ .../src/engine/create/add_logical_regions.rs | 63 +++++++ .../src/engine/create/extract_new_columns.rs | 51 +++++ .../src/engine/create/validate.rs | 60 ++++++ src/metric-engine/src/engine/state.rs | 28 +++ src/metric-engine/src/error.rs | 9 +- src/metric-engine/src/metadata_region.rs | 48 ++++- 10 files changed, 528 insertions(+), 20 deletions(-) create mode 100644 src/metric-engine/src/engine/create/add_columns.rs create mode 100644 src/metric-engine/src/engine/create/add_logical_regions.rs create mode 100644 src/metric-engine/src/engine/create/extract_new_columns.rs create mode 100644 src/metric-engine/src/engine/create/validate.rs diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index f54d0219c0..ba90de513b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -60,7 +60,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest, - RegionOpenRequest, RegionRequest, RegionRequestBundle, + RegionCreateRequest, RegionOpenRequest, RegionRequest, RegionRequestBundle, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -165,6 +165,13 @@ impl RegionServer { self.inner.handle_batch_body(body).await } + pub async fn handle_create_request( + &self, + requests: Vec<(RegionId, RegionCreateRequest)>, + ) -> Result { + self.inner.handle_batch_create_requests(requests).await + } + async fn table_provider(&self, region_id: RegionId) -> Result> { let status = self .inner @@ -402,6 +409,49 @@ impl RegionServer { extensions, }) } + + async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result { + match batch { + BatchRegionRequest::Create(creates) => self.handle_create_request(creates).await, + BatchRegionRequest::Drop(drops) => { + // FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately + // modify this part to avoid inefficient serial loop calls. + self.handle_requests( + drops + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))), + ) + .await + } + BatchRegionRequest::Alter(alters) => { + // FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately + // modify this part to avoid inefficient serial loop calls. + self.handle_requests( + alters + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), + ) + .await + } + BatchRegionRequest::Put(put) => { + // TODO(yingwen, wenkang): Implement it in engine level. + self.handle_requests_parallel( + put.into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Put(req))), + ) + .await + } + BatchRegionRequest::Delete(delete) => { + // TODO(yingwen, wenkang): Implement it in engine level. + self.handle_requests_parallel( + delete + .into_iter() + .map(|(region_id, req)| (region_id, RegionRequest::Delete(req))), + ) + .await + } + } + } } #[async_trait] diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 39af8fe253..9b512a75d9 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -133,12 +133,17 @@ impl RegionEngine for MetricEngine { ) -> Result { match batch_request { BatchRegionRequest::Create(requests) => { - self.handle_requests( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Create(req))), - ) - .await + let mut extension_return_value = HashMap::new(); + let rows = self + .inner + .create_regions(requests, &mut extension_return_value) + .await + .map_err(BoxedError::new)?; + + Ok(RegionResponse { + affected_rows: rows, + extensions: extension_return_value, + }) } BatchRegionRequest::Alter(requests) => { self.handle_requests( diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 2e77a49e60..c1e83dc05f 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -12,8 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +mod add_columns; +mod add_logical_regions; +mod extract_new_columns; +mod validate; +use std::collections::{HashMap, HashSet}; + +use add_columns::add_columns_to_physical_data_region; +use add_logical_regions::add_logical_regions_to_meta_region; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_telemetry::{info, warn}; @@ -39,20 +46,62 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; +use validate::validate_create_logical_regions; +use crate::engine::create::extract_new_columns::extract_new_columns; use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions}; use crate::engine::MetricEngineInner; use crate::error::{ AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, - ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, - InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu, - MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, - Result, SerializeColumnMetadataSnafu, + ConflictRegionOptionSnafu, CreateMitoRegionSnafu, EmptyRequestSnafu, + InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, + MitoReadOperationSnafu, MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, + PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, }; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; -use crate::utils::{to_data_region_id, to_metadata_region_id}; +use crate::utils::{self, to_data_region_id, to_metadata_region_id}; impl MetricEngineInner { + pub async fn create_regions( + &self, + requests: Vec<(RegionId, RegionCreateRequest)>, + extension_return_value: &mut HashMap>, + ) -> Result { + if requests.is_empty() { + return Ok(0); + } + + for (_, request) in requests.iter() { + Self::verify_region_create_request(request)?; + } + + let first_request = &requests.first().unwrap().1; + if first_request.is_physical_table() { + for (region_id, request) in requests { + self.create_physical_region(region_id, request).await?; + } + return Ok(0); + } else if first_request + .options + .contains_key(LOGICAL_TABLE_METADATA_KEY) + { + let physical_region_id = self.create_logical_regions(requests).await?; + let physical_columns = self + .data_region + .physical_columns(physical_region_id) + .await?; + extension_return_value.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&physical_columns) + .context(SerializeColumnMetadataSnafu)?, + ); + } else { + return MissingRegionOptionSnafu {}.fail(); + } + + Ok(0) + } + /// Dispatch region creation request to physical region creation or logical pub async fn create_region( &self, @@ -144,6 +193,113 @@ impl MetricEngineInner { Ok(()) } + /// Create multiple logical regions on the same physical region. + /// + /// Returns the physical region id of the created logical regions. + async fn create_logical_regions( + &self, + requests: Vec<(RegionId, RegionCreateRequest)>, + ) -> Result { + ensure!(!requests.is_empty(), EmptyRequestSnafu {}); + + let physical_region_id = validate_create_logical_regions(&requests)?; + let data_region_id = utils::to_data_region_id(physical_region_id); + + // Filters out the requests that the logical region already exists + let requests = { + let state = self.state.read().unwrap(); + let logical_region_exists = state.logical_region_exists_filter(data_region_id); + // TODO(weny): log the skipped logical regions + requests + .into_iter() + .filter(|(region_id, _)| !logical_region_exists(region_id)) + .collect::>() + }; + + // Finds new columns to add to physical region + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); + + let index_option = { + let state = &self.state.read().unwrap(); + let region_state = state + .physical_region_states() + .get(&data_region_id) + .with_context(|| PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })?; + let physical_columns = region_state.physical_columns(); + + extract_new_columns( + &requests, + physical_columns, + &mut new_column_names, + &mut new_columns, + )?; + region_state.options().index + }; + + // TODO(weny): we dont need to pass a mutable new_columns here. + add_columns_to_physical_data_region( + data_region_id, + index_option, + &mut new_columns, + &self.data_region, + ) + .await?; + + let physical_columns = self.data_region.physical_columns(data_region_id).await?; + let physical_schema_map = physical_columns + .iter() + .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) + .collect::>(); + let logical_regions = requests + .iter() + .map(|(region_id, _)| (*region_id)) + .collect::>(); + let logical_regions_column_names = requests.iter().map(|(region_id, request)| { + ( + *region_id, + request + .column_metadatas + .iter() + .map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + (metadata.column_schema.name.as_str(), column_metadata) + }) + .collect::>(), + ) + }); + + let new_add_columns = new_columns.iter().map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + ( + metadata.column_schema.name.to_string(), + column_metadata.column_id, + ) + }); + + // Writes logical regions metadata to metadata region + add_logical_regions_to_meta_region( + &self.metadata_region, + physical_region_id, + logical_regions_column_names, + ) + .await?; + + let mut state = self.state.write().unwrap(); + state.add_physical_columns(data_region_id, new_add_columns); + state.add_logical_regions(physical_region_id, logical_regions); + + Ok(data_region_id) + } + /// Create a logical region. /// /// Physical table and logical table can have multiple regions, and their @@ -293,16 +449,16 @@ impl MetricEngineInner { new_columns: &mut [ColumnMetadata], index_options: IndexOptions, ) -> Result<()> { - // alter data region - self.data_region - .add_columns(data_region_id, new_columns, index_options) - .await?; - // Return early if no new columns are added. if new_columns.is_empty() { return Ok(()); } + // alter data region + self.data_region + .add_columns(data_region_id, new_columns, index_options) + .await?; + // correct the column id let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?; let after_alter_physical_schema_map = after_alter_physical_schema diff --git a/src/metric-engine/src/engine/create/add_columns.rs b/src/metric-engine/src/engine/create/add_columns.rs new file mode 100644 index 0000000000..78c66ac9ee --- /dev/null +++ b/src/metric-engine/src/engine/create/add_columns.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use store_api::metadata::ColumnMetadata; +use store_api::storage::RegionId; + +use crate::data_region::DataRegion; +use crate::engine::IndexOptions; +use crate::error::Result; +use crate::metrics::PHYSICAL_COLUMN_COUNT; + +/// Add new columns to the physical data region. +pub(crate) async fn add_columns_to_physical_data_region( + data_region_id: RegionId, + index_options: IndexOptions, + new_columns: &mut [ColumnMetadata], + data_region: &DataRegion, +) -> Result<()> { + // Return early if no new columns are added. + if new_columns.is_empty() { + return Ok(()); + } + + data_region + .add_columns(data_region_id, new_columns, index_options) + .await?; + + PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create/add_logical_regions.rs b/src/metric-engine/src/engine/create/add_logical_regions.rs new file mode 100644 index 0000000000..8d36178744 --- /dev/null +++ b/src/metric-engine/src/engine/create/add_logical_regions.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use snafu::ResultExt; +use store_api::metadata::ColumnMetadata; +use store_api::region_engine::RegionEngine; +use store_api::storage::RegionId; + +use crate::error::{MitoWriteOperationSnafu, Result}; +use crate::metadata_region::MetadataRegion; +use crate::utils; + +/// Add logical regions to the metadata region. +pub async fn add_logical_regions_to_meta_region( + metadata_region: &MetadataRegion, + physical_region_id: RegionId, + logical_regions: impl Iterator)>, +) -> Result<()> { + let region_id = utils::to_metadata_region_id(physical_region_id); + let iter = logical_regions + .into_iter() + .flat_map(|(logical_region_id, column_metadatas)| { + Some(( + MetadataRegion::concat_region_key(logical_region_id), + String::new(), + )) + .into_iter() + .chain(column_metadatas.into_iter().map( + move |(name, column_metadata)| { + ( + MetadataRegion::concat_column_key(logical_region_id, name), + MetadataRegion::serialize_column_metadata(column_metadata), + ) + }, + )) + }) + .collect::>(); + + let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter()); + metadata_region + .mito + .handle_request( + region_id, + store_api::region_request::RegionRequest::Put(put_request), + ) + .await + .context(MitoWriteOperationSnafu)?; + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create/extract_new_columns.rs b/src/metric-engine/src/engine/create/extract_new_columns.rs new file mode 100644 index 0000000000..cd5a60b21a --- /dev/null +++ b/src/metric-engine/src/engine/create/extract_new_columns.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use api::v1::SemanticType; +use snafu::ensure; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::RegionCreateRequest; +use store_api::storage::{ColumnId, RegionId}; + +use crate::error::{AddingFieldColumnSnafu, Result}; + +/// Extract new columns from the create requests. +pub fn extract_new_columns<'a>( + requests: &'a [(RegionId, RegionCreateRequest)], + physical_columns: &HashMap, + new_column_names: &mut HashSet<&'a str>, + new_columns: &mut Vec, +) -> Result<()> { + for (_, request) in requests { + for col in &request.column_metadatas { + if !physical_columns.contains_key(&col.column_schema.name) + && !new_column_names.contains(&col.column_schema.name.as_str()) + { + ensure!( + col.semantic_type != SemanticType::Field, + AddingFieldColumnSnafu { + name: col.column_schema.name.to_string(), + } + ); + new_column_names.insert(&col.column_schema.name); + // TODO(weny): avoid clone + new_columns.push(col.clone()); + } + } + } + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create/validate.rs b/src/metric-engine/src/engine/create/validate.rs new file mode 100644 index 0000000000..df98294480 --- /dev/null +++ b/src/metric-engine/src/engine/create/validate.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::{ensure, ResultExt}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use store_api::region_request::RegionCreateRequest; +use store_api::storage::RegionId; + +use crate::error::{ + ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, + Result, +}; + +/// Validate the create logical regions request. +/// +/// Returns extracted physical region id from the first request. +pub fn validate_create_logical_regions( + requests: &[(RegionId, RegionCreateRequest)], +) -> Result { + ensure!(!requests.is_empty(), EmptyRequestSnafu {}); + + let (_, request) = requests.first().unwrap(); + let first_physical_region_id_raw = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + + let physical_region_id: RegionId = first_physical_region_id_raw + .parse::() + .with_context(|_| ParseRegionIdSnafu { + raw: first_physical_region_id_raw, + })? + .into(); + + // TODO(weny): Can we remove the check? + for (_, request) in requests.iter().skip(1) { + let physical_region_id_raw = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + + ensure!( + physical_region_id_raw == first_physical_region_id_raw, + ConflictRegionOptionSnafu {} + ); + } + + Ok(physical_region_id) +} diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 0e22d436fa..3c273372fe 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -83,6 +83,18 @@ pub(crate) struct MetricEngineState { } impl MetricEngineState { + pub fn logical_region_exists_filter( + &self, + physical_region_id: RegionId, + ) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> { + let state = self + .physical_region_states() + .get(&physical_region_id) + .unwrap(); + + move |logical_region_id| state.logical_regions().contains(logical_region_id) + } + pub fn add_physical_region( &mut self, physical_region_id: RegionId, @@ -111,6 +123,22 @@ impl MetricEngineState { } } + /// # Panic + /// if the physical region does not exist + pub fn add_logical_regions( + &mut self, + physical_region_id: RegionId, + logical_region_ids: impl IntoIterator, + ) { + let physical_region_id = to_data_region_id(physical_region_id); + let state = self.physical_regions.get_mut(&physical_region_id).unwrap(); + for logical_region_id in logical_region_ids { + state.logical_regions.insert(logical_region_id); + self.logical_regions + .insert(logical_region_id, physical_region_id); + } + } + /// # Panic /// if the physical region does not exist pub fn add_logical_region( diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 674989db9f..8fd1551ff9 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -246,6 +246,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Empty request"))] + EmptyRequest { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -263,7 +269,8 @@ impl ErrorExt for Error { | MultipleFieldColumn { .. } | NoFieldColumn { .. } | AddingFieldColumn { .. } - | ParseRegionOptions { .. } => StatusCode::InvalidArguments, + | ParseRegionOptions { .. } + | EmptyRequest { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { StatusCode::Unsupported diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 70709a84ae..eb89f97f3c 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -57,7 +57,7 @@ const COLUMN_PREFIX: &str = "__column_"; /// table id + region sequence. This handler will transform the region group by /// itself. pub struct MetadataRegion { - mito: MitoEngine, + pub(crate) mito: MitoEngine, /// Logical lock for operations that need to be serialized. Like update & read region columns. /// /// Region entry will be registered on creating and opening logical region, and deregistered on @@ -474,6 +474,52 @@ impl MetadataRegion { } } + pub(crate) fn build_put_request_from_iter( + kv: impl Iterator, + ) -> RegionPutRequest { + let cols = vec![ + ColumnSchema { + column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + semantic_type: SemanticType::Timestamp as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Tag as _, + ..Default::default() + }, + ColumnSchema { + column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String as _, + semantic_type: SemanticType::Field as _, + ..Default::default() + }, + ]; + let rows = Rows { + schema: cols, + rows: kv + .into_iter() + .map(|(key, value)| Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue(key)), + }, + Value { + value_data: Some(ValueData::StringValue(value)), + }, + ], + }) + .collect(), + }; + + RegionPutRequest { rows, hint: None } + } + fn build_put_request(key: &str, value: &str) -> RegionPutRequest { let cols = vec![ ColumnSchema {