feat(metric-engine): introduce batch alter request handling

This commit is contained in:
WenyXu
2025-02-10 11:44:58 +00:00
parent 76d69901ea
commit 99baa86b6a
10 changed files with 305 additions and 103 deletions

View File

@@ -60,7 +60,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{
convert_body_to_requests, AffectedRows, BatchRegionRequest, RegionCloseRequest,
RegionCreateRequest, RegionOpenRequest, RegionRequest, RegionRequestBundle,
RegionOpenRequest, RegionRequest, RegionRequestBundle,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -165,11 +165,11 @@ impl RegionServer {
self.inner.handle_batch_body(body).await
}
pub async fn handle_create_request(
pub async fn handle_batch_request(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
batch_request: BatchRegionRequest,
) -> Result<RegionResponse> {
self.inner.handle_batch_create_requests(requests).await
self.inner.handle_batch_request(batch_request).await
}
async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
@@ -372,10 +372,6 @@ impl RegionServer {
self.handle_request(region_id, request).trace(span).await
}
async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result<RegionResponse> {
self.inner.handle_batch_request(batch).await
}
async fn handle_vector_request(
&self,
requests: Vec<(RegionId, RegionRequest)>,
@@ -409,49 +405,6 @@ impl RegionServer {
extensions,
})
}
async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result<RegionResponse> {
match batch {
BatchRegionRequest::Create(creates) => self.handle_create_request(creates).await,
BatchRegionRequest::Drop(drops) => {
// FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately
// modify this part to avoid inefficient serial loop calls.
self.handle_requests(
drops
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
)
.await
}
BatchRegionRequest::Alter(alters) => {
// FIXME(jeremy, ruihang, wenkang): Once the engine supports merged calls, we should immediately
// modify this part to avoid inefficient serial loop calls.
self.handle_requests(
alters
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
}
BatchRegionRequest::Put(put) => {
// TODO(yingwen, wenkang): Implement it in engine level.
self.handle_requests_parallel(
put.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
)
.await
}
BatchRegionRequest::Delete(delete) => {
// TODO(yingwen, wenkang): Implement it in engine level.
self.handle_requests_parallel(
delete
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Delete(req))),
)
.await
}
}
}
}
#[async_trait]

View File

