From 1d637cad519ee71631c442c45533fc0dc7302ba2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 5 Mar 2025 17:17:47 +0800 Subject: [PATCH] fix(metric-engine): group DDL requests (#5628) * fix(metric-engine): group DDL requests * test: add sqlness tests * chore: apply suggestions from CR * chore: apply suggestions from CR --- src/metric-engine/src/engine/alter.rs | 69 ++++++++++++++----- src/metric-engine/src/engine/create.rs | 60 ++++++++++++++-- .../src/engine/create/validate.rs | 57 --------------- .../common/alter/alter_metric_table.result | 60 ++++++++++++++++ .../common/alter/alter_metric_table.sql | 32 +++++++++ 5 files changed, 198 insertions(+), 80 deletions(-) delete mode 100644 src/metric-engine/src/engine/create/validate.rs diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 9bc5f56251..35bc7ce097 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -17,7 +17,6 @@ mod validate; use std::collections::{HashMap, HashSet}; -use common_telemetry::error; use extract_new_columns::extract_new_columns; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -54,37 +53,71 @@ impl MetricEngineInner { let (region_id, request) = requests.pop().unwrap(); self.alter_physical_region(region_id, request).await?; } else { - self.alter_logical_regions(requests, extension_return_value) - .await?; + // Fast path for single logical region alter request + if requests.len() == 1 { + // Safety: requests is not empty + let region_id = requests.first().unwrap().0; + let physical_region_id = self + .state + .read() + .unwrap() + .get_physical_region_id(region_id) + .with_context(|| LogicalRegionNotFoundSnafu { region_id })?; + self.alter_logical_regions(physical_region_id, requests, extension_return_value) + .await?; + } else { + let grouped_requests = + self.group_logical_region_requests_by_physical_region_id(requests)?; + for (physical_region_id, requests) in grouped_requests { + self.alter_logical_regions( + physical_region_id, + requests, + extension_return_value, + ) + .await?; + } + } } Ok(0) } + /// Groups the alter logical region requests by physical region id. + fn group_logical_region_requests_by_physical_region_id( + &self, + requests: Vec<(RegionId, RegionAlterRequest)>, + ) -> Result>> { + let mut result = HashMap::with_capacity(requests.len()); + let state = self.state.read().unwrap(); + + for (region_id, request) in requests { + let physical_region_id = state + .get_physical_region_id(region_id) + .with_context(|| LogicalRegionNotFoundSnafu { region_id })?; + result + .entry(physical_region_id) + .or_insert_with(Vec::new) + .push((region_id, request)); + } + + Ok(result) + } + /// Alter multiple logical regions on the same physical region. pub async fn alter_logical_regions( &self, + physical_region_id: RegionId, requests: Vec<(RegionId, RegionAlterRequest)>, extension_return_value: &mut HashMap>, ) -> Result { // Checks all alter requests are add columns. validate_alter_region_requests(&requests)?; - let first_logical_region_id = requests[0].0; - // Finds new columns to add let mut new_column_names = HashSet::new(); let mut new_columns_to_add = vec![]; - let (physical_region_id, index_options) = { + let index_options = { let state = &self.state.read().unwrap(); - let physical_region_id = state - .get_physical_region_id(first_logical_region_id) - .with_context(|| { - error!("Trying to alter an nonexistent region {first_logical_region_id}"); - LogicalRegionNotFoundSnafu { - region_id: first_logical_region_id, - } - })?; let region_state = state .physical_region_states() .get(&physical_region_id) @@ -100,7 +133,7 @@ impl MetricEngineInner { &mut new_columns_to_add, )?; - (physical_region_id, region_state.options().index) + region_state.options().index }; let data_region_id = to_data_region_id(physical_region_id); @@ -251,7 +284,11 @@ mod test { let region_id = env.default_logical_region_id(); engine_inner - .alter_logical_regions(vec![(region_id, request)], &mut HashMap::new()) + .alter_logical_regions( + physical_region_id, + vec![(region_id, request)], + &mut HashMap::new(), + ) .await .unwrap(); let semantic_type = metadata_region diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index e08a1c5e78..856bdb7b72 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -13,7 +13,6 @@ // limitations under the License. mod extract_new_columns; -mod validate; use std::collections::{HashMap, HashSet}; @@ -41,7 +40,6 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest}; use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; -use validate::validate_create_logical_regions; use crate::engine::create::extract_new_columns::extract_new_columns; use crate::engine::options::{set_data_region_options, PhysicalRegionOptions}; @@ -49,8 +47,8 @@ use crate::engine::MetricEngineInner; use crate::error::{ ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu, - MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result, - SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, + MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, + Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, }; use crate::metrics::PHYSICAL_REGION_COUNT; use crate::utils::{self, to_data_region_id, to_metadata_region_id}; @@ -87,8 +85,23 @@ impl MetricEngineInner { .options .contains_key(LOGICAL_TABLE_METADATA_KEY) { - self.create_logical_regions(requests, extension_return_value) - .await?; + if requests.len() == 1 { + let request = &requests.first().unwrap().1; + let physical_region_id = parse_physical_region_id(request)?; + self.create_logical_regions(physical_region_id, requests, extension_return_value) + .await?; + } else { + let grouped_requests = + group_create_logical_region_requests_by_physical_region_id(requests)?; + for (physical_region_id, requests) in grouped_requests { + self.create_logical_regions( + physical_region_id, + requests, + extension_return_value, + ) + .await?; + } + } } else { return MissingRegionOptionSnafu {}.fail(); } @@ -156,10 +169,10 @@ impl MetricEngineInner { /// Create multiple logical regions on the same physical region. async fn create_logical_regions( &self, + physical_region_id: RegionId, requests: Vec<(RegionId, RegionCreateRequest)>, extension_return_value: &mut HashMap>, ) -> Result<()> { - let physical_region_id = validate_create_logical_regions(&requests)?; let data_region_id = utils::to_data_region_id(physical_region_id); ensure!( @@ -494,6 +507,39 @@ impl MetricEngineInner { } } +/// Groups the create logical region requests by physical region id. +fn group_create_logical_region_requests_by_physical_region_id( + requests: Vec<(RegionId, RegionCreateRequest)>, +) -> Result>> { + let mut result = HashMap::with_capacity(requests.len()); + for (region_id, request) in requests { + let physical_region_id = parse_physical_region_id(&request)?; + result + .entry(physical_region_id) + .or_insert_with(Vec::new) + .push((region_id, request)); + } + + Ok(result) +} + +/// Parses the physical region id from the request. +fn parse_physical_region_id(request: &RegionCreateRequest) -> Result { + let physical_region_id_raw = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + + let physical_region_id: RegionId = physical_region_id_raw + .parse::() + .with_context(|_| ParseRegionIdSnafu { + raw: physical_region_id_raw, + })? + .into(); + + Ok(physical_region_id) +} + /// Creates the region options for metadata region in metric engine. pub(crate) fn region_options_for_metadata_region( mut original: HashMap, diff --git a/src/metric-engine/src/engine/create/validate.rs b/src/metric-engine/src/engine/create/validate.rs deleted file mode 100644 index 943e42af52..0000000000 --- a/src/metric-engine/src/engine/create/validate.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use snafu::{ensure, ResultExt}; -use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; -use store_api::region_request::RegionCreateRequest; -use store_api::storage::RegionId; - -use crate::error::{ - ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result, -}; - -/// Validate the create logical regions request. -/// -/// Returns extracted physical region id from the first request. -pub fn validate_create_logical_regions( - requests: &[(RegionId, RegionCreateRequest)], -) -> Result { - let (_, request) = requests.first().unwrap(); - let first_physical_region_id_raw = request - .options - .get(LOGICAL_TABLE_METADATA_KEY) - .ok_or(MissingRegionOptionSnafu {}.build())?; - - let physical_region_id: RegionId = first_physical_region_id_raw - .parse::() - .with_context(|_| ParseRegionIdSnafu { - raw: first_physical_region_id_raw, - })? - .into(); - - // TODO(weny): Can we remove the check? - for (_, request) in requests.iter().skip(1) { - let physical_region_id_raw = request - .options - .get(LOGICAL_TABLE_METADATA_KEY) - .ok_or(MissingRegionOptionSnafu {}.build())?; - - ensure!( - physical_region_id_raw == first_physical_region_id_raw, - ConflictRegionOptionSnafu {} - ); - } - - Ok(physical_region_id) -} diff --git a/tests/cases/standalone/common/alter/alter_metric_table.result b/tests/cases/standalone/common/alter/alter_metric_table.result index d9808265af..6f1aea3cba 100644 --- a/tests/cases/standalone/common/alter/alter_metric_table.result +++ b/tests/cases/standalone/common/alter/alter_metric_table.result @@ -117,3 +117,63 @@ DROP TABLE phy; Affected Rows: 0 +CREATE TABLE phy ( + ts timestamp time index, + val double, + host string primary key +) +PARTITION ON COLUMNS ("host") ( + host < '1024', + host >= '1024' +) +engine=metric +with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'), + ('2022-01-03 00:00:00', 7.89, 'example.com'), + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'), + ('2022-01-03 00:00:00', 7.89, 'example.com'); + +Affected Rows: 6 + +SELECT * FROM t1; + ++-------------+---------------------+------+ +| host | ts | val | ++-------------+---------------------+------+ +| example.com | 2022-01-01T00:00:00 | 1.23 | +| example.com | 2022-01-02T00:00:00 | 4.56 | +| example.com | 2022-01-03T00:00:00 | 7.89 | ++-------------+---------------------+------+ + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +Affected Rows: 0 + +SELECT * FROM t1; + ++-------------+---+---------------------+------+ +| host | k | ts | val | ++-------------+---+---------------------+------+ +| example.com | | 2022-01-01T00:00:00 | 1.23 | +| example.com | | 2022-01-02T00:00:00 | 4.56 | +| example.com | | 2022-01-03T00:00:00 | 7.89 | ++-------------+---+---------------------+------+ + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_metric_table.sql b/tests/cases/standalone/common/alter/alter_metric_table.sql index be3d7db53e..24b9de96d2 100644 --- a/tests/cases/standalone/common/alter/alter_metric_table.sql +++ b/tests/cases/standalone/common/alter/alter_metric_table.sql @@ -29,3 +29,35 @@ DROP TABLE t1; DROP TABLE t2; DROP TABLE phy; + +CREATE TABLE phy ( + ts timestamp time index, + val double, + host string primary key +) +PARTITION ON COLUMNS ("host") ( + host < '1024', + host >= '1024' +) +engine=metric +with ("physical_metric_table" = ""); + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy"); + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'), + ('2022-01-03 00:00:00', 7.89, 'example.com'), + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'), + ('2022-01-03 00:00:00', 7.89, 'example.com'); + +SELECT * FROM t1; + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +SELECT * FROM t1; + +DROP TABLE t1; + +DROP TABLE phy;