fix(metric-engine): group DDL requests (#5628)

* fix(metric-engine): group DDL requests

* test: add sqlness tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-03-05 17:17:47 +08:00
committed by GitHub
parent a56030e6a5
commit 1d637cad51
5 changed files with 198 additions and 80 deletions

View File

@@ -17,7 +17,6 @@ mod validate;
use std::collections::{HashMap, HashSet};
use common_telemetry::error;
use extract_new_columns::extract_new_columns;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
@@ -54,37 +53,71 @@ impl MetricEngineInner {
let (region_id, request) = requests.pop().unwrap();
self.alter_physical_region(region_id, request).await?;
} else {
self.alter_logical_regions(requests, extension_return_value)
.await?;
// Fast path for single logical region alter request
if requests.len() == 1 {
// Safety: requests is not empty
let region_id = requests.first().unwrap().0;
let physical_region_id = self
.state
.read()
.unwrap()
.get_physical_region_id(region_id)
.with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
self.alter_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
} else {
let grouped_requests =
self.group_logical_region_requests_by_physical_region_id(requests)?;
for (physical_region_id, requests) in grouped_requests {
self.alter_logical_regions(
physical_region_id,
requests,
extension_return_value,
)
.await?;
}
}
}
Ok(0)
}
/// Groups the alter logical region requests by physical region id.
///
/// Returns `LogicalRegionNotFound` if any logical region id has no
/// physical region registered in the engine state.
fn group_logical_region_requests_by_physical_region_id(
    &self,
    requests: Vec<(RegionId, RegionAlterRequest)>,
) -> Result<HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>>> {
    // Take the state lock once for the whole batch rather than per request.
    let state = self.state.read().unwrap();
    let mut grouped: HashMap<RegionId, Vec<(RegionId, RegionAlterRequest)>> =
        HashMap::with_capacity(requests.len());
    for (logical_region_id, request) in requests {
        let physical_region_id = state
            .get_physical_region_id(logical_region_id)
            .with_context(|| LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            })?;
        grouped
            .entry(physical_region_id)
            .or_default()
            .push((logical_region_id, request));
    }
    Ok(grouped)
}
/// Alter multiple logical regions on the same physical region.
pub async fn alter_logical_regions(
&self,
physical_region_id: RegionId,
requests: Vec<(RegionId, RegionAlterRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
// Checks all alter requests are add columns.
validate_alter_region_requests(&requests)?;
let first_logical_region_id = requests[0].0;
// Finds new columns to add
let mut new_column_names = HashSet::new();
let mut new_columns_to_add = vec![];
let (physical_region_id, index_options) = {
let index_options = {
let state = &self.state.read().unwrap();
let physical_region_id = state
.get_physical_region_id(first_logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {first_logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: first_logical_region_id,
}
})?;
let region_state = state
.physical_region_states()
.get(&physical_region_id)
@@ -100,7 +133,7 @@ impl MetricEngineInner {
&mut new_columns_to_add,
)?;
(physical_region_id, region_state.options().index)
region_state.options().index
};
let data_region_id = to_data_region_id(physical_region_id);
@@ -251,7 +284,11 @@ mod test {
let region_id = env.default_logical_region_id();
engine_inner
.alter_logical_regions(vec![(region_id, request)], &mut HashMap::new())
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
.await
.unwrap();
let semantic_type = metadata_region

View File

@@ -13,7 +13,6 @@
// limitations under the License.
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
@@ -41,7 +40,6 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use validate::validate_create_logical_regions;
use crate::engine::create::extract_new_columns::extract_new_columns;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
@@ -49,8 +47,8 @@ use crate::engine::MetricEngineInner;
use crate::error::{
ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result,
SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
@@ -87,8 +85,23 @@ impl MetricEngineInner {
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
self.create_logical_regions(requests, extension_return_value)
.await?;
if requests.len() == 1 {
let request = &requests.first().unwrap().1;
let physical_region_id = parse_physical_region_id(request)?;
self.create_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
} else {
let grouped_requests =
group_create_logical_region_requests_by_physical_region_id(requests)?;
for (physical_region_id, requests) in grouped_requests {
self.create_logical_regions(
physical_region_id,
requests,
extension_return_value,
)
.await?;
}
}
} else {
return MissingRegionOptionSnafu {}.fail();
}
@@ -156,10 +169,10 @@ impl MetricEngineInner {
/// Create multiple logical regions on the same physical region.
async fn create_logical_regions(
&self,
physical_region_id: RegionId,
requests: Vec<(RegionId, RegionCreateRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
ensure!(
@@ -494,6 +507,39 @@ impl MetricEngineInner {
}
}
/// Groups the create logical region requests by physical region id.
///
/// The physical region id is parsed from each request's
/// `LOGICAL_TABLE_METADATA_KEY` option; a missing or unparsable option
/// yields an error.
fn group_create_logical_region_requests_by_physical_region_id(
    requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
    let mut grouped: HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>> =
        HashMap::with_capacity(requests.len());
    for (logical_region_id, request) in requests {
        let physical_region_id = parse_physical_region_id(&request)?;
        grouped
            .entry(physical_region_id)
            .or_default()
            .push((logical_region_id, request));
    }
    Ok(grouped)
}
/// Parses the physical region id from the request.
fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_region_id: RegionId = physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: physical_region_id_raw,
})?
.into();
Ok(physical_region_id)
}
/// Creates the region options for metadata region in metric engine.
pub(crate) fn region_options_for_metadata_region(
mut original: HashMap<String, String>,

View File

@@ -1,57 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result,
};
/// Validate the create logical regions request.
///
/// Returns extracted physical region id from the first request.
///
/// Every request must carry the `LOGICAL_TABLE_METADATA_KEY` option naming
/// the same physical region; a missing option fails with
/// `MissingRegionOption`, an unparsable id with `ParseRegionId`, and a
/// mismatch across requests with `ConflictRegionOption`.
pub fn validate_create_logical_regions(
    requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
    // NOTE(review): panics on an empty slice — callers are presumed to pass
    // at least one request; confirm at call sites.
    let (_, request) = requests.first().unwrap();
    let first_physical_region_id_raw = request
        .options
        .get(LOGICAL_TABLE_METADATA_KEY)
        .ok_or(MissingRegionOptionSnafu {}.build())?;
    // The option value is the physical region id encoded as a decimal u64.
    let physical_region_id: RegionId = first_physical_region_id_raw
        .parse::<u64>()
        .with_context(|_| ParseRegionIdSnafu {
            raw: first_physical_region_id_raw,
        })?;
        .into();
    // TODO(weny): Can we remove the check?
    // Remaining requests are compared against the first by raw string
    // equality; they are not re-parsed.
    for (_, request) in requests.iter().skip(1) {
        let physical_region_id_raw = request
            .options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .ok_or(MissingRegionOptionSnafu {}.build())?;
        ensure!(
            physical_region_id_raw == first_physical_region_id_raw,
            ConflictRegionOptionSnafu {}
        );
    }
    Ok(physical_region_id)
}

View File

@@ -117,3 +117,63 @@ DROP TABLE phy;
Affected Rows: 0
CREATE TABLE phy (
ts timestamp time index,
val double,
host string primary key
)
PARTITION ON COLUMNS ("host") (
host < '1024',
host >= '1024'
)
engine=metric
with ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy");
Affected Rows: 0
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com'),
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com');
Affected Rows: 6
SELECT * FROM t1;
+-------------+---------------------+------+
| host | ts | val |
+-------------+---------------------+------+
| example.com | 2022-01-01T00:00:00 | 1.23 |
| example.com | 2022-01-02T00:00:00 | 4.56 |
| example.com | 2022-01-03T00:00:00 | 7.89 |
+-------------+---------------------+------+
ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
Affected Rows: 0
SELECT * FROM t1;
+-------------+---+---------------------+------+
| host | k | ts | val |
+-------------+---+---------------------+------+
| example.com | | 2022-01-01T00:00:00 | 1.23 |
| example.com | | 2022-01-02T00:00:00 | 4.56 |
| example.com | | 2022-01-03T00:00:00 | 7.89 |
+-------------+---+---------------------+------+
DROP TABLE t1;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -29,3 +29,35 @@ DROP TABLE t1;
DROP TABLE t2;
DROP TABLE phy;
CREATE TABLE phy (
ts timestamp time index,
val double,
host string primary key
)
PARTITION ON COLUMNS ("host") (
host < '1024',
host >= '1024'
)
engine=metric
with ("physical_metric_table" = "");
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine=metric with ("on_physical_table" = "phy");
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com'),
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-02 00:00:00', 4.56, 'example.com'),
('2022-01-03 00:00:00', 7.89, 'example.com');
SELECT * FROM t1;
ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
SELECT * FROM t1;
DROP TABLE t1;
DROP TABLE phy;