@@ -146,12 +146,17 @@ impl RegionEngine for MetricEngine {
})
}
BatchRegionRequest::Alter(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
)
.await
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.alter_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionRequest::Drop(requests) => {
self.handle_requests(

View File

@@ -12,15 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
mod extract_new_columns;
mod validate;
use std::collections::{HashMap, HashSet};
use common_telemetry::error;
use extract_new_columns::extract_new_columns;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use validate::validate_alter_region_requests;
use crate::engine::create::{
add_columns_to_physical_data_region, add_logical_regions_to_meta_region,
};
use crate::engine::MetricEngineInner;
use crate::error::{
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
@@ -28,6 +36,137 @@ use crate::error::{
use crate::utils::{to_data_region_id, to_metadata_region_id};
impl MetricEngineInner {
/// Handles a batch of alter requests.
///
/// The region id of the first request decides how the whole batch is routed:
/// - if it is a physical region, every request is applied one by one through
///   `alter_physical_region`;
/// - otherwise the whole batch is forwarded to `alter_logical_regions`, which
///   may fill `extension_return_value`.
///
/// Returns `Ok(0)` on success (also for an empty batch); the first failing
/// request aborts the batch and its error is returned.
pub async fn alter_regions(
    &self,
    requests: Vec<(RegionId, RegionAlterRequest)>,
    extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
    // Nothing to do for an empty batch.
    let Some(&(first_region_id, _)) = requests.first() else {
        return Ok(0);
    };

    if !self.is_physical_region(first_region_id) {
        self.alter_logical_regions(requests, extension_return_value)
            .await?;
        return Ok(0);
    }

    // Physical regions are altered sequentially, stopping at the first error.
    for (physical_region_id, alter_request) in requests {
        self.alter_physical_region(physical_region_id, alter_request)
            .await?;
    }
    Ok(0)
}
/// Alter multiple logical regions on the same physical region.
///
/// Only `AlterKind::AddColumns` requests are supported; any other kind is
/// rejected by `validate_alter_region_requests` before anything is changed.
///
/// The physical region is resolved from the FIRST logical region id only.
/// NOTE(review): the requests are assumed to all target logical regions of that
/// same physical region; this is not verified here.
///
/// Steps:
/// 1. Collect the columns the physical region does not have yet.
/// 2. Add them to the physical data region.
/// 3. Register every requested column for its logical region in the metadata
///    region (including columns that already existed on the physical region).
/// 4. Put the encoded physical column list into `extension_return_value` under
///    `ALTER_PHYSICAL_EXTENSION_KEY`.
/// 5. Update the in-memory state and drop the cached columns of the altered
///    logical regions.
///
/// Returns `Ok(0)` on success, also when `requests` is empty.
///
/// # Errors
///
/// Fails if a request is not `AddColumns`, if the logical or physical region is
/// unknown to the engine state, or if any of the underlying region operations
/// or the column metadata serialization fails.
pub async fn alter_logical_regions(
    &self,
    requests: Vec<(RegionId, RegionAlterRequest)>,
    extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
    // An empty batch has nothing to alter; return instead of indexing into it.
    let Some(&(first_logical_region_id, _)) = requests.first() else {
        return Ok(0);
    };
    validate_alter_region_requests(&requests)?;

    // Finds new columns to add
    let mut new_column_names = HashSet::new();
    let mut new_columns_to_add = vec![];

    // The state guard lives only inside this block, so it is released before
    // any of the awaits below.
    let (physical_region_id, index_options) = {
        let state = &self.state.read().unwrap();
        let physical_region_id = state
            .get_physical_region_id(first_logical_region_id)
            .with_context(|| {
                error!("Trying to alter an nonexistent region {first_logical_region_id}");
                LogicalRegionNotFoundSnafu {
                    region_id: first_logical_region_id,
                }
            })?;
        let region_state = state
            .physical_region_states()
            .get(&physical_region_id)
            .with_context(|| PhysicalRegionNotFoundSnafu {
                region_id: physical_region_id,
            })?;
        let physical_columns = region_state.physical_columns();

        extract_new_columns(
            &requests,
            physical_columns,
            &mut new_column_names,
            &mut new_columns_to_add,
        )?;

        (physical_region_id, region_state.options().index)
    };
    let data_region_id = to_data_region_id(physical_region_id);

    add_columns_to_physical_data_region(
        data_region_id,
        index_options,
        &mut new_columns_to_add,
        &self.data_region,
    )
    .await?;

    // Re-read the physical columns so the newly added ones are visible with the
    // metadata the data region reports for them.
    let physical_columns = self.data_region.physical_columns(data_region_id).await?;
    let physical_schema_map = physical_columns
        .iter()
        .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
        .collect::<HashMap<_, _>>();

    // Every column requested for a logical region must be registered for that
    // region, not only the ones that were new to the physical region: a column
    // may already exist physically (e.g. added through a sibling logical region)
    // while still being new to this logical region.
    let logical_region_columns = requests.iter().map(|(region_id, request)| {
        let AlterKind::AddColumns { columns } = &request.kind else {
            unreachable!()
        };
        (
            *region_id,
            columns
                .iter()
                .map(|col| {
                    let column_name = col.column_metadata.column_schema.name.as_str();
                    // Each requested column was either already physical or has
                    // just been added above, so it is expected to be present.
                    let column_metadata = *physical_schema_map
                        .get(column_name)
                        .expect("requested column must exist in the physical region");
                    (column_metadata.column_schema.name.as_str(), column_metadata)
                })
                .collect::<HashMap<_, _>>(),
        )
    });

    let new_add_columns = new_columns_to_add.iter().map(|metadata| {
        // Safety: previous steps ensure the physical region exist
        let column_metadata = *physical_schema_map
            .get(metadata.column_schema.name.as_str())
            .unwrap();
        (
            metadata.column_schema.name.to_string(),
            column_metadata.column_id,
        )
    });

    // Writes logical regions metadata to metadata region
    // (`false`: the logical region keys already exist, only column keys are written).
    add_logical_regions_to_meta_region(
        &self.metadata_region,
        physical_region_id,
        false,
        logical_region_columns,
    )
    .await?;

    extension_return_value.insert(
        ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
        ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
    );

    // Publish the new physical columns and drop the stale logical column entries.
    let mut state = self.state.write().unwrap();
    state.add_physical_columns(data_region_id, new_add_columns);
    state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));

    Ok(0)
}
/// Dispatch region alter request
pub async fn alter_region(
&self,

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::{ColumnId, RegionId};
use crate::error::{AddingFieldColumnSnafu, Result};
/// Collects the columns requested by the alter requests that are not yet
/// present in the physical region.
///
/// A column counts as new when its name is neither a key of `physical_columns`
/// nor already recorded in `new_column_names`. Each new column's name is added
/// to `new_column_names` and a clone of its metadata is pushed to `new_columns`,
/// so a name requested by several regions is collected only once.
///
/// # Errors
///
/// Returns `AddingFieldColumn` if a new column has the `Field` semantic type.
///
/// # Panics
///
/// This function will panic if the alter kind is not `AddColumns`.
pub fn extract_new_columns<'a>(
    requests: &'a [(RegionId, RegionAlterRequest)],
    physical_columns: &HashMap<String, ColumnId>,
    new_column_names: &mut HashSet<&'a str>,
    new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
    for (_, request) in requests {
        let columns = match &request.kind {
            AlterKind::AddColumns { columns } => columns,
            _ => unreachable!(),
        };

        for add_column in columns {
            let metadata = &add_column.column_metadata;
            let name = metadata.column_schema.name.as_str();

            // Skip columns the physical region already has or that an earlier
            // request in this batch has already contributed.
            if physical_columns.contains_key(name) || new_column_names.contains(name) {
                continue;
            }

            ensure!(
                metadata.semantic_type != SemanticType::Field,
                AddingFieldColumnSnafu { name }
            );

            new_column_names.insert(name);
            // TODO(weny): avoid clone
            new_columns.push(metadata.clone());
        }
    }

    Ok(())
}

View File

@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ensure;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use crate::error::{Result, UnsupportedAlterKindSnafu};
/// Validate the alter region requests.
pub fn validate_alter_region_requests(requests: &[(RegionId, RegionAlterRequest)]) -> Result<()> {
for (_, request) in requests {
ensure!(
matches!(request.kind, AlterKind::AddColumns { .. }),
UnsupportedAlterKindSnafu {
kind: request.kind.as_ref()
}
);
}
Ok(())
}

View File

@@ -19,8 +19,8 @@ mod validate;
use std::collections::{HashMap, HashSet};
use add_columns::add_columns_to_physical_data_region;
use add_logical_regions::add_logical_regions_to_meta_region;
pub(crate) use add_columns::add_columns_to_physical_data_region;
pub(crate) use add_logical_regions::add_logical_regions_to_meta_region;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::{info, warn};
@@ -53,10 +53,10 @@ use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegi
use crate::engine::MetricEngineInner;
use crate::error::{
AddingFieldColumnSnafu, ColumnNotFoundSnafu, ColumnTypeMismatchSnafu,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, EmptyRequestSnafu,
InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
MitoReadOperationSnafu, MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu,
PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
InvalidMetadataSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu,
MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
@@ -85,16 +85,8 @@ impl MetricEngineInner {
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let physical_region_id = self.create_logical_regions(requests).await?;
let physical_columns = self
.data_region
.physical_columns(physical_region_id)
self.create_logical_regions(requests, extension_return_value)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns)
.context(SerializeColumnMetadataSnafu)?,
);
} else {
return MissingRegionOptionSnafu {}.fail();
}
@@ -194,14 +186,11 @@ impl MetricEngineInner {
}
/// Create multiple logical regions on the same physical region.
///
/// Returns the physical region id of the created logical regions.
async fn create_logical_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
@@ -257,7 +246,7 @@ impl MetricEngineInner {
.iter()
.map(|(region_id, _)| (*region_id))
.collect::<Vec<_>>();
let logical_regions_column_names = requests.iter().map(|(region_id, request)| {
let logical_region_columns = requests.iter().map(|(region_id, request)| {
(
*region_id,
request
@@ -285,11 +274,17 @@ impl MetricEngineInner {
)
});
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
);
// Writes logical regions metadata to metadata region
add_logical_regions_to_meta_region(
&self.metadata_region,
physical_region_id,
logical_regions_column_names,
true,
logical_region_columns,
)
.await?;
@@ -297,7 +292,7 @@ impl MetricEngineInner {
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
Ok(data_region_id)
Ok(())
}
/// Create a logical region.

View File

@@ -27,27 +27,33 @@ use crate::utils;
pub async fn add_logical_regions_to_meta_region(
metadata_region: &MetadataRegion,
physical_region_id: RegionId,
write_region_id: bool,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
let iter =
logical_regions
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
(
MetadataRegion::concat_column_key(logical_region_id, name),
MetadataRegion::serialize_column_metadata(column_metadata),
)
},
))
})
.collect::<Vec<_>>();
.flat_map(|(logical_region_id, column_metadatas)| {
if write_region_id {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
} else {
None
}
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
(
MetadataRegion::concat_column_key(logical_region_id, name),
MetadataRegion::serialize_column_metadata(column_metadata),
)
},
))
})
.collect::<Vec<_>>();
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
metadata_region

