perf: optimize table alteration speed in metric engine (#5526)

* feat(metric-engine): introduce batch alter request handling

* refactor: minor refactor

* refactor: push down filter to mito

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2025-02-14 17:11:48 +09:00
Committed by: GitHub
Parent: edb040dea3
Commit: c56106b883
14 changed files with 453 additions and 659 deletions
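At a glance, the commit replaces the one-request-at-a-time alter path with a batched entry point: many logical-region alters targeting the same physical region collapse into a single physical schema change. A hedged sketch of the dispatch logic in `alter_regions` (see the metric-engine diff further down); `Engine`, the opaque `RegionAlterRequest`, the `String` error type, and the `is_physical_region` lookup are all simplified stand-ins, not the real `store_api` types:

```rust
type RegionId = u64;

// Opaque stand-in for store_api::region_request::RegionAlterRequest.
struct RegionAlterRequest;

struct Engine;

impl Engine {
    // Stand-in for the engine's physical/logical region bookkeeping.
    fn is_physical_region(&self, region_id: RegionId) -> bool {
        region_id % 2 == 0
    }

    /// Single entry point for both paths: a lone physical alter is handled
    /// directly, while any number of logical alters on the same physical
    /// region collapse into one physical schema change.
    fn alter_regions(
        &self,
        mut requests: Vec<(RegionId, RegionAlterRequest)>,
    ) -> Result<usize, String> {
        if requests.is_empty() {
            return Ok(0);
        }
        let first_region_id = requests[0].0;
        if self.is_physical_region(first_region_id) {
            if requests.len() != 1 {
                return Err("physical table must be altered with a single request".into());
            }
            let (region_id, request) = requests.pop().unwrap();
            self.alter_physical_region(region_id, request)?;
        } else {
            self.alter_logical_regions(requests)?;
        }
        Ok(0)
    }

    fn alter_physical_region(&self, _id: RegionId, _req: RegionAlterRequest) -> Result<(), String> {
        Ok(())
    }

    fn alter_logical_regions(&self, _reqs: Vec<(RegionId, RegionAlterRequest)>) -> Result<(), String> {
        Ok(())
    }
}
```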

Cargo.lock generated
View File

@@ -6737,6 +6737,7 @@ version = "0.12.0"
dependencies = [
"api",
"aquamarine",
"async-stream",
"async-trait",
"base64 0.21.7",
"common-base",
@@ -6749,6 +6750,7 @@ dependencies = [
"common-time",
"datafusion",
"datatypes",
"futures-util",
"itertools 0.10.5",
"lazy_static",
"mito2",

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
aquamarine.workspace = true
async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
common-base.workspace = true
@@ -21,6 +22,7 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static = "1.4"
mito2.workspace = true

View File

@@ -13,8 +13,6 @@
// limitations under the License.
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
@@ -32,11 +30,9 @@ use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
use crate::utils;
const MAX_RETRIES: usize = 5;
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data-related operations across physical tables. Thus
/// every operation should be associated with a [RegionId], which is the physical
@@ -65,33 +61,30 @@ impl DataRegion {
pub async fn add_columns(
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<()> {
// Return early if no new columns are added.
if columns.is_empty() {
return Ok(());
}
let region_id = utils::to_data_region_id(region_id);
let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;
let num_columns = columns.len();
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;
let _timer = MITO_DDL_DURATION.start_timer();
let _timer = MITO_DDL_DURATION.start_timer();
let result = self.mito.handle_request(region_id, request).await;
match result {
Ok(_) => return Ok(()),
Err(e) if e.status_code() == StatusCode::RequestOutdated => {
info!("Retrying alter {region_id} due to outdated schema version, times {retries}");
retries += 1;
continue;
}
Err(e) => {
return Err(e).context(MitoWriteOperationSnafu)?;
}
}
}
let _ = self
.mito
.handle_request(region_id, request)
.await
.context(MitoWriteOperationSnafu)?;
PHYSICAL_COLUMN_COUNT.add(num_columns as _);
Ok(())
}
@@ -101,7 +94,7 @@ impl DataRegion {
async fn assemble_alter_request(
&self,
region_id: RegionId,
columns: &mut [ColumnMetadata],
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<RegionRequest> {
// retrieve underlying version
@@ -128,9 +121,9 @@ impl DataRegion {
// overwrite semantic type
let new_columns = columns
.iter_mut()
.into_iter()
.enumerate()
.map(|(delta, c)| {
.map(|(delta, mut c)| {
if c.semantic_type == SemanticType::Tag {
if !c.column_schema.data_type.is_string() {
return ColumnTypeMismatchSnafu {
@@ -254,7 +247,7 @@ mod test {
// TestEnv will create a logical region which changes the version to 1.
assert_eq!(current_version, 1);
let mut new_columns = vec![
let new_columns = vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
@@ -277,7 +270,7 @@ mod test {
env.data_region()
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
new_columns,
IndexOptions::Inverted,
)
.await
@@ -311,7 +304,7 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let mut new_columns = vec![ColumnMetadata {
let new_columns = vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
@@ -320,7 +313,7 @@ mod test {
.data_region()
.add_columns(
env.default_physical_region_id(),
&mut new_columns,
new_columns,
IndexOptions::Inverted,
)
.await;
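The net effect on `DataRegion::add_columns` is a straight-line flow: empty batches return early, exactly one alter request is assembled and submitted (the retry-on-`RequestOutdated` loop is removed, presumably because batching leaves a single alter per physical region per DDL), and `PHYSICAL_COLUMN_COUNT` grows by the batch size. A minimal model of that flow, with stand-in types for the real mito/store_api ones:

```rust
type RegionId = u64;

#[derive(Debug)]
struct ColumnMetadata {
    name: String,
}

struct AlterRequest {
    columns: Vec<ColumnMetadata>,
}

struct DataRegion {
    physical_column_count: usize, // stand-in for the PHYSICAL_COLUMN_COUNT gauge
}

impl DataRegion {
    fn add_columns(&mut self, region_id: RegionId, columns: Vec<ColumnMetadata>) -> Result<(), String> {
        // Return early if no new columns are added.
        if columns.is_empty() {
            return Ok(());
        }
        let num_columns = columns.len();
        let request = AlterRequest { columns };
        // Submit once; no retry loop.
        self.handle_request(region_id, request)?;
        self.physical_column_count += num_columns;
        Ok(())
    }

    fn handle_request(&mut self, _region_id: RegionId, _request: AlterRequest) -> Result<(), String> {
        Ok(()) // stands in for mito.handle_request(..)
    }
}
```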

View File

@@ -146,12 +146,17 @@ impl RegionEngine for MetricEngine {
})
}
BatchRegionDdlRequest::Alter(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.alter_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionDdlRequest::Drop(requests) => {
self.handle_requests(
@@ -184,7 +189,7 @@ impl RegionEngine for MetricEngine {
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => {
self.inner
.alter_region(region_id, alter, &mut extension_return_value)
.alter_regions(vec![(region_id, alter)], &mut extension_return_value)
.await
}
RegionRequest::Compact(_) => {

View File

@@ -12,148 +12,158 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use extract_new_columns::extract_new_columns;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use validate::validate_alter_region_requests;
use crate::engine::MetricEngineInner;
use crate::error::{
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
UnexpectedRequestSnafu,
};
use crate::utils::{to_data_region_id, to_metadata_region_id};
use crate::utils::to_data_region_id;
impl MetricEngineInner {
/// Dispatch region alter request
pub async fn alter_region(
pub async fn alter_regions(
&self,
region_id: RegionId,
request: RegionAlterRequest,
mut requests: Vec<(RegionId, RegionAlterRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
let is_altering_physical_region = self.is_physical_region(region_id);
if requests.is_empty() {
return Ok(0);
}
let result = if is_altering_physical_region {
self.alter_physical_region(region_id, request).await
} else {
let physical_region_id = self.alter_logical_region(region_id, request).await?;
// Add the physical table's columns to the extension map.
// It's ok to overwrite an existing key, as the later-arriving schema is more up-to-date
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
let first_region_id = &requests.first().unwrap().0;
if self.is_physical_region(*first_region_id) {
ensure!(
requests.len() == 1,
UnexpectedRequestSnafu {
reason: "Physical table must be altered with a single request".to_string(),
}
);
Ok(())
};
result.map(|_| 0)
let (region_id, request) = requests.pop().unwrap();
self.alter_physical_region(region_id, request).await?;
} else {
self.alter_logical_regions(requests, extension_return_value)
.await?;
}
Ok(0)
}
/// Return the physical region id behind this logical region
async fn alter_logical_region(
/// Alter multiple logical regions on the same physical region.
pub async fn alter_logical_regions(
&self,
logical_region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
requests: Vec<(RegionId, RegionAlterRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
// Check that all alter requests are add-column requests.
validate_alter_region_requests(&requests)?;
let first_logical_region_id = requests[0].0;
// Finds new columns to add
let mut new_column_names = HashSet::new();
let mut new_columns_to_add = vec![];
let (physical_region_id, index_options) = {
let state = &self.state.read().unwrap();
let physical_region_id = state
.get_physical_region_id(logical_region_id)
.get_physical_region_id(first_logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
error!("Trying to alter an nonexistent region {first_logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
region_id: first_logical_region_id,
}
})?;
let index_options = state
let region_state = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.options()
.index;
})?;
let physical_columns = region_state.physical_columns();
(physical_region_id, index_options)
extract_new_columns(
&requests,
physical_columns,
&mut new_column_names,
&mut new_columns_to_add,
)?;
(physical_region_id, region_state.options().index)
};
let data_region_id = to_data_region_id(physical_region_id);
// only handle adding columns
let AlterKind::AddColumns { columns } = request.kind else {
return Ok(physical_region_id);
};
let mut write_guards = HashMap::with_capacity(requests.len());
for (region_id, _) in requests.iter() {
if write_guards.contains_key(region_id) {
continue;
}
let _write_guard = self
.metadata_region
.write_lock_logical_region(*region_id)
.await;
write_guards.insert(*region_id, _write_guard);
}
// lock metadata region for this logical region id
let _write_guard = self
.metadata_region
.write_lock_logical_region(logical_region_id)
.await;
let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
// columns that already exist in physical region
let mut existing_columns = vec![];
let pre_existing_physical_columns = self
.data_region
.physical_columns(physical_region_id)
self.data_region
.add_columns(data_region_id, new_columns_to_add, index_options)
.await?;
let pre_exist_cols = pre_existing_physical_columns
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
.iter()
.map(|col| (col.column_schema.name.as_str(), col))
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
// check pre-existing physical columns so that if any column to add already exists,
// we can skip it in the physical alter operation
// (but still need to update it in the logical alter operation)
for col in &columns {
if let Some(exist_column) =
pre_exist_cols.get(&col.column_metadata.column_schema.name.as_str())
{
// push the correct column schema with correct column id
existing_columns.push(*exist_column);
} else {
columns_to_add.push(col.column_metadata.clone());
}
}
let logical_region_columns = requests.iter().map(|(region_id, request)| {
let AlterKind::AddColumns { columns } = &request.kind else {
unreachable!()
};
(
*region_id,
columns
.iter()
.map(|col| {
let column_name = col.column_metadata.column_schema.name.as_str();
let column_metadata = *physical_schema_map.get(column_name).unwrap();
(column_name, column_metadata)
})
.collect::<HashMap<_, _>>(),
)
});
// alter data region
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
logical_region_id,
&mut columns_to_add,
index_options,
)
.await?;
let new_add_columns = new_column_names.iter().map(|name| {
// Safety: previous steps ensure the physical region exists
let column_metadata = *physical_schema_map.get(name).unwrap();
(name.to_string(), column_metadata.column_id)
});
// note: here we don't use `columns` directly but concatenate `existing_columns` with `columns_to_add` to get the correct metadata
// for already existing columns
for metadata in existing_columns.into_iter().chain(columns_to_add.iter()) {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, metadata)
.await?;
}
// Write the logical regions' metadata to the metadata region
self.metadata_region
.add_logical_regions(physical_region_id, false, logical_region_columns)
.await?;
// invalidate the logical column cache
self.state
.write()
.unwrap()
.invalid_logical_column_cache(logical_region_id);
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
);
Ok(physical_region_id)
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
Ok(0)
}
async fn alter_physical_region(
@@ -241,7 +251,7 @@ mod test {
let region_id = env.default_logical_region_id();
engine_inner
.alter_logical_region(region_id, request)
.alter_logical_regions(vec![(region_id, request)], &mut HashMap::new())
.await
.unwrap();
let semantic_type = metadata_region
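After the single physical alter, each logical region's requested columns are resolved against the freshly fetched physical schema so they carry the physical column ids, and the whole batch is written to the metadata region in one `add_logical_regions` call. A hedged sketch of that resolution step, with simplified types; the `unwrap` mirrors the "Safety" comment in the diff:

```rust
use std::collections::HashMap;

type RegionId = u64;
type ColumnId = u32;

/// Map each logical region's column names to the column ids now present in
/// the physical schema. In the real code the preceding physical alter
/// guarantees every requested column exists, which justifies the `unwrap`.
fn resolve_logical_columns<'a>(
    requests: &'a [(RegionId, Vec<String>)],
    physical_schema: &HashMap<String, ColumnId>,
) -> Vec<(RegionId, HashMap<&'a str, ColumnId>)> {
    requests
        .iter()
        .map(|(region_id, columns)| {
            let resolved = columns
                .iter()
                .map(|name| (name.as_str(), *physical_schema.get(name).unwrap()))
                .collect::<HashMap<_, _>>();
            (*region_id, resolved)
        })
        .collect()
}

fn main() {
    let physical: HashMap<String, ColumnId> =
        [("host".to_string(), 1), ("dc".to_string(), 2)].into();
    let requests = vec![(42, vec!["host".to_string(), "dc".to_string()])];
    let resolved = resolve_logical_columns(&requests, &physical);
    assert_eq!(resolved[0].1["dc"], 2);
}
```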

View File

@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::{ColumnId, RegionId};
use crate::error::Result;
/// Extract the new columns to add from the alter requests.
///
/// # Panics
///
/// This function will panic if the alter kind is not `AddColumns`.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionAlterRequest)],
physical_columns: &HashMap<String, ColumnId>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
for (_, request) in requests {
let AlterKind::AddColumns { columns } = &request.kind else {
unreachable!()
};
for col in columns {
let column_name = col.column_metadata.column_schema.name.as_str();
if !physical_columns.contains_key(column_name)
&& !new_column_names.contains(column_name)
{
new_column_names.insert(column_name);
// TODO(weny): avoid clone
new_columns.push(col.column_metadata.clone());
}
}
}
Ok(())
}
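The deduplication above is what makes the batch cheap: a column requested by many logical regions is added to the physical region only once. For instance (a hedged, self-contained rendition using string column names in place of `ColumnMetadata`):

```rust
use std::collections::{HashMap, HashSet};

type RegionId = u64;
type ColumnId = u32;

/// Collect columns that the physical schema doesn't have and that no earlier
/// request in the batch already asked for (mirrors `extract_new_columns`).
fn extract_new_columns(
    requests: &[(RegionId, Vec<String>)],
    physical_columns: &HashMap<String, ColumnId>,
) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut new_columns = Vec::new();
    for (_, columns) in requests {
        for name in columns {
            if !physical_columns.contains_key(name) && seen.insert(name.clone()) {
                new_columns.push(name.clone());
            }
        }
    }
    new_columns
}

fn main() {
    let physical = HashMap::from([("host".to_string(), 1)]);
    let requests = vec![
        (1, vec!["host".to_string(), "dc".to_string()]),
        (2, vec!["dc".to_string(), "pod".to_string()]),
    ];
    // Two logical alters, but the physical region only gains ["dc", "pod"],
    // and "dc" is added once despite being requested twice.
    assert_eq!(extract_new_columns(&requests, &physical), ["dc", "pod"]);
}
```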

View File

@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ensure;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use crate::error::{Result, UnsupportedAlterKindSnafu};
/// Validate the alter region requests.
pub fn validate_alter_region_requests(requests: &[(RegionId, RegionAlterRequest)]) -> Result<()> {
for (_, request) in requests {
ensure!(
matches!(request.kind, AlterKind::AddColumns { .. }),
UnsupportedAlterKindSnafu {
kind: request.kind.as_ref()
}
);
}
Ok(())
}

View File

@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod add_columns;
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use add_columns::add_columns_to_physical_data_region;
use api::v1::SemanticType;
use common_telemetry::{info, warn};
use common_telemetry::info;
use common_time::{Timestamp, FOREVER};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -46,15 +44,15 @@ use store_api::storage::RegionId;
use validate::validate_create_logical_regions;
use crate::engine::create::extract_new_columns::extract_new_columns;
use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{
ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
EmptyRequestSnafu, InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result,
SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
};
use crate::metrics::{PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
impl MetricEngineInner {
@@ -87,16 +85,8 @@ impl MetricEngineInner {
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let physical_region_id = self.create_logical_regions(requests).await?;
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
self.create_logical_regions(requests, extension_return_value)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
} else {
return MissingRegionOptionSnafu {}.fail();
}
@@ -162,14 +152,11 @@ impl MetricEngineInner {
}
/// Create multiple logical regions on the same physical region.
///
/// Returns the physical region id of the created logical regions.
async fn create_logical_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
@@ -208,13 +195,9 @@ impl MetricEngineInner {
};
// TODO(weny): we don't need to pass a mutable new_columns here.
add_columns_to_physical_data_region(
data_region_id,
index_option,
&mut new_columns,
&self.data_region,
)
.await?;
self.data_region
.add_columns(data_region_id, new_columns, index_option)
.await?;
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
@@ -225,7 +208,7 @@ impl MetricEngineInner {
.iter()
.map(|(region_id, _)| (*region_id))
.collect::<Vec<_>>();
let logical_regions_column_names = requests.iter().map(|(region_id, request)| {
let logical_region_columns = requests.iter().map(|(region_id, request)| {
(
*region_id,
request
@@ -242,91 +225,26 @@ impl MetricEngineInner {
)
});
let new_add_columns = new_columns.iter().map(|metadata| {
let new_add_columns = new_column_names.iter().map(|name| {
// Safety: previous steps ensure the physical region exists
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(
metadata.column_schema.name.to_string(),
column_metadata.column_id,
)
let column_metadata = *physical_schema_map.get(name).unwrap();
(name.to_string(), column_metadata.column_id)
});
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
);
// Write the logical regions' metadata to the metadata region
self.metadata_region
.add_logical_regions(physical_region_id, logical_regions_column_names)
.add_logical_regions(physical_region_id, true, logical_region_columns)
.await?;
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
Ok(data_region_id)
}
/// Execute the corresponding alter requests against the mito region. After calling this,
/// `new_columns` will be assigned new column ids, which are correct only if the
/// following requirement is met:
///
/// # NOTE
///
/// `new_columns` MUST NOT pre-exist in the physical region, or the new columns will end up with wrong column ids.
///
pub(crate) async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
logical_region_id: RegionId,
new_columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<()> {
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
// alter data region
self.data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
// double check to make sure column ids are not mismatched;
// shouldn't be an expensive operation, given it only queries physical columns
for col in new_columns.iter_mut() {
let column_metadata = after_alter_physical_schema_map
.get(&col.column_schema.name.as_str())
.with_context(|| ColumnNotFoundSnafu {
name: &col.column_schema.name,
region_id: data_region_id,
})?;
if col != *column_metadata {
warn!(
"Add already existing columns with different column metadata to physical region({:?}): new column={:?}, old column={:?}",
data_region_id,
col,
column_metadata
);
// update to correct metadata
*col = (*column_metadata).clone();
}
}
// safety: previous step has checked this
self.state.write().unwrap().add_physical_columns(
data_region_id,
new_columns
.iter()
.map(|meta| (meta.column_schema.name.clone(), meta.column_id)),
);
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
Ok(())
}

View File

@@ -1,42 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
use crate::engine::IndexOptions;
use crate::error::Result;
use crate::metrics::PHYSICAL_COLUMN_COUNT;
/// Add new columns to the physical data region.
pub(crate) async fn add_columns_to_physical_data_region(
data_region_id: RegionId,
index_options: IndexOptions,
new_columns: &mut [ColumnMetadata],
data_region: &DataRegion,
) -> Result<()> {
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
Ok(())
}

View File

@@ -18,8 +18,7 @@ use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu,
Result,
ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result,
};
/// Validate the create logical regions request.
@@ -28,8 +27,6 @@ use crate::error::{
pub fn validate_create_logical_regions(
requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
let (_, request) = requests.first().unwrap();
let first_physical_region_id_raw = request
.options

View File

@@ -139,6 +139,15 @@ impl MetricEngineState {
}
}
pub fn invalid_logical_regions_cache(
&mut self,
logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
for logical_region_id in logical_region_ids {
self.logical_columns.remove(&logical_region_id);
}
}
/// # Panics
/// Panics if the physical region does not exist
pub fn add_logical_region(
@@ -233,10 +242,6 @@ impl MetricEngineState {
Ok(())
}
pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
self.logical_columns.remove(&logical_region_id);
}
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}

View File

@@ -218,6 +218,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unsupported alter kind: {}", kind))]
UnsupportedAlterKind {
kind: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Multiple field column found: {} and {}", previous, current))]
MultipleFieldColumn {
previous: String,
@@ -246,12 +253,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Empty request"))]
EmptyRequest {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected request: {}", reason))]
UnexpectedRequest {
reason: String,
@@ -276,8 +277,8 @@ impl ErrorExt for Error {
| NoFieldColumn { .. }
| AddingFieldColumn { .. }
| ParseRegionOptions { .. }
| EmptyRequest { .. }
| UnexpectedRequest { .. } => StatusCode::InvalidArguments,
| UnexpectedRequest { .. }
| UnsupportedAlterKind { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported

View File

@@ -17,10 +17,13 @@ use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use async_stream::try_stream;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use base64::Engine;
use common_recordbatch::util::collect;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::prelude::{col, lit};
use futures_util::stream::BoxStream;
use futures_util::TryStreamExt;
use mito2::engine::MitoEngine;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
@@ -80,28 +83,6 @@ impl MetadataRegion {
.insert(logical_region_id, Arc::new(RwLock::new(())));
}
/// Add a new column key to metadata.
///
/// This method won't check if the column already exists, but will
/// return whether the column was successfully added.
pub async fn add_column(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
column_metadata: &ColumnMetadata,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let column_key =
Self::concat_column_key(logical_region_id, &column_metadata.column_schema.name);
self.put_if_absent(
region_id,
column_key,
Self::serialize_column_metadata(column_metadata),
)
.await
}
/// Retrieve a read lock guard of given logical region id.
pub async fn read_lock_logical_region(
&self,
@@ -169,22 +150,6 @@ impl MetadataRegion {
Ok(())
}
/// Check if the given column exists. Return the semantic type if it exists.
#[cfg(test)]
pub async fn column_semantic_type(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
column_name: &str,
) -> Result<Option<SemanticType>> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let column_key = Self::concat_column_key(logical_region_id, column_name);
let semantic_type = self.get(region_id, &column_key).await?;
semantic_type
.map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
.transpose()
}
// TODO(ruihang): avoid using `get_all`
/// Get all the columns of a given logical region.
/// Return a list of (column_name, column_metadata).
@@ -197,7 +162,10 @@ impl MetadataRegion {
let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
let mut columns = vec![];
for (k, v) in self.get_all(metadata_region_id).await? {
for (k, v) in self
.get_all_with_prefix(metadata_region_id, &region_column_prefix)
.await?
{
if !k.starts_with(&region_column_prefix) {
continue;
}
@@ -214,7 +182,10 @@ impl MetadataRegion {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let mut regions = vec![];
for (k, _) in self.get_all(metadata_region_id).await? {
for k in self
.get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
.await?
{
if !k.starts_with(REGION_PREFIX) {
continue;
}
@@ -286,120 +257,122 @@ impl MetadataRegion {
}
}
/// Decode a record batch stream to a stream of items.
pub fn decode_batch_stream<T: Send + 'static>(
mut record_batch_stream: SendableRecordBatchStream,
decode: fn(RecordBatch) -> Vec<T>,
) -> BoxStream<'static, Result<T>> {
let stream = try_stream! {
while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
for item in decode(batch) {
yield item;
}
}
};
Box::pin(stream)
}
/// Decode a record batch to a list of key-value pairs.
fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
let key_col = batch.column(0);
let val_col = batch.column(1);
(0..batch.num_rows())
.flat_map(move |row_index| {
let key = key_col
.get_ref(row_index)
.as_string()
.unwrap()
.map(|s| s.to_string());
key.map(|k| {
(
k,
val_col
.get_ref(row_index)
.as_string()
.unwrap()
.map(|s| s.to_string())
.unwrap_or_default(),
)
})
})
.collect()
}
/// Decode a record batch to a list of keys.
fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
let key_col = batch.column(0);
(0..batch.num_rows())
.flat_map(move |row_index| {
let key = key_col
.get_ref(row_index)
.as_string()
.unwrap()
.map(|s| s.to_string());
key
})
.collect()
}
// simulate a `KvBackend`
//
// methods in this block assume the given region id has already been transformed.
impl MetadataRegion {
/// Put if absent; returns whether the put operation succeeded (errors other
/// than "key already exists" are wrapped in [Err]).
pub async fn put_if_absent(
&self,
region_id: RegionId,
key: String,
value: String,
) -> Result<bool> {
if self.exists(region_id, &key).await? {
return Ok(false);
}
fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
let put_request = Self::build_put_request(&key, &value);
self.mito
.handle_request(
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
Ok(true)
}
/// Check if the given key exists.
///
/// Notice that since mito doesn't support transactions, TOCTTOU is possible.
pub async fn exists(&self, region_id: RegionId, key: &str) -> Result<bool> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0;
Ok(exist)
}
/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
#[cfg(test)]
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let Some(first_batch) = scan_result.first() else {
return Ok(None);
};
let val = first_batch
.column(0)
.get_ref(0)
.as_string()
.unwrap()
.map(|s| s.to_string());
Ok(val)
}
/// Load all metadata from a given region.
pub async fn get_all(&self, region_id: RegionId) -> Result<HashMap<String, String>> {
let scan_req = ScanRequest {
projection: Some(vec![
let projection = if key_only {
vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
} else {
vec![
METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
]),
filters: vec![],
]
};
ScanRequest {
projection: Some(projection),
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
}
}
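The "push down filter to mito" part of the commit lives in `build_prefix_read_request`: instead of scanning every key and filtering client-side, the scan request carries a DataFusion `LIKE` expression on the key column so mito can prune rows at scan time. A hedged sketch with a stand-in `ScanRequest` (the real one is `store_api::storage::ScanRequest`) and hypothetical column names/indexes; note the sketch spells out the conventional trailing `%` wildcard for a prefix match, whereas the diff passes the prefix to `like` directly and the callers keep a `starts_with` double check:

```rust
use datafusion::prelude::{col, lit, Expr};

// Stand-in for store_api::storage::ScanRequest (only the fields used here).
struct ScanRequest {
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
}

const KEY_COLUMN_NAME: &str = "__key"; // hypothetical column name
const KEY_COLUMN_INDEX: usize = 1;
const VALUE_COLUMN_INDEX: usize = 2;

fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
    // `%` is the SQL LIKE wildcard for "any suffix", turning the LIKE into
    // a prefix match that the storage engine can evaluate during the scan.
    let filter_expr = col(KEY_COLUMN_NAME).like(lit(format!("{prefix}%")));
    let projection = if key_only {
        vec![KEY_COLUMN_INDEX] // key-only scans skip the value column entirely
    } else {
        vec![KEY_COLUMN_INDEX, VALUE_COLUMN_INDEX]
    };
    ScanRequest {
        projection: Some(projection),
        filters: vec![filter_expr],
    }
}
```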
pub async fn get_all_with_prefix(
&self,
region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let mut result = HashMap::new();
for batch in scan_result {
let key_col = batch.column(0);
let val_col = batch.column(1);
for row_index in 0..batch.num_rows() {
let key = key_col
.get_ref(row_index)
.as_string()
.unwrap()
.map(|s| s.to_string());
let val = val_col
.get_ref(row_index)
.as_string()
.unwrap()
.map(|s| s.to_string());
result.insert(key.unwrap(), val.unwrap_or_default());
}
}
Ok(result)
decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
.try_collect::<HashMap<_, _>>()
.await
}
pub async fn get_all_key_with_prefix(
&self,
region_id: RegionId,
prefix: &str,
) -> Result<Vec<String>> {
let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
.try_collect::<Vec<_>>()
.await
}
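The scan results are now consumed as a stream rather than collected wholesale: `decode_batch_stream` wraps the record-batch stream in `async_stream::try_stream!` and flattens each batch into decoded items, which callers collect with `TryStreamExt::try_collect` (hence the new `async-stream` and `futures-util` dependencies). A self-contained model of the pattern, assuming `tokio` and using `Vec<(String, String)>` as a stand-in for a record batch:

```rust
use std::collections::HashMap;

use async_stream::try_stream;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};

// Stand-in for common_recordbatch::RecordBatch.
type Batch = Vec<(String, String)>;

/// Turn a fallible stream of batches into a fallible stream of decoded items.
fn decode_batch_stream<T: Send + 'static>(
    mut batches: BoxStream<'static, Result<Batch, String>>,
    decode: fn(Batch) -> Vec<T>,
) -> BoxStream<'static, Result<T, String>> {
    let stream = try_stream! {
        while let Some(batch) = batches.try_next().await? {
            for item in decode(batch) {
                yield item;
            }
        }
    };
    Box::pin(stream)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let input = futures_util::stream::iter(vec![
        Ok(vec![("region_1".to_string(), String::new())]),
        Ok(vec![("region_2".to_string(), String::new())]),
    ])
    .boxed();
    // Mirrors get_all_with_prefix: decode to key-value pairs, collect to a map.
    let kvs: HashMap<String, String> =
        decode_batch_stream(input, |batch| batch).try_collect().await?;
    assert_eq!(kvs.len(), 2);
    Ok(())
}
```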
/// Delete the given keys. For performance consideration, this method
@@ -416,23 +389,6 @@ impl MetadataRegion {
Ok(())
}
/// Builds a [ScanRequest] to read metadata for a given key.
/// The request will contain an EQ filter on the key column.
///
/// Only the value column is projected.
fn build_read_request(key: &str) -> ScanRequest {
let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
ScanRequest {
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
}
}
pub(crate) fn build_put_request_from_iter(
kv: impl Iterator<Item = (String, String)>,
) -> RegionPutRequest {
@@ -479,47 +435,6 @@ impl MetadataRegion {
RegionPutRequest { rows, hint: None }
}
fn build_put_request(key: &str, value: &str) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Tag as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Field as _,
..Default::default()
},
];
let rows = Rows {
schema: cols,
rows: vec![Row {
values: vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0)),
},
Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
Value {
value_data: Some(ValueData::StringValue(value.to_string())),
},
],
}],
};
RegionPutRequest { rows, hint: None }
}
fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
let cols = vec![
ColumnSchema {
@@ -557,16 +472,21 @@ impl MetadataRegion {
pub async fn add_logical_regions(
&self,
physical_region_id: RegionId,
write_region_id: bool,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
if write_region_id {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
} else {
None
}
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
@@ -592,11 +512,65 @@ impl MetadataRegion {
}
}
#[cfg(test)]
impl MetadataRegion {
/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
.eq(datafusion::prelude::lit(key));
let scan_req = ScanRequest {
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = common_recordbatch::util::collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let Some(first_batch) = scan_result.first() else {
return Ok(None);
};
let val = first_batch
.column(0)
.get_ref(0)
.as_string()
.unwrap()
.map(|s| s.to_string());
Ok(val)
}
/// Check if the given column exists. Return the semantic type if it exists.
pub async fn column_semantic_type(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
column_name: &str,
) -> Result<Option<SemanticType>> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let column_key = Self::concat_column_key(logical_region_id, column_name);
let semantic_type = self.get(region_id, &column_key).await?;
semantic_type
.map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
.transpose()
}
}
#[cfg(test)]
mod test {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::region_request::RegionRequest;
use super::*;
use crate::test_util::TestEnv;
@@ -665,150 +639,6 @@ mod test {
assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
}
#[test]
fn test_build_read_request() {
let key = "test_key";
let expected_filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
let expected_scan_request = ScanRequest {
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![expected_filter_expr],
output_ordering: None,
limit: None,
series_row_selector: None,
sequence: None,
};
let actual_scan_request = MetadataRegion::build_read_request(key);
assert_eq!(actual_scan_request, expected_scan_request);
}
#[tokio::test]
async fn test_put_conditionally() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test inserting a new key-value pair
let key = "test_key".to_string();
let value = "test_value".to_string();
let result = metadata_region
.put_if_absent(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(result.unwrap());
// Verify that the key-value pair was actually inserted
let scan_req = MetadataRegion::build_read_request("test_key");
let record_batch_stream = metadata_region
.mito
.scan_to_stream(region_id, scan_req)
.await
.unwrap();
let scan_result = collect(record_batch_stream).await.unwrap();
assert_eq!(scan_result.len(), 1);
// Test inserting the same key-value pair again
let result = metadata_region
.put_if_absent(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(!result.unwrap());
}
#[tokio::test]
async fn test_exist() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test checking for a non-existent key
let key = "test_key".to_string();
let result = metadata_region.exists(region_id, &key).await;
assert!(result.is_ok());
assert!(!result.unwrap());
// Test inserting a key and then checking for its existence
let value = "test_value".to_string();
let put_request = MetadataRegion::build_put_request(&key, &value);
metadata_region
.mito
.handle_request(region_id, RegionRequest::Put(put_request))
.await
.unwrap();
let result = metadata_region.exists(region_id, &key).await;
assert!(result.is_ok());
assert!(result.unwrap());
}
#[tokio::test]
async fn test_get() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test getting a non-existent key
let key = "test_key".to_string();
let result = metadata_region.get(region_id, &key).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), None);
// Test inserting a key and then getting its value
let value = "test_value".to_string();
let put_request = MetadataRegion::build_put_request(&key, &value);
metadata_region
.mito
.handle_request(region_id, RegionRequest::Put(put_request))
.await
.unwrap();
let result = metadata_region.get(region_id, &key).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(value));
}
#[tokio::test]
async fn test_add_column() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
let logical_region_id = RegionId::new(868, 8390);
let column_name = "column1";
let semantic_type = SemanticType::Tag;
let column_metadata = ColumnMetadata {
column_schema: ColumnSchema::new(
column_name,
ConcreteDataType::string_datatype(),
false,
),
semantic_type,
column_id: 5,
};
metadata_region
.add_column(physical_region_id, logical_region_id, &column_metadata)
.await
.unwrap();
let actual_semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
// duplicate column won't be updated
let is_updated = metadata_region
.add_column(physical_region_id, logical_region_id, &column_metadata)
.await
.unwrap();
assert!(!is_updated);
let actual_semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
}
fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
HashMap::from([
(
@@ -855,7 +685,7 @@ mod test {
.collect::<HashMap<_, _>>(),
)];
metadata_region
.add_logical_regions(physical_region_id, iter.into_iter())
.add_logical_regions(physical_region_id, true, iter.into_iter())
.await
.unwrap();
// Add logical region again.
@@ -867,7 +697,7 @@ mod test {
.collect::<HashMap<_, _>>(),
)];
metadata_region
.add_logical_regions(physical_region_id, iter.into_iter())
.add_logical_regions(physical_region_id, true, iter.into_iter())
.await
.unwrap();
@@ -877,17 +707,6 @@ mod test {
.await
.unwrap();
assert_eq!(logical_regions.len(), 2);
assert_eq!(logical_regions[1], logical_region_id);
// Check if the logical region exists.
let result = metadata_region
.exists(
physical_region_id,
&MetadataRegion::concat_region_key(logical_region_id),
)
.await
.unwrap();
assert!(result);
// Check if the logical region columns are added.
let logical_columns = metadata_region

View File

@@ -30,7 +30,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::FulltextOptions;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::IntoStaticStr;
use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
@@ -475,7 +475,7 @@ impl TryFrom<AlterRequest> for RegionAlterRequest {
}
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
/// Add columns to the region.
AddColumns {