From 99baa86b6a16ea88b5506c092bc50a765e464c7e Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 10 Feb 2025 11:44:58 +0000 Subject: [PATCH] feat(metric-engine): introduce batch alter request handling --- src/datanode/src/region_server.rs | 55 +------ src/metric-engine/src/engine.rs | 17 ++- src/metric-engine/src/engine/alter.rs | 141 +++++++++++++++++- .../src/engine/alter/extract_new_columns.rs | 57 +++++++ .../src/engine/alter/validate.rs | 33 ++++ src/metric-engine/src/engine/create.rs | 41 +++-- .../src/engine/create/add_logical_regions.rs | 40 ++--- .../src/engine/create/validate.rs | 5 +- src/metric-engine/src/engine/state.rs | 9 ++ src/metric-engine/src/error.rs | 10 +- 10 files changed, 305 insertions(+), 103 deletions(-) create mode 100644 src/metric-engine/src/engine/alter/extract_new_columns.rs create mode 100644 src/metric-engine/src/engine/alter/validate.rs diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ba90de513b..6e1231d838 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -60,7 +60,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{ convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest, - RegionCreateRequest, RegionOpenRequest, RegionRequest, RegionRequestBundle, + RegionOpenRequest, RegionRequest, RegionRequestBundle, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -165,11 +165,11 @@ impl RegionServer { self.inner.handle_batch_body(body).await } - pub async fn handle_create_request( + pub async fn handle_batch_request( &self, - requests: Vec<(RegionId, RegionCreateRequest)>, + batch_request: BatchRegionRequest, ) -> Result { - self.inner.handle_batch_create_requests(requests).await + self.inner.handle_batch_request(batch_request).await } async fn table_provider(&self, region_id: RegionId) -> Result> { @@ -372,10 +372,6 @@ impl RegionServer { self.handle_request(region_id, request).trace(span).await } - async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result { - self.inner.handle_batch_request(batch).await - } - async fn handle_vector_request( &self, requests: Vec<(RegionId, RegionRequest)>, @@ -409,49 +405,6 @@ impl RegionServer { extensions, }) } - - async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result { - match batch { - BatchRegionRequest::Create(creates) => self.handle_create_request(creates).await, - BatchRegionRequest::Drop(drops) => { - // FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately - // modify this part to avoid inefficient serial loop calls. - self.handle_requests( - drops - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))), - ) - .await - } - BatchRegionRequest::Alter(alters) => { - // FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately - // modify this part to avoid inefficient serial loop calls. - self.handle_requests( - alters - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), - ) - .await - } - BatchRegionRequest::Put(put) => { - // TODO(yingwen, wenkang): Implement it in engine level. - self.handle_requests_parallel( - put.into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Put(req))), - ) - .await - } - BatchRegionRequest::Delete(delete) => { - // TODO(yingwen, wenkang): Implement it in engine level. - self.handle_requests_parallel( - delete - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Delete(req))), - ) - .await - } - } - } } #[async_trait] diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 9b512a75d9..55fdb3f47d 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -146,12 +146,17 @@ impl RegionEngine for MetricEngine { }) } BatchRegionRequest::Alter(requests) => { - self.handle_requests( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), - ) - .await + let mut extension_return_value = HashMap::new(); + let rows = self + .inner + .alter_regions(requests, &mut extension_return_value) + .await + .map_err(BoxedError::new)?; + + Ok(RegionResponse { + affected_rows: rows, + extensions: extension_return_value, + }) } BatchRegionRequest::Drop(requests) => { self.handle_requests( diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 5fd0c13e78..1c50dfd68b 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -12,15 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +mod extract_new_columns; +mod validate; + +use std::collections::{HashMap, HashSet}; use common_telemetry::error; +use extract_new_columns::extract_new_columns; use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest}; use store_api::storage::RegionId; +use validate::validate_alter_region_requests; +use crate::engine::create::{ + add_columns_to_physical_data_region, add_logical_regions_to_meta_region, +}; use crate::engine::MetricEngineInner; use crate::error::{ LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, @@ -28,6 +36,137 @@ use crate::error::{ use crate::utils::{to_data_region_id, to_metadata_region_id}; impl MetricEngineInner { + pub async fn alter_regions( + &self, + requests: Vec<(RegionId, RegionAlterRequest)>, + extension_return_value: &mut HashMap>, + ) -> Result { + if requests.is_empty() { + return Ok(0); + } + + let first_region_id = &requests.first().unwrap().0; + if self.is_physical_region(*first_region_id) { + for (region_id, request) in requests { + self.alter_physical_region(region_id, request).await?; + } + } else { + self.alter_logical_regions(requests, extension_return_value) + .await?; + } + Ok(0) + } + + /// Alter multiple logical regions on the same physical region. + pub async fn alter_logical_regions( + &self, + requests: Vec<(RegionId, RegionAlterRequest)>, + extension_return_value: &mut HashMap>, + ) -> Result { + validate_alter_region_requests(&requests)?; + + let first_logical_region_id = requests[0].0; + + // Finds new columns to add + let mut new_column_names = HashSet::new(); + let mut new_columns_to_add = vec![]; + + let (physical_region_id, index_options) = { + let state = &self.state.read().unwrap(); + let physical_region_id = state + .get_physical_region_id(first_logical_region_id) + .with_context(|| { + error!("Trying to alter an nonexistent region {first_logical_region_id}"); + LogicalRegionNotFoundSnafu { + region_id: first_logical_region_id, + } + })?; + let region_state = state + .physical_region_states() + .get(&physical_region_id) + .with_context(|| PhysicalRegionNotFoundSnafu { + region_id: physical_region_id, + })?; + let physical_columns = region_state.physical_columns(); + + extract_new_columns( + &requests, + physical_columns, + &mut new_column_names, + &mut new_columns_to_add, + )?; + + (physical_region_id, region_state.options().index) + }; + let data_region_id = to_data_region_id(physical_region_id); + + add_columns_to_physical_data_region( + data_region_id, + index_options, + &mut new_columns_to_add, + &self.data_region, + ) + .await?; + + let physical_columns = self.data_region.physical_columns(data_region_id).await?; + let physical_schema_map = physical_columns + .iter() + .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) + .collect::>(); + + let logical_region_columns = requests.iter().map(|(region_id, request)| { + let AlterKind::AddColumns { columns } = &request.kind else { + unreachable!() + }; + ( + *region_id, + columns + .iter() + .flat_map(|col| { + let column_name = col.column_metadata.column_schema.name.as_str(); + if new_column_names.contains(column_name) { + let column_metadata = *physical_schema_map.get(column_name).unwrap(); + Some((column_metadata.column_schema.name.as_str(), column_metadata)) + } else { + None + } + }) + .collect::>(), + ) + }); + + let new_add_columns = new_columns_to_add.iter().map(|metadata| { + // Safety: previous steps ensure the physical region exist + let column_metadata = *physical_schema_map + .get(metadata.column_schema.name.as_str()) + .unwrap(); + ( + metadata.column_schema.name.to_string(), + column_metadata.column_id, + ) + }); + + // Writes logical regions metadata to metadata region + add_logical_regions_to_meta_region( + &self.metadata_region, + physical_region_id, + false, + logical_region_columns, + ) + .await?; + + extension_return_value.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?, + ); + + let mut state = self.state.write().unwrap(); + state.add_physical_columns(data_region_id, new_add_columns); + state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id)); + + Ok(0) + } + /// Dispatch region alter request pub async fn alter_region( &self, diff --git a/src/metric-engine/src/engine/alter/extract_new_columns.rs b/src/metric-engine/src/engine/alter/extract_new_columns.rs new file mode 100644 index 0000000000..50ccf7cbcd --- /dev/null +++ b/src/metric-engine/src/engine/alter/extract_new_columns.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; + +use api::v1::SemanticType; +use snafu::ensure; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::storage::{ColumnId, RegionId}; + +use crate::error::{AddingFieldColumnSnafu, Result}; + +/// Extract new columns from the create requests. +/// +/// # Panics +/// +/// This function will panic if the alter kind is not `AddColumns`. +pub fn extract_new_columns<'a>( + requests: &'a [(RegionId, RegionAlterRequest)], + physical_columns: &HashMap, + new_column_names: &mut HashSet<&'a str>, + new_columns: &mut Vec, +) -> Result<()> { + for (_, request) in requests { + let AlterKind::AddColumns { columns } = &request.kind else { + unreachable!() + }; + for col in columns { + let column_name = col.column_metadata.column_schema.name.as_str(); + if !physical_columns.contains_key(column_name) + && !new_column_names.contains(column_name) + { + ensure!( + col.column_metadata.semantic_type != SemanticType::Field, + AddingFieldColumnSnafu { name: column_name } + ); + new_column_names.insert(column_name); + // TODO(weny): avoid clone + new_columns.push(col.column_metadata.clone()); + } + } + } + + Ok(()) +} diff --git a/src/metric-engine/src/engine/alter/validate.rs b/src/metric-engine/src/engine/alter/validate.rs new file mode 100644 index 0000000000..2e2d91eccf --- /dev/null +++ b/src/metric-engine/src/engine/alter/validate.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; +use store_api::region_request::{AlterKind, RegionAlterRequest}; +use store_api::storage::RegionId; + +use crate::error::{Result, UnsupportedAlterKindSnafu}; + +/// Validate the alter region requests. +pub fn validate_alter_region_requests(requests: &[(RegionId, RegionAlterRequest)]) -> Result<()> { + for (_, request) in requests { + ensure!( + matches!(request.kind, AlterKind::AddColumns { .. }), + UnsupportedAlterKindSnafu { + kind: request.kind.as_ref() + } + ); + } + + Ok(()) +} diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index c1e83dc05f..635f347ba9 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -19,8 +19,8 @@ mod validate; use std::collections::{HashMap, HashSet}; -use add_columns::add_columns_to_physical_data_region; -use add_logical_regions::add_logical_regions_to_meta_region; +pub(crate) use add_columns::add_columns_to_physical_data_region; +pub(crate) use add_logical_regions::add_logical_regions_to_meta_region; use api::v1::SemanticType; use common_error::ext::BoxedError; use common_telemetry::{info, warn}; @@ -53,10 +53,10 @@ use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegi use crate::engine::MetricEngineInner; use crate::error::{ AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, - ConflictRegionOptionSnafu, CreateMitoRegionSnafu, EmptyRequestSnafu, - InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, - MitoReadOperationSnafu, MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, - PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, + ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, + InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu, + MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, + Result, SerializeColumnMetadataSnafu, }; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils::{self, to_data_region_id, to_metadata_region_id}; @@ -85,16 +85,8 @@ impl MetricEngineInner { .options .contains_key(LOGICAL_TABLE_METADATA_KEY) { - let physical_region_id = self.create_logical_regions(requests).await?; - let physical_columns = self - .data_region - .physical_columns(physical_region_id) + self.create_logical_regions(requests, extension_return_value) .await?; - extension_return_value.insert( - ALTER_PHYSICAL_EXTENSION_KEY.to_string(), - ColumnMetadata::encode_list(&physical_columns) - .context(SerializeColumnMetadataSnafu)?, - ); } else { return MissingRegionOptionSnafu {}.fail(); } @@ -194,14 +186,11 @@ impl MetricEngineInner { } /// Create multiple logical regions on the same physical region. - /// - /// Returns the physical region id of the created logical regions. async fn create_logical_regions( &self, requests: Vec<(RegionId, RegionCreateRequest)>, - ) -> Result { - ensure!(!requests.is_empty(), EmptyRequestSnafu {}); - + extension_return_value: &mut HashMap>, + ) -> Result<()> { let physical_region_id = validate_create_logical_regions(&requests)?; let data_region_id = utils::to_data_region_id(physical_region_id); @@ -257,7 +246,7 @@ impl MetricEngineInner { .iter() .map(|(region_id, _)| (*region_id)) .collect::>(); - let logical_regions_column_names = requests.iter().map(|(region_id, request)| { + let logical_region_columns = requests.iter().map(|(region_id, request)| { ( *region_id, request @@ -285,11 +274,17 @@ impl MetricEngineInner { ) }); + extension_return_value.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?, + ); + // Writes logical regions metadata to metadata region add_logical_regions_to_meta_region( &self.metadata_region, physical_region_id, - logical_regions_column_names, + true, + logical_region_columns, ) .await?; @@ -297,7 +292,7 @@ impl MetricEngineInner { state.add_physical_columns(data_region_id, new_add_columns); state.add_logical_regions(physical_region_id, logical_regions); - Ok(data_region_id) + Ok(()) } /// Create a logical region. diff --git a/src/metric-engine/src/engine/create/add_logical_regions.rs b/src/metric-engine/src/engine/create/add_logical_regions.rs index 8d36178744..a8649c6ba7 100644 --- a/src/metric-engine/src/engine/create/add_logical_regions.rs +++ b/src/metric-engine/src/engine/create/add_logical_regions.rs @@ -27,27 +27,33 @@ use crate::utils; pub async fn add_logical_regions_to_meta_region( metadata_region: &MetadataRegion, physical_region_id: RegionId, + write_region_id: bool, logical_regions: impl Iterator)>, ) -> Result<()> { let region_id = utils::to_metadata_region_id(physical_region_id); - let iter = logical_regions - .into_iter() - .flat_map(|(logical_region_id, column_metadatas)| { - Some(( - MetadataRegion::concat_region_key(logical_region_id), - String::new(), - )) + let iter = + logical_regions .into_iter() - .chain(column_metadatas.into_iter().map( - move |(name, column_metadata)| { - ( - MetadataRegion::concat_column_key(logical_region_id, name), - MetadataRegion::serialize_column_metadata(column_metadata), - ) - }, - )) - }) - .collect::>(); + .flat_map(|(logical_region_id, column_metadatas)| { + if write_region_id { + Some(( + MetadataRegion::concat_region_key(logical_region_id), + String::new(), + )) + } else { + None + } + .into_iter() + .chain(column_metadatas.into_iter().map( + move |(name, column_metadata)| { + ( + MetadataRegion::concat_column_key(logical_region_id, name), + MetadataRegion::serialize_column_metadata(column_metadata), + ) + }, + )) + }) + .collect::>(); let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter()); metadata_region diff --git a/src/metric-engine/src/engine/create/validate.rs b/src/metric-engine/src/engine/create/validate.rs index df98294480..943e42af52 100644 --- a/src/metric-engine/src/engine/create/validate.rs +++ b/src/metric-engine/src/engine/create/validate.rs @@ -18,8 +18,7 @@ use store_api::region_request::RegionCreateRequest; use store_api::storage::RegionId; use crate::error::{ - ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, - Result, + ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result, }; /// Validate the create logical regions request. @@ -28,8 +27,6 @@ use crate::error::{ pub fn validate_create_logical_regions( requests: &[(RegionId, RegionCreateRequest)], ) -> Result { - ensure!(!requests.is_empty(), EmptyRequestSnafu {}); - let (_, request) = requests.first().unwrap(); let first_physical_region_id_raw = request .options diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 3c273372fe..ca29080e21 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -139,6 +139,15 @@ impl MetricEngineState { } } + pub fn invalid_logical_regions_cache( + &mut self, + logical_region_ids: impl IntoIterator, + ) { + for logical_region_id in logical_region_ids { + self.logical_columns.remove(&logical_region_id); + } + } + /// # Panic /// if the physical region does not exist pub fn add_logical_region( diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 8fd1551ff9..855f7d33b9 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -219,6 +219,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Unsupported alter kind: {}", kind))] + UnsupportedAlterKind { + kind: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Multiple field column found: {} and {}", previous, current))] MultipleFieldColumn { previous: String, @@ -270,7 +277,8 @@ impl ErrorExt for Error { | NoFieldColumn { .. } | AddingFieldColumn { .. } | ParseRegionOptions { .. } - | EmptyRequest { .. } => StatusCode::InvalidArguments, + | EmptyRequest { .. } + | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { StatusCode::Unsupported