View File

@@ -18,8 +18,7 @@ use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, EmptyRequestSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu,
Result,
ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result,
};
/// Validate the create logical regions request.
@@ -28,8 +27,6 @@ use crate::error::{
pub fn validate_create_logical_regions(
requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
ensure!(!requests.is_empty(), EmptyRequestSnafu {});
let (_, request) = requests.first().unwrap();
let first_physical_region_id_raw = request
.options

View File

@@ -139,6 +139,15 @@ impl MetricEngineState {
}
}
/// Removes the `logical_columns` entry of each given logical region.
///
/// The value returned by `remove` is discarded, so the call does not report
/// whether an entry existed for a region.
/// NOTE(review): presumably `logical_columns` is a cache that gets repopulated
/// on demand — confirm against its readers.
pub fn invalid_logical_regions_cache(
    &mut self,
    logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
    logical_region_ids.into_iter().for_each(|region_id| {
        self.logical_columns.remove(&region_id);
    });
}
/// # Panic
/// if the physical region does not exist
pub fn add_logical_region(

View File

@@ -219,6 +219,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unsupported alter kind: {}", kind))]
UnsupportedAlterKind {
kind: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Multiple field column found: {} and {}", previous, current))]
MultipleFieldColumn {
previous: String,
@@ -270,7 +277,8 @@ impl ErrorExt for Error {
| NoFieldColumn { .. }
| AddingFieldColumn { .. }
| ParseRegionOptions { .. }
| EmptyRequest { .. } => StatusCode::InvalidArguments,
| EmptyRequest { .. }
| UnsupportedAlterKind { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported