feat(metric-engine): introduce batch create request handling

WenyXu
2025-02-08 11:16:13 +00:00
parent 764a57b80a
commit 76d69901ea
10 changed files with 528 additions and 20 deletions

View File

@@ -60,7 +60,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest,
RegionOpenRequest, RegionRequest, RegionRequestBundle,
RegionCreateRequest, RegionOpenRequest, RegionRequest, RegionRequestBundle,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -165,6 +165,13 @@ impl RegionServer {
self.inner.handle_batch_body(body).await
}
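/// Handle a batch of region create requests in a single call.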
pub async fn handle_create_request(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionResponse> {
self.inner.handle_batch_create_requests(requests).await
}
async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
let status = self
.inner
@@ -402,6 +409,49 @@ impl RegionServer {
extensions,
})
}
async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result<RegionResponse> {
match batch {
BatchRegionRequest::Create(creates) => self.handle_create_request(creates).await,
BatchRegionRequest::Drop(drops) => {
// FIXME(jeremy, ruihang, wenkang): once the engine supports merged calls,
// replace this serial per-region loop with a single batched call.
self.handle_requests(
drops
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
)
.await
}
BatchRegionRequest::Alter(alters) => {
// FIXME(jeremy, ruihang, wenkang): once the engine supports merged calls,
// replace this serial per-region loop with a single batched call.
self.handle_requests(
alters
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
}
BatchRegionRequest::Put(put) => {
// TODO(yingwen, wenkang): Implement it in engine level.
self.handle_requests_parallel(
put.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
)
.await
}
BatchRegionRequest::Delete(delete) => {
// TODO(yingwen, wenkang): Implement it in engine level.
self.handle_requests_parallel(
delete
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Delete(req))),
)
.await
}
}
}
}
#[async_trait]

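For orientation, a minimal caller-side sketch (not part of this commit); the helper name and the pre-built creates vector are hypothetical, and it simply forwards to the public batch-create entry point added above:

// Hypothetical helper: hand a pre-built batch of create requests to the
// RegionServer. Creates take the merged path; drops and alters still fall
// back to the serial per-region loop flagged by the FIXMEs above.
async fn create_regions_in_batch(
    region_server: &RegionServer,
    creates: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionResponse> {
    region_server.handle_create_request(creates).await
}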
View File

@@ -133,12 +133,17 @@ impl RegionEngine for MetricEngine {
) -> Result<RegionResponse, BoxedError> {
match batch_request {
BatchRegionRequest::Create(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Create(req))),
)
.await
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.create_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionRequest::Alter(requests) => {
self.handle_requests(

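As a hedged illustration of the new response shape: the Create arm above reports zero affected rows and returns the physical region's column metadata in the response extensions. A caller could read it back roughly as follows, assuming ColumnMetadata::decode_list is the counterpart of the encode_list call used in create_regions:

// Sketch: pull the encoded physical column metadata out of a RegionResponse
// produced by the batch-create path. decode_list is an assumed counterpart of
// the encode_list call in this commit.
fn physical_columns_from_response(response: &RegionResponse) -> Option<Vec<ColumnMetadata>> {
    let bytes = response.extensions.get(ALTER_PHYSICAL_EXTENSION_KEY)?;
    ColumnMetadata::decode_list(bytes).ok()
}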
View File

@@ -12,8 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
mod add_columns;
mod add_logical_regions;
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use add_columns::add_columns_to_physical_data_region;
use add_logical_regions::add_logical_regions_to_meta_region;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::{info, warn};
@@ -39,20 +46,62 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use validate::validate_create_logical_regions;
use crate::engine::create::extract_new_columns::extract_new_columns;
use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{
AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
Result, SerializeColumnMetadataSnafu,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, EmptyRequestSnafu,
InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
MitoReadOperationSnafu, MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu,
PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils::{to_data_region_id, to_metadata_region_id};
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
impl MetricEngineInner {
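/// Create regions for a batch of create requests.
///
/// The batch is dispatched by the first request: physical-table requests are
/// created one by one, while logical-table requests must all target the same
/// physical region and are created together.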
pub async fn create_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
if requests.is_empty() {
return Ok(0);
}
for (_, request) in requests.iter() {
Self::verify_region_create_request(request)?;
}
let first_request = &requests.first().unwrap().1;
if first_request.is_physical_table() {
for (region_id, request) in requests {
self.create_physical_region(region_id, request).await?;
}
return Ok(0);
} else if first_request
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let physical_region_id = self.create_logical_regions(requests).await?;
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
} else {
return MissingRegionOptionSnafu {}.fail();
}
Ok(0)
}
/// Dispatch a region creation request to either physical or logical region creation.
pub async fn create_region(
&self,
@@ -144,6 +193,113 @@ impl MetricEngineInner {
Ok(())
}
/// Create multiple logical regions on the same physical region.
///
/// Returns the physical region id of the created logical regions.
async fn create_logical_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
// Filter out requests whose logical region already exists
let requests = {
let state = self.state.read().unwrap();
let logical_region_exists = state.logical_region_exists_filter(data_region_id);
// TODO(weny): log the skipped logical regions
requests
.into_iter()
.filter(|(region_id, _)| !logical_region_exists(region_id))
.collect::<Vec<_>>()
};
// Find new columns that need to be added to the physical region
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();
let index_option = {
let state = &self.state.read().unwrap();
let region_state = state
.physical_region_states()
.get(&data_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?;
let physical_columns = region_state.physical_columns();
extract_new_columns(
&requests,
physical_columns,
&mut new_column_names,
&mut new_columns,
)?;
region_state.options().index
};
// TODO(weny): we don't need to pass a mutable new_columns here.
add_columns_to_physical_data_region(
data_region_id,
index_option,
&mut new_columns,
&self.data_region,
)
.await?;
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
let logical_regions = requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let logical_regions_column_names = requests.iter().map(|(region_id, request)| {
(
*region_id,
request
.column_metadatas
.iter()
.map(|metadata| {
// Safety: previous steps ensure the column exists in the physical schema
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(metadata.column_schema.name.as_str(), column_metadata)
})
.collect::<HashMap<_, _>>(),
)
});
let new_add_columns = new_columns.iter().map(|metadata| {
// Safety: previous steps ensure the column exists in the physical schema
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(
metadata.column_schema.name.to_string(),
column_metadata.column_id,
)
});
// Write the logical regions' metadata to the metadata region
add_logical_regions_to_meta_region(
&self.metadata_region,
physical_region_id,
logical_regions_column_names,
)
.await?;
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
Ok(data_region_id)
}
/// Create a logical region.
///
/// Physical table and logical table can have multiple regions, and their
@@ -293,16 +449,16 @@ impl MetricEngineInner {
new_columns: &mut [ColumnMetadata],
index_options: IndexOptions,
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
// alter data region
self.data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema

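To make the logical/physical dispatch in create_regions concrete, here is a rough sketch (not from this commit) of tagging an existing create request as a logical region bound to a physical region; RegionId::as_u64 is assumed, and the value format (a decimal u64) matches what validate_create_logical_regions parses back:

// Sketch: mark a create request as a logical region belonging to
// `physical_region_id` by setting the option that create_regions dispatches on.
fn tag_as_logical_region(request: &mut RegionCreateRequest, physical_region_id: RegionId) {
    request.options.insert(
        LOGICAL_TABLE_METADATA_KEY.to_string(),
        physical_region_id.as_u64().to_string(),
    );
}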
View File

@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
use crate::engine::IndexOptions;
use crate::error::Result;
use crate::metrics::PHYSICAL_COLUMN_COUNT;
/// Add new columns to the physical data region.
pub(crate) async fn add_columns_to_physical_data_region(
data_region_id: RegionId,
index_options: IndexOptions,
new_columns: &mut [ColumnMetadata],
data_region: &DataRegion,
) -> Result<()> {
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
Ok(())
}

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::storage::RegionId;
use crate::error::{MitoWriteOperationSnafu, Result};
use crate::metadata_region::MetadataRegion;
use crate::utils;
/// Add logical regions to the metadata region.
pub async fn add_logical_regions_to_meta_region(
metadata_region: &MetadataRegion,
physical_region_id: RegionId,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
(
MetadataRegion::concat_column_key(logical_region_id, name),
MetadataRegion::serialize_column_metadata(column_metadata),
)
},
))
})
.collect::<Vec<_>>();
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
metadata_region
.mito
.handle_request(
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
Ok(())
}

View File

@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::{ColumnId, RegionId};
use crate::error::{AddingFieldColumnSnafu, Result};
/// Extract new columns from the create requests.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionCreateRequest)],
physical_columns: &HashMap<String, ColumnId>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
for (_, request) in requests {
for col in &request.column_metadatas {
if !physical_columns.contains_key(&col.column_schema.name)
&& !new_column_names.contains(&col.column_schema.name.as_str())
{
ensure!(
col.semantic_type != SemanticType::Field,
AddingFieldColumnSnafu {
name: col.column_schema.name.to_string(),
}
);
new_column_names.insert(&col.column_schema.name);
// TODO(weny): avoid clone
new_columns.push(col.clone());
}
}
}
Ok(())
}

View File

@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu,
Result,
};
/// Validate the create logical regions request.
///
/// Returns the physical region id extracted from the first request.
pub fn validate_create_logical_regions(
requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
let (_, request) = requests.first().unwrap();
let first_physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_region_id: RegionId = first_physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: first_physical_region_id_raw,
})?
.into();
// TODO(weny): Can we remove the check?
for (_, request) in requests.iter().skip(1) {
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
ensure!(
physical_region_id_raw == first_physical_region_id_raw,
ConflictRegionOptionSnafu {}
);
}
Ok(physical_region_id)
}
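A small worked example (hypothetical values) of the round trip this validator relies on: the option value is the physical region id serialized as a decimal u64 string, and parsing it back yields the same RegionId; RegionId::new and as_u64 are assumed helpers here:

// Sketch: encode a physical region id the way logical create requests carry it,
// then parse it back the same way validate_create_logical_regions does.
let physical_region_id = RegionId::new(1024, 0); // hypothetical table id / region number
let raw = physical_region_id.as_u64().to_string();
let parsed: RegionId = raw.parse::<u64>().unwrap().into();
assert_eq!(parsed, physical_region_id);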

View File

@@ -83,6 +83,18 @@ pub(crate) struct MetricEngineState {
}
impl MetricEngineState {
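/// Returns a filter that reports whether a logical region already exists under
/// the given physical region.
///
/// # Panic
/// if the physical region does not exist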
pub fn logical_region_exists_filter(
&self,
physical_region_id: RegionId,
) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> {
let state = self
.physical_region_states()
.get(&physical_region_id)
.unwrap();
move |logical_region_id| state.logical_regions().contains(logical_region_id)
}
pub fn add_physical_region(
&mut self,
physical_region_id: RegionId,
@@ -111,6 +123,22 @@ impl MetricEngineState {
}
}
/// # Panic
/// if the physical region does not exist
pub fn add_logical_regions(
&mut self,
physical_region_id: RegionId,
logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
for logical_region_id in logical_region_ids {
state.logical_regions.insert(logical_region_id);
self.logical_regions
.insert(logical_region_id, physical_region_id);
}
}
/// # Panic
/// if the physical region does not exist
pub fn add_logical_region(

View File

@@ -246,6 +246,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty request"))]
EmptyRequest {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -263,7 +269,8 @@ impl ErrorExt for Error {
| MultipleFieldColumn { .. }
| NoFieldColumn { .. }
| AddingFieldColumn { .. }
| ParseRegionOptions { .. } => StatusCode::InvalidArguments,
| ParseRegionOptions { .. }
| EmptyRequest { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported

View File

@@ -57,7 +57,7 @@ const COLUMN_PREFIX: &str = "__column_";
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
mito: MitoEngine,
pub(crate) mito: MitoEngine,
/// Logical lock for operations that need to be serialized. Like update & read region columns.
///
/// Region entry will be registered on creating and opening logical region, and deregistered on
@@ -474,6 +474,52 @@ impl MetadataRegion {
}
}
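/// Build a put request from key/value pairs, one row per pair, using the
/// metadata region's fixed (timestamp, key, value) schema with timestamp 0.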
pub(crate) fn build_put_request_from_iter(
kv: impl Iterator<Item = (String, String)>,
) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Tag as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Field as _,
..Default::default()
},
];
let rows = Rows {
schema: cols,
rows: kv
.into_iter()
.map(|(key, value)| Row {
values: vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0)),
},
Value {
value_data: Some(ValueData::StringValue(key)),
},
Value {
value_data: Some(ValueData::StringValue(value)),
},
],
})
.collect(),
};
RegionPutRequest { rows, hint: None }
}
fn build_put_request(key: &str, value: &str) -> RegionPutRequest {
let cols = vec![
ColumnSchema {