mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 08:20:36 +00:00
refactor(meta): separate validation and execution logic in alter logical tables procedure (#6478)
* refactor(meta): separate validation and execution logic in alter logical tables procedure Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -241,7 +241,6 @@ impl RepairTool {
|
||||
let alter_table_request = alter_table::make_alter_region_request_for_peer(
|
||||
logical_table_id,
|
||||
&alter_table_expr,
|
||||
full_table_metadata.table_info.ident.version,
|
||||
peer,
|
||||
physical_region_routes,
|
||||
)?;
|
||||
|
||||
@@ -66,7 +66,6 @@ pub fn generate_alter_table_expr_for_all_columns(
|
||||
pub fn make_alter_region_request_for_peer(
|
||||
logical_table_id: TableId,
|
||||
alter_table_expr: &AlterTableExpr,
|
||||
schema_version: u64,
|
||||
peer: &Peer,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<RegionRequest> {
|
||||
@@ -74,7 +73,7 @@ pub fn make_alter_region_request_for_peer(
|
||||
let mut requests = Vec::with_capacity(regions_on_this_peer.len());
|
||||
for region_number in ®ions_on_this_peer {
|
||||
let region_id = RegionId::new(logical_table_id, *region_number);
|
||||
let request = make_alter_region_request(region_id, alter_table_expr, schema_version);
|
||||
let request = make_alter_region_request(region_id, alter_table_expr);
|
||||
requests.push(request);
|
||||
}
|
||||
|
||||
|
||||
@@ -12,20 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod check;
|
||||
mod metadata;
|
||||
mod region_request;
|
||||
mod table_cache_keys;
|
||||
mod executor;
|
||||
mod update_metadata;
|
||||
mod validator;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
|
||||
use common_procedure::{Context, LockKey, Procedure, Status};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures_util::future;
|
||||
pub use region_request::make_alter_region_request;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
pub use executor::make_alter_region_request;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
@@ -33,10 +30,12 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error,
|
||||
sync_follower_regions,
|
||||
use crate::cache_invalidator::Context as CacheContext;
|
||||
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
|
||||
use crate::ddl::alter_logical_tables::validator::{
|
||||
retain_unskipped, AlterLogicalTableValidator, ValidatorResult,
|
||||
};
|
||||
use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::Result;
|
||||
use crate::instruction::CacheIdent;
|
||||
@@ -46,13 +45,38 @@ use crate::key::DeserializedValueWithBytes;
|
||||
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
|
||||
use crate::metrics;
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
use crate::rpc::router::{find_leaders, RegionRoute};
|
||||
use crate::rpc::router::RegionRoute;
|
||||
|
||||
pub struct AlterLogicalTablesProcedure {
|
||||
pub context: DdlContext,
|
||||
pub data: AlterTablesData,
|
||||
}
|
||||
|
||||
/// Builds the validator from the [`AlterTablesData`].
|
||||
fn build_validator_from_alter_table_data<'a>(
|
||||
data: &'a AlterTablesData,
|
||||
) -> AlterLogicalTableValidator<'a> {
|
||||
let phsycial_table_id = data.physical_table_id;
|
||||
let alters = data
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| &task.alter_table)
|
||||
.collect::<Vec<_>>();
|
||||
AlterLogicalTableValidator::new(phsycial_table_id, alters)
|
||||
}
|
||||
|
||||
/// Builds the executor from the [`AlterTablesData`].
|
||||
fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
|
||||
debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
|
||||
let alters = data
|
||||
.tasks
|
||||
.iter()
|
||||
.zip(data.table_info_values.iter())
|
||||
.map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
|
||||
.collect::<Vec<_>>();
|
||||
AlterLogicalTablesExecutor::new(alters)
|
||||
}
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
|
||||
|
||||
@@ -82,35 +106,44 @@ impl AlterLogicalTablesProcedure {
|
||||
}
|
||||
|
||||
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
|
||||
// Checks all the tasks
|
||||
self.check_input_tasks()?;
|
||||
// Fills the table info values
|
||||
self.fill_table_info_values().await?;
|
||||
// Checks the physical table, must after [fill_table_info_values]
|
||||
self.check_physical_table().await?;
|
||||
// Fills the physical table info
|
||||
self.fill_physical_table_info().await?;
|
||||
// Filter the finished tasks
|
||||
let finished_tasks = self.check_finished_tasks()?;
|
||||
let already_finished_count = finished_tasks
|
||||
.iter()
|
||||
.map(|x| if *x { 1 } else { 0 })
|
||||
.sum::<usize>();
|
||||
let apply_tasks_count = self.data.tasks.len();
|
||||
if already_finished_count == apply_tasks_count {
|
||||
let validator = build_validator_from_alter_table_data(&self.data);
|
||||
let ValidatorResult {
|
||||
num_skipped,
|
||||
skip_alter,
|
||||
table_info_values,
|
||||
physical_table_info,
|
||||
physical_table_route,
|
||||
} = validator
|
||||
.validate(&self.context.table_metadata_manager)
|
||||
.await?;
|
||||
|
||||
let num_tasks = self.data.tasks.len();
|
||||
if num_skipped == num_tasks {
|
||||
info!("All the alter tasks are finished, will skip the procedure.");
|
||||
let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
|
||||
&physical_table_info,
|
||||
&table_info_values
|
||||
.iter()
|
||||
.map(|v| v.get_inner_ref())
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
self.data.table_cache_keys_to_invalidate = cache_ident_keys;
|
||||
// Re-invalidate the table cache
|
||||
self.data.state = AlterTablesState::InvalidateTableCache;
|
||||
return Ok(Status::executing(true));
|
||||
} else if already_finished_count > 0 {
|
||||
} else if num_skipped > 0 {
|
||||
info!(
|
||||
"There are {} alter tasks, {} of them were already finished.",
|
||||
apply_tasks_count, already_finished_count
|
||||
num_tasks, num_skipped
|
||||
);
|
||||
}
|
||||
self.filter_task(&finished_tasks)?;
|
||||
|
||||
// Next state
|
||||
// Updates the procedure state.
|
||||
retain_unskipped(&mut self.data.tasks, &skip_alter);
|
||||
self.data.physical_table_info = Some(physical_table_info);
|
||||
self.data.physical_table_route = Some(physical_table_route);
|
||||
self.data.table_info_values = table_info_values;
|
||||
debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
|
||||
self.data.state = AlterTablesState::SubmitAlterRegionRequests;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -118,25 +151,13 @@ impl AlterLogicalTablesProcedure {
|
||||
pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
|
||||
// Safety: we have checked the state in on_prepare
|
||||
let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
|
||||
let leaders = find_leaders(&physical_table_route.region_routes);
|
||||
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
for peer in leaders {
|
||||
let requester = self.context.node_manager.datanode(&peer).await;
|
||||
let request = self.make_request(&peer, &physical_table_route.region_routes)?;
|
||||
|
||||
alter_region_tasks.push(async move {
|
||||
requester
|
||||
.handle(request)
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(peer))
|
||||
});
|
||||
}
|
||||
|
||||
let mut results = future::join_all(alter_region_tasks)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let executor = build_executor_from_alter_expr(&self.data);
|
||||
let mut results = executor
|
||||
.on_alter_regions(
|
||||
&self.context.node_manager,
|
||||
&physical_table_route.region_routes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(column_metadatas) =
|
||||
extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
|
||||
@@ -177,7 +198,18 @@ impl AlterLogicalTablesProcedure {
|
||||
self.update_physical_table_metadata().await?;
|
||||
self.update_logical_tables_metadata().await?;
|
||||
|
||||
self.data.build_cache_keys_to_invalidate();
|
||||
let logical_table_info_values = self
|
||||
.data
|
||||
.table_info_values
|
||||
.iter()
|
||||
.map(|v| v.get_inner_ref())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
|
||||
self.data.physical_table_info.as_ref().unwrap(),
|
||||
&logical_table_info_values,
|
||||
);
|
||||
self.data.table_cache_keys_to_invalidate = cache_ident_keys;
|
||||
self.data.clear_metadata_fields();
|
||||
|
||||
self.data.state = AlterTablesState::InvalidateTableCache;
|
||||
@@ -187,9 +219,16 @@ impl AlterLogicalTablesProcedure {
|
||||
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
|
||||
let to_invalidate = &self.data.table_cache_keys_to_invalidate;
|
||||
|
||||
let ctx = CacheContext {
|
||||
subject: Some(format!(
|
||||
"Invalidate table cache by altering logical tables, physical_table_id: {}",
|
||||
self.data.physical_table_id,
|
||||
)),
|
||||
};
|
||||
|
||||
self.context
|
||||
.cache_invalidator
|
||||
.invalidate(&Default::default(), to_invalidate)
|
||||
.invalidate(&ctx, to_invalidate)
|
||||
.await?;
|
||||
Ok(Status::done())
|
||||
}
|
||||
@@ -209,6 +248,10 @@ impl Procedure for AlterLogicalTablesProcedure {
|
||||
let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
|
||||
.with_label_values(&[step])
|
||||
.start_timer();
|
||||
debug!(
|
||||
"Executing alter logical tables procedure, state: {:?}",
|
||||
state
|
||||
);
|
||||
|
||||
match state {
|
||||
AlterTablesState::Prepare => self.on_prepare().await,
|
||||
|
||||
@@ -1,136 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use snafu::{ensure, OptionExt};
|
||||
|
||||
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
|
||||
use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
pub(crate) fn check_input_tasks(&self) -> Result<()> {
|
||||
self.check_schema()?;
|
||||
self.check_alter_kind()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn check_physical_table(&self) -> Result<()> {
|
||||
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
|
||||
let table_ids = self
|
||||
.data
|
||||
.table_info_values
|
||||
.iter()
|
||||
.map(|v| v.table_info.ident.table_id)
|
||||
.collect::<Vec<_>>();
|
||||
let table_routes = table_route_manager
|
||||
.table_route_storage()
|
||||
.batch_get(&table_ids)
|
||||
.await?;
|
||||
let physical_table_id = self.data.physical_table_id;
|
||||
let is_same_physical_table = table_routes.iter().all(|r| {
|
||||
if let Some(TableRouteValue::Logical(r)) = r {
|
||||
r.physical_table_id() == physical_table_id
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
ensure!(
|
||||
is_same_physical_table,
|
||||
AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "All the tasks should have the same physical table id"
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn check_finished_tasks(&self) -> Result<Vec<bool>> {
|
||||
let task = &self.data.tasks;
|
||||
let table_info_values = &self.data.table_info_values;
|
||||
|
||||
Ok(task
|
||||
.iter()
|
||||
.zip(table_info_values.iter())
|
||||
.map(|(task, table)| Self::check_finished_task(task, table))
|
||||
.collect())
|
||||
}
|
||||
|
||||
// Checks if the schemas of the tasks are the same
|
||||
fn check_schema(&self) -> Result<()> {
|
||||
let is_same_schema = self.data.tasks.windows(2).all(|pair| {
|
||||
pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name
|
||||
&& pair[0].alter_table.schema_name == pair[1].alter_table.schema_name
|
||||
});
|
||||
|
||||
ensure!(
|
||||
is_same_schema,
|
||||
AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Schemas of the tasks are not the same"
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_alter_kind(&self) -> Result<()> {
|
||||
for task in &self.data.tasks {
|
||||
let kind = task.alter_table.kind.as_ref().context(
|
||||
AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Alter kind is missing",
|
||||
},
|
||||
)?;
|
||||
let Kind::AddColumns(_) = kind else {
|
||||
return AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Only support add columns operation",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool {
|
||||
let columns = table
|
||||
.table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.map(|c| &c.name)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let Some(kind) = task.alter_table.kind.as_ref() else {
|
||||
return true; // Never get here since we have checked it in `check_alter_kind`
|
||||
};
|
||||
let Kind::AddColumns(add_columns) = kind else {
|
||||
return true; // Never get here since we have checked it in `check_alter_kind`
|
||||
};
|
||||
|
||||
// We only check that all columns have been finished. That is to say,
|
||||
// if one part is finished but another part is not, it will be considered
|
||||
// unfinished.
|
||||
add_columns
|
||||
.add_columns
|
||||
.iter()
|
||||
.map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
|
||||
.all(|column| column.map(|c| columns.contains(c)).unwrap_or(false))
|
||||
}
|
||||
}
|
||||
216
src/common/meta/src/ddl/alter_logical_tables/executor.rs
Normal file
216
src/common/meta/src/ddl/alter_logical_tables/executor.rs
Normal file
@@ -0,0 +1,216 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::region::{
|
||||
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
|
||||
RegionColumnDef, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
use api::v1::{self, AlterTableExpr};
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use common_telemetry::{debug, warn};
|
||||
use futures::future;
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::storage::{RegionId, RegionNumber, TableId};
|
||||
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info};
|
||||
use crate::error::Result;
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef};
|
||||
use crate::node_manager::NodeManagerRef;
|
||||
use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
|
||||
|
||||
/// [AlterLogicalTablesExecutor] performs:
|
||||
/// - Alters logical regions on the datanodes.
|
||||
/// - Updates table metadata for alter table operation.
|
||||
pub struct AlterLogicalTablesExecutor<'a> {
|
||||
/// The alter table expressions.
|
||||
///
|
||||
/// The first element is the logical table id, the second element is the alter table expression.
|
||||
alters: Vec<(TableId, &'a AlterTableExpr)>,
|
||||
}
|
||||
|
||||
impl<'a> AlterLogicalTablesExecutor<'a> {
|
||||
pub fn new(alters: Vec<(TableId, &'a AlterTableExpr)>) -> Self {
|
||||
Self { alters }
|
||||
}
|
||||
|
||||
/// Alters logical regions on the datanodes.
|
||||
pub(crate) async fn on_alter_regions(
|
||||
&self,
|
||||
node_manager: &NodeManagerRef,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<Vec<RegionResponse>> {
|
||||
let region_distribution = region_distribution(region_routes);
|
||||
let leaders = find_leaders(region_routes)
|
||||
.into_iter()
|
||||
.map(|p| (p.id, p))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
|
||||
for (datanode_id, region_role_set) in region_distribution {
|
||||
if region_role_set.leader_regions.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// Safety: must exists.
|
||||
let peer = leaders.get(&datanode_id).unwrap();
|
||||
let requester = node_manager.datanode(peer).await;
|
||||
let requests = self.make_alter_region_request(®ion_role_set.leader_regions);
|
||||
let requester = requester.clone();
|
||||
let peer = peer.clone();
|
||||
|
||||
debug!("Sending alter region requests to datanode {}", peer);
|
||||
alter_region_tasks.push(async move {
|
||||
requester
|
||||
.handle(make_request(requests))
|
||||
.await
|
||||
.map_err(add_peer_context_if_needed(peer))
|
||||
});
|
||||
}
|
||||
|
||||
future::join_all(alter_region_tasks)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>>>()
|
||||
}
|
||||
|
||||
fn make_alter_region_request(&self, region_numbers: &[RegionNumber]) -> AlterRequests {
|
||||
let mut requests = Vec::with_capacity(region_numbers.len() * self.alters.len());
|
||||
for (table_id, alter) in self.alters.iter() {
|
||||
for region_number in region_numbers {
|
||||
let region_id = RegionId::new(*table_id, *region_number);
|
||||
let request = make_alter_region_request(region_id, alter);
|
||||
requests.push(request);
|
||||
}
|
||||
}
|
||||
|
||||
AlterRequests { requests }
|
||||
}
|
||||
|
||||
/// Updates table metadata for alter table operation.
|
||||
///
|
||||
/// ## Panic:
|
||||
/// - If the region distribution is not set when updating table metadata.
|
||||
pub(crate) async fn on_alter_metadata(
|
||||
physical_table_id: TableId,
|
||||
table_metadata_manager: &TableMetadataManagerRef,
|
||||
current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
|
||||
region_distribution: RegionDistribution,
|
||||
physical_columns: &[ColumnMetadata],
|
||||
) -> Result<()> {
|
||||
if physical_columns.is_empty() {
|
||||
warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let table_ref = current_table_info_value.table_ref();
|
||||
let table_id = physical_table_id;
|
||||
|
||||
// Generates new table info
|
||||
let old_raw_table_info = current_table_info_value.table_info.clone();
|
||||
let new_raw_table_info =
|
||||
raw_table_info::build_new_physical_table_info(old_raw_table_info, physical_columns);
|
||||
|
||||
debug!(
|
||||
"Starting update table: {} metadata, table_id: {}, new table info: {:?}",
|
||||
table_ref, table_id, new_raw_table_info
|
||||
);
|
||||
|
||||
table_metadata_manager
|
||||
.update_table_info(
|
||||
current_table_info_value,
|
||||
Some(region_distribution),
|
||||
new_raw_table_info,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds the cache ident keys for the alter logical tables.
|
||||
///
|
||||
/// The cache ident keys are:
|
||||
/// - The table id of the logical tables.
|
||||
/// - The table name of the logical tables.
|
||||
/// - The table id of the physical table.
|
||||
pub(crate) fn build_cache_ident_keys(
|
||||
physical_table_info: &TableInfoValue,
|
||||
logical_table_info_values: &[&TableInfoValue],
|
||||
) -> Vec<CacheIdent> {
|
||||
let mut cache_keys = Vec::with_capacity(logical_table_info_values.len() * 2 + 2);
|
||||
cache_keys.extend(logical_table_info_values.iter().flat_map(|table| {
|
||||
vec![
|
||||
CacheIdent::TableId(table.table_info.ident.table_id),
|
||||
CacheIdent::TableName(table.table_name()),
|
||||
]
|
||||
}));
|
||||
cache_keys.push(CacheIdent::TableId(
|
||||
physical_table_info.table_info.ident.table_id,
|
||||
));
|
||||
cache_keys.push(CacheIdent::TableName(physical_table_info.table_name()));
|
||||
|
||||
cache_keys
|
||||
}
|
||||
}
|
||||
|
||||
fn make_request(alter_requests: AlterRequests) -> RegionRequest {
|
||||
RegionRequest {
|
||||
header: Some(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
..Default::default()
|
||||
}),
|
||||
body: Some(region_request::Body::Alters(alter_requests)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Makes an alter region request.
|
||||
pub fn make_alter_region_request(
|
||||
region_id: RegionId,
|
||||
alter_table_expr: &AlterTableExpr,
|
||||
) -> AlterRequest {
|
||||
let region_id = region_id.as_u64();
|
||||
let kind = match &alter_table_expr.kind {
|
||||
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
|
||||
to_region_add_columns(add_columns),
|
||||
)),
|
||||
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
|
||||
};
|
||||
|
||||
AlterRequest {
|
||||
region_id,
|
||||
schema_version: 0,
|
||||
kind,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
|
||||
let add_columns = add_columns
|
||||
.add_columns
|
||||
.iter()
|
||||
.map(|add_column| {
|
||||
let region_column_def = RegionColumnDef {
|
||||
column_def: add_column.column_def.clone(),
|
||||
..Default::default() // other fields are not used in alter logical table
|
||||
};
|
||||
AddColumn {
|
||||
column_def: Some(region_column_def),
|
||||
..Default::default() // other fields are not used in alter logical table
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
AddColumns { add_columns }
|
||||
}
|
||||
@@ -1,158 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_catalog::format_full_table_name;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
|
||||
use crate::error::{
|
||||
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
|
||||
TableRouteNotFoundSnafu,
|
||||
};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
use crate::rpc::ddl::AlterTableTask;
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> {
|
||||
debug_assert_eq!(finished_tasks.len(), self.data.tasks.len());
|
||||
debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len());
|
||||
self.data.tasks = self
|
||||
.data
|
||||
.tasks
|
||||
.drain(..)
|
||||
.zip(finished_tasks.iter())
|
||||
.filter_map(|(task, finished)| if *finished { None } else { Some(task) })
|
||||
.collect();
|
||||
self.data.table_info_values = self
|
||||
.data
|
||||
.table_info_values
|
||||
.drain(..)
|
||||
.zip(finished_tasks.iter())
|
||||
.filter_map(|(table_info_value, finished)| {
|
||||
if *finished {
|
||||
None
|
||||
} else {
|
||||
Some(table_info_value)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
|
||||
let (physical_table_info, physical_table_route) = self
|
||||
.context
|
||||
.table_metadata_manager
|
||||
.get_full_table_info(self.data.physical_table_id)
|
||||
.await?;
|
||||
|
||||
let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: format!("table id - {}", self.data.physical_table_id),
|
||||
})?;
|
||||
let physical_table_route = physical_table_route
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: self.data.physical_table_id,
|
||||
})?
|
||||
.into_inner();
|
||||
|
||||
self.data.physical_table_info = Some(physical_table_info);
|
||||
let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
|
||||
return AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: format!(
|
||||
"expected a physical table but got a logical table: {:?}",
|
||||
self.data.physical_table_id
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
self.data.physical_table_route = Some(physical_table_route);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> {
|
||||
let table_ids = self.get_all_table_ids().await?;
|
||||
let table_info_values = self.get_all_table_info_values(&table_ids).await?;
|
||||
debug_assert_eq!(table_info_values.len(), self.data.tasks.len());
|
||||
self.data.table_info_values = table_info_values;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_all_table_info_values(
|
||||
&self,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
|
||||
let table_info_manager = self.context.table_metadata_manager.table_info_manager();
|
||||
let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
|
||||
let mut table_info_values = Vec::with_capacity(table_ids.len());
|
||||
for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
|
||||
let table_info_value =
|
||||
table_info_map
|
||||
.remove(table_id)
|
||||
.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: extract_table_name(task),
|
||||
})?;
|
||||
table_info_values.push(table_info_value);
|
||||
}
|
||||
|
||||
Ok(table_info_values)
|
||||
}
|
||||
|
||||
async fn get_all_table_ids(&self) -> Result<Vec<TableId>> {
|
||||
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
|
||||
let table_name_keys = self
|
||||
.data
|
||||
.tasks
|
||||
.iter()
|
||||
.map(|task| extract_table_name_key(task))
|
||||
.collect();
|
||||
|
||||
let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
|
||||
let mut table_ids = Vec::with_capacity(table_name_values.len());
|
||||
for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) {
|
||||
let table_id = value
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: extract_table_name(task),
|
||||
})?
|
||||
.table_id();
|
||||
table_ids.push(table_id);
|
||||
}
|
||||
|
||||
Ok(table_ids)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn extract_table_name(task: &AlterTableTask) -> String {
|
||||
format_full_table_name(
|
||||
&task.alter_table.catalog_name,
|
||||
&task.alter_table.schema_name,
|
||||
&task.alter_table.table_name,
|
||||
)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey {
|
||||
TableNameKey::new(
|
||||
&task.alter_table.catalog_name,
|
||||
&task.alter_table.schema_name,
|
||||
&task.alter_table.table_name,
|
||||
)
|
||||
}
|
||||
@@ -1,113 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::region::{
|
||||
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
|
||||
RegionColumnDef, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
use api::v1::{self, AlterTableExpr};
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
|
||||
use crate::error::Result;
|
||||
use crate::peer::Peer;
|
||||
use crate::rpc::router::{find_leader_regions, RegionRoute};
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
pub(crate) fn make_request(
|
||||
&self,
|
||||
peer: &Peer,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<RegionRequest> {
|
||||
let alter_requests = self.make_alter_region_requests(peer, region_routes)?;
|
||||
let request = RegionRequest {
|
||||
header: Some(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
..Default::default()
|
||||
}),
|
||||
body: Some(region_request::Body::Alters(alter_requests)),
|
||||
};
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
fn make_alter_region_requests(
|
||||
&self,
|
||||
peer: &Peer,
|
||||
region_routes: &[RegionRoute],
|
||||
) -> Result<AlterRequests> {
|
||||
let tasks = &self.data.tasks;
|
||||
let regions_on_this_peer = find_leader_regions(region_routes, peer);
|
||||
let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
|
||||
for (task, table) in self
|
||||
.data
|
||||
.tasks
|
||||
.iter()
|
||||
.zip(self.data.table_info_values.iter())
|
||||
{
|
||||
for region_number in ®ions_on_this_peer {
|
||||
let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
|
||||
let request = make_alter_region_request(
|
||||
region_id,
|
||||
&task.alter_table,
|
||||
table.table_info.ident.version,
|
||||
);
|
||||
requests.push(request);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AlterRequests { requests })
|
||||
}
|
||||
}
|
||||
|
||||
/// Makes an alter region request.
|
||||
pub fn make_alter_region_request(
|
||||
region_id: RegionId,
|
||||
alter_table_expr: &AlterTableExpr,
|
||||
schema_version: u64,
|
||||
) -> AlterRequest {
|
||||
let region_id = region_id.as_u64();
|
||||
let kind = match &alter_table_expr.kind {
|
||||
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
|
||||
to_region_add_columns(add_columns),
|
||||
)),
|
||||
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
|
||||
};
|
||||
|
||||
AlterRequest {
|
||||
region_id,
|
||||
schema_version,
|
||||
kind,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns {
|
||||
let add_columns = add_columns
|
||||
.add_columns
|
||||
.iter()
|
||||
.map(|add_column| {
|
||||
let region_column_def = RegionColumnDef {
|
||||
column_def: add_column.column_def.clone(),
|
||||
..Default::default() // other fields are not used in alter logical table
|
||||
};
|
||||
AddColumn {
|
||||
column_def: Some(region_column_def),
|
||||
..Default::default() // other fields are not used in alter logical table
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
AddColumns { add_columns }
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use table::metadata::RawTableInfo;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::ddl::alter_logical_tables::AlterTablesData;
|
||||
use crate::instruction::CacheIdent;
|
||||
|
||||
impl AlterTablesData {
|
||||
pub(crate) fn build_cache_keys_to_invalidate(&mut self) {
|
||||
let mut cache_keys = self
|
||||
.table_info_values
|
||||
.iter()
|
||||
.flat_map(|table| {
|
||||
vec![
|
||||
CacheIdent::TableId(table.table_info.ident.table_id),
|
||||
CacheIdent::TableName(extract_table_name(&table.table_info)),
|
||||
]
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
cache_keys.push(CacheIdent::TableId(self.physical_table_id));
|
||||
// Safety: physical_table_info already filled in previous steps
|
||||
let physical_table_info = &self.physical_table_info.as_ref().unwrap().table_info;
|
||||
cache_keys.push(CacheIdent::TableName(extract_table_name(
|
||||
physical_table_info,
|
||||
)));
|
||||
|
||||
self.table_cache_keys_to_invalidate = cache_keys;
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_table_name(table_info: &RawTableInfo) -> TableName {
|
||||
TableName::new(
|
||||
&table_info.catalog_name,
|
||||
&table_info.schema_name,
|
||||
&table_info.name,
|
||||
)
|
||||
}
|
||||
@@ -13,13 +13,12 @@
|
||||
// limitations under the License.
|
||||
|
||||
use common_grpc_expr::alter_expr_to_request;
|
||||
use common_telemetry::warn;
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::{RawTableInfo, TableInfo};
|
||||
|
||||
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
|
||||
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
|
||||
use crate::ddl::utils::raw_table_info;
|
||||
use crate::error;
|
||||
use crate::error::{ConvertAlterTableRequestSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -29,32 +28,20 @@ use crate::rpc::router::region_distribution;
|
||||
|
||||
impl AlterLogicalTablesProcedure {
|
||||
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
|
||||
if self.data.physical_columns.is_empty() {
|
||||
warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Safety: must exist.
|
||||
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
|
||||
let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
|
||||
let region_distribution = region_distribution(&physical_table_route.region_routes);
|
||||
|
||||
// Generates new table info
|
||||
let old_raw_table_info = physical_table_info.table_info.clone();
|
||||
let new_raw_table_info = raw_table_info::build_new_physical_table_info(
|
||||
old_raw_table_info,
|
||||
&self.data.physical_columns,
|
||||
);
|
||||
|
||||
// Updates physical table's metadata.
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_info(
|
||||
physical_table_info,
|
||||
Some(region_distribution),
|
||||
new_raw_table_info,
|
||||
)
|
||||
.await?;
|
||||
AlterLogicalTablesExecutor::on_alter_metadata(
|
||||
self.data.physical_table_id,
|
||||
&self.context.table_metadata_manager,
|
||||
physical_table_info,
|
||||
region_distribution,
|
||||
&self.data.physical_columns,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
284
src/common/meta/src/ddl/alter_logical_tables/validator.rs
Normal file
284
src/common/meta/src/ddl/alter_logical_tables/validator.rs
Normal file
@@ -0,0 +1,284 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::AlterTableExpr;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::storage::TableId;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
|
||||
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
|
||||
use crate::error::{
|
||||
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
|
||||
TableRouteNotFoundSnafu,
|
||||
};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteManager, TableRouteValue};
|
||||
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
|
||||
/// [AlterLogicalTableValidator] validates the alter logical expressions.
|
||||
pub struct AlterLogicalTableValidator<'a> {
|
||||
physical_table_id: TableId,
|
||||
alters: Vec<&'a AlterTableExpr>,
|
||||
}
|
||||
|
||||
impl<'a> AlterLogicalTableValidator<'a> {
|
||||
pub fn new(physical_table_id: TableId, alters: Vec<&'a AlterTableExpr>) -> Self {
|
||||
Self {
|
||||
physical_table_id,
|
||||
alters,
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates all alter table expressions have the same schema and catalog.
|
||||
fn validate_schema(&self) -> Result<()> {
|
||||
let is_same_schema = self.alters.windows(2).all(|pair| {
|
||||
pair[0].catalog_name == pair[1].catalog_name
|
||||
&& pair[0].schema_name == pair[1].schema_name
|
||||
});
|
||||
|
||||
ensure!(
|
||||
is_same_schema,
|
||||
AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Schemas of the alter table expressions are not the same"
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates that all alter table expressions are of the supported kind.
|
||||
/// Currently only supports `AddColumns` operations.
|
||||
fn validate_alter_kind(&self) -> Result<()> {
|
||||
for alter in &self.alters {
|
||||
let kind = alter
|
||||
.kind
|
||||
.as_ref()
|
||||
.context(AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Alter kind is missing",
|
||||
})?;
|
||||
|
||||
let Kind::AddColumns(_) = kind else {
|
||||
return AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "Only support add columns operation",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn table_names(&self) -> Vec<TableReference> {
|
||||
self.alters
|
||||
.iter()
|
||||
.map(|alter| {
|
||||
TableReference::full(&alter.catalog_name, &alter.schema_name, &alter.table_name)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Validates that the physical table info and route exist.
|
||||
///
|
||||
/// This method performs the following validations:
|
||||
/// 1. Retrieves the full table info and route for the given physical table id
|
||||
/// 2. Ensures the table info and table route exists
|
||||
/// 3. Verifies that the table route is actually a physical table route, not a logical one
|
||||
///
|
||||
/// Returns a tuple containing the validated table info and physical table route.
|
||||
async fn validate_physical_table(
|
||||
&self,
|
||||
table_metadata_manager: &TableMetadataManagerRef,
|
||||
) -> Result<(
|
||||
DeserializedValueWithBytes<TableInfoValue>,
|
||||
PhysicalTableRouteValue,
|
||||
)> {
|
||||
let (table_info, table_route) = table_metadata_manager
|
||||
.get_full_table_info(self.physical_table_id)
|
||||
.await?;
|
||||
|
||||
let table_info = table_info.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: format!("table id - {}", self.physical_table_id),
|
||||
})?;
|
||||
|
||||
let physical_table_route = table_route
|
||||
.context(TableRouteNotFoundSnafu {
|
||||
table_id: self.physical_table_id,
|
||||
})?
|
||||
.into_inner();
|
||||
|
||||
let TableRouteValue::Physical(table_route) = physical_table_route else {
|
||||
return AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: format!(
|
||||
"expected a physical table but got a logical table: {:?}",
|
||||
self.physical_table_id
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
Ok((table_info, table_route))
|
||||
}
|
||||
|
||||
/// Validates that all logical table routes have the same physical table id.
|
||||
///
|
||||
/// This method performs the following validations:
|
||||
/// 1. Retrieves table routes for all the given table ids.
|
||||
/// 2. Ensures that all retrieved routes are logical table routes (not physical)
|
||||
/// 3. Verifies that all logical table routes reference the same physical table id.
|
||||
/// 4. Returns an error if any route is not logical or references a different physical table.
|
||||
async fn validate_logical_table_routes(
|
||||
&self,
|
||||
table_route_manager: &TableRouteManager,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<()> {
|
||||
let table_routes = table_route_manager
|
||||
.table_route_storage()
|
||||
.batch_get(table_ids)
|
||||
.await?;
|
||||
|
||||
let physical_table_id = self.physical_table_id;
|
||||
|
||||
let is_same_physical_table = table_routes.iter().all(|r| {
|
||||
if let Some(TableRouteValue::Logical(r)) = r {
|
||||
r.physical_table_id() == physical_table_id
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
ensure!(
|
||||
is_same_physical_table,
|
||||
AlterLogicalTablesInvalidArgumentsSnafu {
|
||||
err_msg: "All the tasks should have the same physical table id"
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates the alter logical expressions.
|
||||
///
|
||||
/// This method performs the following validations:
|
||||
/// 1. Validates that all alter table expressions have the same schema and catalog.
|
||||
/// 2. Validates that all alter table expressions are of the supported kind.
|
||||
/// 3. Validates that the physical table info and route exist.
|
||||
/// 4. Validates that all logical table routes have the same physical table id.
|
||||
///
|
||||
/// Returns a [ValidatorResult] containing the validation results.
|
||||
pub async fn validate(
|
||||
&self,
|
||||
table_metadata_manager: &TableMetadataManagerRef,
|
||||
) -> Result<ValidatorResult> {
|
||||
self.validate_schema()?;
|
||||
self.validate_alter_kind()?;
|
||||
let (physical_table_info, physical_table_route) =
|
||||
self.validate_physical_table(table_metadata_manager).await?;
|
||||
let table_names = self.table_names();
|
||||
let table_ids =
|
||||
get_all_table_ids_by_names(table_metadata_manager.table_name_manager(), &table_names)
|
||||
.await?;
|
||||
let mut table_info_values = get_all_table_info_values_by_table_ids(
|
||||
table_metadata_manager.table_info_manager(),
|
||||
&table_ids,
|
||||
&table_names,
|
||||
)
|
||||
.await?;
|
||||
self.validate_logical_table_routes(
|
||||
table_metadata_manager.table_route_manager(),
|
||||
&table_ids,
|
||||
)
|
||||
.await?;
|
||||
let skip_alter = self
|
||||
.alters
|
||||
.iter()
|
||||
.zip(table_info_values.iter())
|
||||
.map(|(task, table)| skip_alter_logical_region(task, table))
|
||||
.collect::<Vec<_>>();
|
||||
retain_unskipped(&mut table_info_values, &skip_alter);
|
||||
let num_skipped = skip_alter.iter().filter(|&&x| x).count();
|
||||
|
||||
Ok(ValidatorResult {
|
||||
num_skipped,
|
||||
skip_alter,
|
||||
table_info_values,
|
||||
physical_table_info,
|
||||
physical_table_route,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of the validator.
|
||||
pub(crate) struct ValidatorResult {
|
||||
pub(crate) num_skipped: usize,
|
||||
pub(crate) skip_alter: Vec<bool>,
|
||||
pub(crate) table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
|
||||
pub(crate) physical_table_info: DeserializedValueWithBytes<TableInfoValue>,
|
||||
pub(crate) physical_table_route: PhysicalTableRouteValue,
|
||||
}
|
||||
|
||||
/// Retains the elements that are not skipped.
|
||||
pub(crate) fn retain_unskipped<T>(target: &mut Vec<T>, skipped: &[bool]) {
|
||||
debug_assert_eq!(target.len(), skipped.len());
|
||||
let mut iter = skipped.iter();
|
||||
target.retain(|_| !iter.next().unwrap());
|
||||
}
|
||||
|
||||
/// Returns true if does not required to alter the logical region.
|
||||
fn skip_alter_logical_region(alter: &AlterTableExpr, table: &TableInfoValue) -> bool {
|
||||
let existing_columns = table
|
||||
.table_info
|
||||
.meta
|
||||
.schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.map(|c| &c.name)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let Some(kind) = alter.kind.as_ref() else {
|
||||
return true; // Never get here since we have checked it in `validate_alter_kind`
|
||||
};
|
||||
let Kind::AddColumns(add_columns) = kind else {
|
||||
return true; // Never get here since we have checked it in `validate_alter_kind`
|
||||
};
|
||||
|
||||
// We only check that all columns have been finished. That is to say,
|
||||
// if one part is finished but another part is not, it will be considered
|
||||
// unfinished.
|
||||
add_columns
|
||||
.add_columns
|
||||
.iter()
|
||||
.map(|add_column| add_column.column_def.as_ref().map(|c| &c.name))
|
||||
.all(|column| {
|
||||
column
|
||||
.map(|c| existing_columns.contains(c))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_retain_unskipped() {
|
||||
let mut target = vec![1, 2, 3, 4, 5];
|
||||
let skipped = vec![false, true, false, true, false];
|
||||
retain_unskipped(&mut target, &skipped);
|
||||
assert_eq!(target, vec![1, 3, 5]);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod raw_table_info;
|
||||
pub(crate) mod table_id;
|
||||
pub(crate) mod table_info;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
|
||||
46
src/common/meta/src/ddl/utils/table_id.rs
Normal file
46
src/common/meta/src/ddl/utils/table_id.rs
Normal file
@@ -0,0 +1,46 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::TableId;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
use crate::key::table_name::{TableNameKey, TableNameManager};
|
||||
|
||||
/// Get all the table ids from the table names.
|
||||
///
|
||||
/// Returns an error if any table does not exist.
|
||||
pub(crate) async fn get_all_table_ids_by_names<'a>(
|
||||
table_name_manager: &TableNameManager,
|
||||
table_names: &[TableReference<'a>],
|
||||
) -> Result<Vec<TableId>> {
|
||||
let table_name_keys = table_names
|
||||
.iter()
|
||||
.map(TableNameKey::from)
|
||||
.collect::<Vec<_>>();
|
||||
let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
|
||||
let mut table_ids = Vec::with_capacity(table_name_values.len());
|
||||
for (value, table_name) in table_name_values.into_iter().zip(table_names) {
|
||||
let value = value
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_name.to_string(),
|
||||
})?
|
||||
.table_id();
|
||||
|
||||
table_ids.push(value);
|
||||
}
|
||||
|
||||
Ok(table_ids)
|
||||
}
|
||||
44
src/common/meta/src/ddl/utils/table_info.rs
Normal file
44
src/common/meta/src/ddl/utils/table_info.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::TableId;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::error::{Result, TableInfoNotFoundSnafu};
|
||||
use crate::key::table_info::{TableInfoManager, TableInfoValue};
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
|
||||
/// Get all table info values by table ids.
|
||||
///
|
||||
/// Returns an error if any table does not exist.
|
||||
pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
|
||||
table_info_manager: &TableInfoManager,
|
||||
table_ids: &[TableId],
|
||||
table_names: &[TableReference<'a>],
|
||||
) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
|
||||
let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
|
||||
let mut table_info_values = Vec::with_capacity(table_ids.len());
|
||||
for (table_id, table_name) in table_ids.iter().zip(table_names) {
|
||||
let table_info_value =
|
||||
table_info_map
|
||||
.remove(table_id)
|
||||
.with_context(|| TableInfoNotFoundSnafu {
|
||||
table: table_name.to_string(),
|
||||
})?;
|
||||
table_info_values.push(table_info_value);
|
||||
}
|
||||
|
||||
Ok(table_info_values)
|
||||
}
|
||||
@@ -103,6 +103,26 @@ pub fn table_decoder(kv: KeyValue) -> Result<(String, TableNameValue)> {
|
||||
Ok((table_name_key.table.to_string(), table_name_value))
|
||||
}
|
||||
|
||||
impl<'a> From<&TableReference<'a>> for TableNameKey<'a> {
|
||||
fn from(value: &TableReference<'a>) -> Self {
|
||||
Self {
|
||||
catalog: value.catalog,
|
||||
schema: value.schema,
|
||||
table: value.table,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<TableReference<'a>> for TableNameKey<'a> {
|
||||
fn from(value: TableReference<'a>) -> Self {
|
||||
Self {
|
||||
catalog: value.catalog,
|
||||
schema: value.schema,
|
||||
table: value.table,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a TableName> for TableNameKey<'a> {
|
||||
fn from(value: &'a TableName) -> Self {
|
||||
Self {
|
||||
|
||||
Reference in New Issue
Block a user