perf: optimize table creation speed in metric engine (#5503)

* feat(metric-engine): introduce batch create request handling

* chore: remove unused code

* test: add more tests

* chore: remove unused error

* chore: apply suggestions from CR
Weny Xu
2025-02-13 16:39:04 +09:00
committed by GitHub
parent fbd5316fdb
commit bb97f1bf16
9 changed files with 681 additions and 220 deletions

View File

@@ -42,7 +42,7 @@ use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
};
use store_api::region_request::RegionRequest;
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use self::state::MetricEngineState;
@@ -127,6 +127,43 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,
) -> Result<RegionResponse, BoxedError> {
match batch_request {
BatchRegionDdlRequest::Create(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.create_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionDdlRequest::Alter(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
}
BatchRegionDdlRequest::Drop(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
)
.await
}
}
}
/// Handles a non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,
@@ -139,7 +176,7 @@ impl RegionEngine for MetricEngine {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner
.create_region(region_id, create, &mut extension_return_value)
.create_regions(vec![(region_id, create)], &mut extension_return_value)
.await
}
RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
@@ -301,6 +338,24 @@ impl MetricEngine {
.await
.map_err(BoxedError::new)
}
async fn handle_requests(
&self,
requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
) -> Result<RegionResponse, BoxedError> {
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for (region_id, request) in requests {
let response = self.handle_request(region_id, request).await?;
affected_rows += response.affected_rows;
extensions.extend(response.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
}
#[cfg(test)]
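For orientation, a hedged caller-side sketch of what the new entry point buys: one engine call creates a whole batch of logical regions instead of one handle_request round trip per region. The helper name is hypothetical and the RegionResponse import path is assumed; the other types and the handle_batch_ddl_requests trait method come from this diff.

use common_error::ext::BoxedError;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{BatchRegionDdlRequest, RegionCreateRequest};
use store_api::storage::RegionId;
// `RegionResponse` import elided: use whatever path the surrounding file already
// imports it from.

/// Hypothetical helper: create a whole batch of logical regions in one call.
async fn create_logical_regions_in_one_call<E: RegionEngine>(
    engine: &E,
    requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionResponse, BoxedError> {
    // One engine round trip for the whole batch; the response `extensions`
    // carry the encoded physical table schema after any new columns were added.
    engine
        .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(requests))
        .await
}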

View File

@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
mod add_columns;
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use add_columns::add_columns_to_physical_data_region;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::{info, warn};
use common_time::{Timestamp, FOREVER};
use datatypes::data_type::ConcreteDataType;
@@ -39,36 +43,51 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use validate::validate_create_logical_regions;
use crate::engine::create::extract_new_columns::extract_new_columns;
use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{
AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
Result, SerializeColumnMetadataSnafu,
ColumnNotFoundSnafu, ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
EmptyRequestSnafu, InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, PhysicalRegionNotFoundSnafu, Result,
SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils::{to_data_region_id, to_metadata_region_id};
use crate::metrics::{PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
impl MetricEngineInner {
/// Dispatches region creation requests to physical region creation or logical region creation.
pub async fn create_region(
pub async fn create_regions(
&self,
region_id: RegionId,
request: RegionCreateRequest,
mut requests: Vec<(RegionId, RegionCreateRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
Self::verify_region_create_request(&request)?;
if requests.is_empty() {
return Ok(0);
}
let result = if request.is_physical_table() {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
let physical_region_id = self.create_logical_region(region_id, request).await?;
for (_, request) in requests.iter() {
Self::verify_region_create_request(request)?;
}
// Add physical table's columns to the extension map.
// It's ok to overwrite an existing key, as the schema that comes later is more up-to-date.
let first_request = &requests.first().unwrap().1;
if first_request.is_physical_table() {
ensure!(
requests.len() == 1,
UnexpectedRequestSnafu {
reason: "Physical table must be created with single request".to_string(),
}
);
let (region_id, request) = requests.pop().unwrap();
self.create_physical_region(region_id, request).await?;
return Ok(0);
} else if first_request
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let physical_region_id = self.create_logical_regions(requests).await?;
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
@@ -78,13 +97,11 @@ impl MetricEngineInner {
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
Ok(())
} else {
MissingRegionOptionSnafu {}.fail()
};
return MissingRegionOptionSnafu {}.fail();
}
result.map(|_| 0)
Ok(0)
}
/// Initialize a physical metric region at given region id.
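Stated on its own, the dispatch rule in create_regions above is: verify every request in the batch, require that a physical-table request arrives alone, and hand a batch of logical-table requests to create_logical_regions in one go. Below is a minimal, self-contained restatement of that rule; the toy types and option keys are placeholders standing in for RegionCreateRequest, PHYSICAL_TABLE_METADATA_KEY, and LOGICAL_TABLE_METADATA_KEY.

use std::collections::HashMap;

type RegionId = u64;

// Toy stand-in for RegionCreateRequest: only the options map matters here.
struct CreateReq {
    options: HashMap<String, String>,
}

enum Plan {
    CreatePhysical(RegionId),
    CreateLogicalBatch(Vec<RegionId>),
}

fn plan(mut requests: Vec<(RegionId, CreateReq)>) -> Result<Plan, String> {
    if requests.is_empty() {
        return Ok(Plan::CreateLogicalBatch(vec![])); // the real code returns Ok(0) early
    }
    // Placeholder keys; the engine checks PHYSICAL_TABLE_METADATA_KEY /
    // LOGICAL_TABLE_METADATA_KEY in the request options.
    let is_physical = requests[0].1.options.contains_key("physical_metric_table");
    let is_logical = requests[0].1.options.contains_key("on_physical_table");
    if is_physical {
        // A physical table is always created from exactly one request.
        if requests.len() != 1 {
            return Err("physical table must be created with a single request".into());
        }
        let (region_id, _request) = requests.pop().unwrap();
        Ok(Plan::CreatePhysical(region_id))
    } else if is_logical {
        // The whole batch of logical regions is created together.
        Ok(Plan::CreateLogicalBatch(
            requests.into_iter().map(|(id, _)| id).collect(),
        ))
    } else {
        Err("missing metric engine region option".into())
    }
}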
@@ -144,52 +161,33 @@ impl MetricEngineInner {
Ok(())
}
/// Create a logical region.
/// Create multiple logical regions on the same physical region.
///
/// Physical table and logical table can have multiple regions, and their
/// region numbers should be the same. Thus we can infer the physical region
/// id by simply replacing the table id part in the given region id, which
/// represents the "logical region" to request.
///
/// This method will alter the data region to add columns if necessary.
///
/// If the logical region to create already exists, this method will do nothing.
///
/// `alter_request` is a hashmap that stores the alter requests that were executed
/// against the physical region.
///
/// Return the physical region id of this logical region
async fn create_logical_region(
/// Returns the physical region id of the created logical regions.
async fn create_logical_regions(
&self,
logical_region_id: RegionId,
request: RegionCreateRequest,
requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionId> {
// transform IDs
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_region_id: RegionId = physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: physical_region_id_raw,
})?
.into();
let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id);
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
// check if the logical region already exists
if self
.metadata_region
.is_logical_region_exists(metadata_region_id, logical_region_id)
.await?
{
info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(data_region_id);
}
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
// Filters out requests whose logical region already exists
let requests = {
let state = self.state.read().unwrap();
let logical_region_exists = state.logical_region_exists_filter(data_region_id);
// TODO(weny): log the skipped logical regions
requests
.into_iter()
.filter(|(region_id, _)| !logical_region_exists(region_id))
.collect::<Vec<_>>()
};
// Finds new columns to add to physical region
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();
// find new columns to add
let mut new_columns = vec![];
let mut existing_columns = vec![];
let index_option = {
let state = &self.state.read().unwrap();
let region_state = state
@@ -200,81 +198,69 @@ impl MetricEngineInner {
})?;
let physical_columns = region_state.physical_columns();
for col in &request.column_metadatas {
if !physical_columns.contains_key(&col.column_schema.name) {
// Multi-field on physical table is explicitly forbidden at present
// TODO(ruihang): support multi-field on both logical and physical column
ensure!(
col.semantic_type != SemanticType::Field,
AddingFieldColumnSnafu {
name: col.column_schema.name.clone()
}
);
new_columns.push(col.clone());
} else {
existing_columns.push(col.column_schema.name.clone());
}
}
extract_new_columns(
&requests,
physical_columns,
&mut new_column_names,
&mut new_columns,
)?;
region_state.options().index
};
if !new_columns.is_empty() {
info!("Found new columns {new_columns:?} to add to physical region {data_region_id}");
// TODO(weny): we don't need to pass a mutable new_columns here.
add_columns_to_physical_data_region(
data_region_id,
index_option,
&mut new_columns,
&self.data_region,
)
.await?;
self.add_columns_to_physical_data_region(
data_region_id,
logical_region_id,
&mut new_columns,
index_option,
)
.await?;
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}
}
// register logical region to metadata region
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
// register existing physical column to this new logical region.
let physical_schema = self
.data_region
.physical_columns(data_region_id)
.await
.map_err(BoxedError::new)
.context(MitoReadOperationSnafu)?;
let physical_schema_map = physical_schema
.into_iter()
.map(|metadata| (metadata.column_schema.name.clone(), metadata))
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
for col in &existing_columns {
let column_metadata = physical_schema_map
.get(col)
.with_context(|| ColumnNotFoundSnafu {
name: col,
region_id: physical_region_id,
})?
.clone();
self.metadata_region
.add_column(metadata_region_id, logical_region_id, &column_metadata)
.await?;
}
let logical_regions = requests
.iter()
.map(|(region_id, _)| (*region_id))
.collect::<Vec<_>>();
let logical_regions_column_names = requests.iter().map(|(region_id, request)| {
(
*region_id,
request
.column_metadatas
.iter()
.map(|metadata| {
// Safety: previous steps ensure the physical region exists
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(metadata.column_schema.name.as_str(), column_metadata)
})
.collect::<HashMap<_, _>>(),
)
});
// update the mapping
// Safety: previous steps ensure the physical region exists
self.state
.write()
.unwrap()
.add_logical_region(physical_region_id, logical_region_id);
info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
let new_add_columns = new_columns.iter().map(|metadata| {
// Safety: previous steps ensure the physical region exists
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(
metadata.column_schema.name.to_string(),
column_metadata.column_id,
)
});
// Writes logical regions metadata to metadata region
self.metadata_region
.add_logical_regions(physical_region_id, logical_regions_column_names)
.await?;
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
Ok(data_region_id)
}
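The doc comment above relies on the logical/physical id relationship: a logical region and its physical region share the same region number and differ only in the table-id half of the RegionId. A small self-contained illustration of that packing, assuming the usual layout of a 32-bit table id in the high bits and a 32-bit region number in the low bits (this mirrors store_api::storage::RegionId but uses plain u64 arithmetic to stay dependency-free):

// Derive the physical region id that serves a given logical region:
// keep the region number, swap in the physical table id.
fn to_physical_region_id(logical_region_id: u64, physical_table_id: u32) -> u64 {
    let region_number = logical_region_id & 0xFFFF_FFFF; // low 32 bits
    ((physical_table_id as u64) << 32) | region_number
}

fn main() {
    // Logical table 1025, region 7 maps onto physical table 1024, region 7.
    let logical = (1025u64 << 32) | 7;
    assert_eq!(to_physical_region_id(logical, 1024), (1024u64 << 32) | 7);
}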
@@ -293,16 +279,16 @@ impl MetricEngineInner {
new_columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
// alter data region
self.data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema

View File

@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
use crate::engine::IndexOptions;
use crate::error::Result;
use crate::metrics::PHYSICAL_COLUMN_COUNT;
/// Add new columns to the physical data region.
pub(crate) async fn add_columns_to_physical_data_region(
data_region_id: RegionId,
index_options: IndexOptions,
new_columns: &mut [ColumnMetadata],
data_region: &DataRegion,
) -> Result<()> {
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
Ok(())
}

View File

@@ -0,0 +1,174 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::{ColumnId, RegionId};
use crate::error::{AddingFieldColumnSnafu, Result};
/// Extract new columns from the create requests.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionCreateRequest)],
physical_columns: &HashMap<String, ColumnId>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
for (_, request) in requests {
for col in &request.column_metadatas {
if !physical_columns.contains_key(&col.column_schema.name)
&& !new_column_names.contains(&col.column_schema.name.as_str())
{
ensure!(
col.semantic_type != SemanticType::Field,
AddingFieldColumnSnafu {
name: col.column_schema.name.to_string(),
}
);
new_column_names.insert(&col.column_schema.name);
// TODO(weny): avoid clone
new_columns.push(col.clone());
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
#[test]
fn test_extract_new_columns() {
let requests = vec![
(
RegionId::new(1, 1),
RegionCreateRequest {
column_metadatas: vec![
ColumnMetadata {
column_schema: ColumnSchema::new(
"existing_column".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 0,
},
ColumnMetadata {
column_schema: ColumnSchema::new(
"new_column".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 0,
},
],
engine: "test".to_string(),
primary_key: vec![],
options: HashMap::new(),
region_dir: "test".to_string(),
},
),
(
RegionId::new(1, 2),
RegionCreateRequest {
column_metadatas: vec![ColumnMetadata {
// Duplicate column name
column_schema: ColumnSchema::new(
"new_column".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 0,
}],
engine: "test".to_string(),
primary_key: vec![],
options: HashMap::new(),
region_dir: "test".to_string(),
},
),
];
let mut physical_columns = HashMap::new();
physical_columns.insert("existing_column".to_string(), 0);
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();
let result = extract_new_columns(
&requests,
&physical_columns,
&mut new_column_names,
&mut new_columns,
);
assert!(result.is_ok());
assert!(new_column_names.contains("new_column"));
assert_eq!(new_columns.len(), 1);
assert_eq!(new_columns[0].column_schema.name, "new_column");
}
#[test]
fn test_extract_new_columns_with_field_type() {
let requests = vec![(
RegionId::new(1, 1),
RegionCreateRequest {
column_metadatas: vec![ColumnMetadata {
column_schema: ColumnSchema::new(
"new_column".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 0,
}],
engine: "test".to_string(),
primary_key: vec![],
options: HashMap::new(),
region_dir: "test".to_string(),
},
)];
let physical_columns = HashMap::new();
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();
let err = extract_new_columns(
&requests,
&physical_columns,
&mut new_column_names,
&mut new_columns,
)
.unwrap_err();
assert_matches!(err, Error::AddingFieldColumn { .. });
}
}

View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu,
Result,
};
/// Validates the create requests for logical regions.
///
/// Returns the physical region id extracted from the first request.
pub fn validate_create_logical_regions(
requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
let (_, request) = requests.first().unwrap();
let first_physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_region_id: RegionId = first_physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: first_physical_region_id_raw,
})?
.into();
// TODO(weny): Can we remove the check?
for (_, request) in requests.iter().skip(1) {
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
ensure!(
physical_region_id_raw == first_physical_region_id_raw,
ConflictRegionOptionSnafu {}
);
}
Ok(physical_region_id)
}
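A hedged usage sketch of the validator, written in the same style as this PR's own tests: the RegionCreateRequest field set mirrors those tests, the literal values are placeholders, and it assumes it sits in this file (or in a tests module with `use super::*;`).

#[test]
fn validate_requests_sharing_one_physical_region() {
    use std::collections::HashMap;

    // Both logical regions carry the same physical region id in their options,
    // so validation returns that id. A differing id would hit
    // ConflictRegionOptionSnafu; a missing option, MissingRegionOptionSnafu.
    let physical_region_id = RegionId::new(1024, 0);
    let request = |physical: RegionId| RegionCreateRequest {
        column_metadatas: vec![],
        engine: "test".to_string(),
        primary_key: vec![],
        options: HashMap::from([(
            LOGICAL_TABLE_METADATA_KEY.to_string(),
            physical.as_u64().to_string(),
        )]),
        region_dir: "test".to_string(),
    };
    let requests = vec![
        (RegionId::new(2048, 0), request(physical_region_id)),
        (RegionId::new(2049, 0), request(physical_region_id)),
    ];
    assert_eq!(
        validate_create_logical_regions(&requests).unwrap(),
        physical_region_id
    );
}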

View File

@@ -83,6 +83,18 @@ pub(crate) struct MetricEngineState {
}
impl MetricEngineState {
pub fn logical_region_exists_filter(
&self,
physical_region_id: RegionId,
) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> {
let state = self
.physical_region_states()
.get(&physical_region_id)
.unwrap();
move |logical_region_id| state.logical_regions().contains(logical_region_id)
}
pub fn add_physical_region(
&mut self,
physical_region_id: RegionId,
@@ -111,6 +123,22 @@ impl MetricEngineState {
}
}
/// # Panics
/// Panics if the physical region does not exist.
pub fn add_logical_regions(
&mut self,
physical_region_id: RegionId,
logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
for logical_region_id in logical_region_ids {
state.logical_regions.insert(logical_region_id);
self.logical_regions
.insert(logical_region_id, physical_region_id);
}
}
/// # Panics
/// Panics if the physical region does not exist.
pub fn add_logical_region(

View File

@@ -50,13 +50,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region `{}` already exists", region_id))]
RegionAlreadyExists {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize column metadata from {}", raw))]
DeserializeColumnMetadata {
raw: String,
@@ -252,6 +245,19 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty request"))]
EmptyRequest {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected request: {}", reason))]
UnexpectedRequest {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -269,7 +275,9 @@ impl ErrorExt for Error {
| MultipleFieldColumn { .. }
| NoFieldColumn { .. }
| AddingFieldColumn { .. }
| ParseRegionOptions { .. } => StatusCode::InvalidArguments,
| ParseRegionOptions { .. }
| EmptyRequest { .. }
| UnexpectedRequest { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported
@@ -300,8 +308,6 @@ impl ErrorExt for Error {
EncodePrimaryKey { source, .. } => source.status_code(),
CollectRecordBatchStream { source, .. } => source.status_code(),
RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists,
}
}

View File

@@ -51,6 +51,7 @@
//! ```
#![feature(let_chains)]
#![feature(assert_matches)]
pub mod config;
mod data_region;

View File

@@ -37,7 +37,7 @@ use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use crate::error::{
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
ParseRegionIdSnafu, RegionAlreadyExistsSnafu, Result,
ParseRegionIdSnafu, Result,
};
use crate::utils;
@@ -57,7 +57,7 @@ const COLUMN_PREFIX: &str = "__column_";
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
mito: MitoEngine,
pub(crate) mito: MitoEngine,
/// Logical lock for operations that need to be serialized, such as updating and reading region columns.
///
/// Region entry will be registered on creating and opening logical region, and deregistered on
@@ -73,36 +73,6 @@ impl MetadataRegion {
}
}
/// Add a new table key to metadata.
///
/// This method will check if the table key already exists; if so, it will return
/// a [TableAlreadyExistsSnafu] error.
pub async fn add_logical_region(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
let put_success = self
.put_if_absent(region_id, region_key, String::new())
.await?;
if !put_success {
RegionAlreadyExistsSnafu {
region_id: logical_region_id,
}
.fail()
} else {
self.logical_region_lock
.write()
.await
.insert(logical_region_id, Arc::new(RwLock::new(())));
Ok(())
}
}
pub async fn open_logical_region(&self, logical_region_id: RegionId) {
self.logical_region_lock
.write()
@@ -199,17 +169,6 @@ impl MetadataRegion {
Ok(())
}
/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
self.exists(region_id, &region_key).await
}
/// Check if the given column exists. Return the semantic type if exists.
#[cfg(test)]
pub async fn column_semantic_type(
@@ -474,6 +433,52 @@ impl MetadataRegion {
}
}
pub(crate) fn build_put_request_from_iter(
kv: impl Iterator<Item = (String, String)>,
) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Tag as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Field as _,
..Default::default()
},
];
let rows = Rows {
schema: cols,
rows: kv
.into_iter()
.map(|(key, value)| Row {
values: vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0)),
},
Value {
value_data: Some(ValueData::StringValue(key)),
},
Value {
value_data: Some(ValueData::StringValue(value)),
},
],
})
.collect(),
};
RegionPutRequest { rows, hint: None }
}
fn build_put_request(key: &str, value: &str) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
@@ -547,6 +552,44 @@ impl MetadataRegion {
RegionDeleteRequest { rows }
}
/// Add logical regions to the metadata region.
pub async fn add_logical_regions(
&self,
physical_region_id: RegionId,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
(
MetadataRegion::concat_column_key(logical_region_id, name),
MetadataRegion::serialize_column_metadata(column_metadata),
)
},
))
})
.collect::<Vec<_>>();
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
self.mito
.handle_request(
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
Ok(())
}
}
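The batched write above is easiest to see as the flattened key/value list it builds before the single Put: one region-marker row per logical region plus one row per (logical region, column), so registering N logical regions costs one mito write instead of a marker put plus per-column puts for each region. A self-contained sketch of that flattening step; the key formats are placeholders for the real concat_region_key / concat_column_key helpers (COLUMN_PREFIX = "__column_" is visible above, and the region-marker format is assumed to be analogous).

use std::collections::HashMap;

// (logical region id, column name -> serialized column metadata)
fn flatten_rows(logical_regions: Vec<(u64, HashMap<&str, String>)>) -> Vec<(String, String)> {
    logical_regions
        .into_iter()
        .flat_map(|(region_id, columns)| {
            // One empty-valued marker row for the logical region itself...
            std::iter::once((format!("__region_{region_id}"), String::new())).chain(
                // ...followed by one row per column of that logical region.
                columns
                    .into_iter()
                    .map(move |(name, metadata)| (format!("__column_{region_id}_{name}"), metadata)),
            )
        })
        .collect()
}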
#[cfg(test)]
@@ -724,31 +767,6 @@ mod test {
assert_eq!(result.unwrap(), Some(value));
}
#[tokio::test]
async fn test_add_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
// add one table
let logical_region_id = RegionId::new(196, 2333);
metadata_region
.add_logical_region(physical_region_id, logical_region_id)
.await
.unwrap();
assert!(metadata_region
.is_logical_region_exists(physical_region_id, logical_region_id)
.await
.unwrap());
// add it again
assert!(metadata_region
.add_logical_region(physical_region_id, logical_region_id)
.await
.is_err());
}
#[tokio::test]
async fn test_add_column() {
let env = TestEnv::new().await;
@@ -790,4 +808,95 @@ mod test {
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
}
fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
HashMap::from([
(
"label1".to_string(),
ColumnMetadata {
column_schema: ColumnSchema::new(
"label1".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 5,
},
),
(
"label2".to_string(),
ColumnMetadata {
column_schema: ColumnSchema::new(
"label2".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 5,
},
),
])
}
#[tokio::test]
async fn add_logical_regions_to_meta_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
let column_metadatas = test_column_metadatas();
let logical_region_id = RegionId::new(1024, 1);
let iter = vec![(
logical_region_id,
column_metadatas
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect::<HashMap<_, _>>(),
)];
metadata_region
.add_logical_regions(physical_region_id, iter.into_iter())
.await
.unwrap();
// Add logical region again.
let iter = vec![(
logical_region_id,
column_metadatas
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect::<HashMap<_, _>>(),
)];
metadata_region
.add_logical_regions(physical_region_id, iter.into_iter())
.await
.unwrap();
// Check if the logical region is added.
let logical_regions = metadata_region
.logical_regions(physical_region_id)
.await
.unwrap();
assert_eq!(logical_regions.len(), 2);
assert_eq!(logical_regions[1], logical_region_id);
// Check if the logical region exists.
let result = metadata_region
.exists(
physical_region_id,
&MetadataRegion::concat_region_key(logical_region_id),
)
.await
.unwrap();
assert!(result);
// Check if the logical region columns are added.
let logical_columns = metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await
.unwrap()
.into_iter()
.collect::<HashMap<_, _>>();
assert_eq!(logical_columns.len(), 2);
assert_eq!(column_metadatas, logical_columns);
}
}