From c58d8aa94a4d82620462064adaf98ca59c2aab04 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 9 Jul 2025 13:13:19 +0800 Subject: [PATCH] refactor(meta): extract `AlterTableExecutor` from `AlterTableProcedure` (#6470) * refactor(meta): extract `AlterTableExecutor` from `AlterTableProcedure` Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/ddl.rs | 1 - .../alter_logical_tables/update_metadata.rs | 15 +- src/common/meta/src/ddl/alter_table.rs | 147 ++++----- src/common/meta/src/ddl/alter_table/check.rs | 62 ---- .../meta/src/ddl/alter_table/executor.rs | 308 ++++++++++++++++++ .../src/ddl/alter_table/region_request.rs | 46 +-- .../src/ddl/alter_table/update_metadata.rs | 103 ------ .../create_logical_tables/update_metadata.rs | 4 +- src/common/meta/src/ddl/create_table.rs | 2 +- .../meta/src/ddl/drop_table/executor.rs | 8 +- src/common/meta/src/ddl/utils.rs | 2 + .../raw_table_info.rs} | 0 12 files changed, 404 insertions(+), 294 deletions(-) delete mode 100644 src/common/meta/src/ddl/alter_table/check.rs create mode 100644 src/common/meta/src/ddl/alter_table/executor.rs delete mode 100644 src/common/meta/src/ddl/alter_table/update_metadata.rs rename src/common/meta/src/ddl/{physical_table_metadata.rs => utils/raw_table_info.rs} (100%) diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3c822bae1b..ad6a1de613 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -50,7 +50,6 @@ pub mod drop_flow; pub mod drop_table; pub mod drop_view; pub mod flow_meta; -mod physical_table_metadata; pub mod table_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index c05777bcc6..bc75ba1f7e 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -19,12 +19,13 @@ use snafu::ResultExt; use table::metadata::{RawTableInfo, TableInfo}; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::ddl::physical_table_metadata; +use crate::ddl::utils::raw_table_info; use crate::error; use crate::error::{ConvertAlterTableRequestSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::DeserializedValueWithBytes; use crate::rpc::ddl::AlterTableTask; +use crate::rpc::router::region_distribution; impl AlterLogicalTablesProcedure { pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { @@ -35,18 +36,24 @@ impl AlterLogicalTablesProcedure { // Safety: must exist. let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); + let physical_table_route = self.data.physical_table_route.as_ref().unwrap(); + let region_distribution = region_distribution(&physical_table_route.region_routes); // Generates new table info let old_raw_table_info = physical_table_info.table_info.clone(); - let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( + let new_raw_table_info = raw_table_info::build_new_physical_table_info( old_raw_table_info, &self.data.physical_columns, ); - // Updates physical table's metadata, and we don't need to touch per-region settings. + // Updates physical table's metadata. self.context .table_metadata_manager - .update_table_info(physical_table_info, None, new_raw_table_info) + .update_table_info( + physical_table_info, + Some(region_distribution), + new_raw_table_info, + ) .await?; Ok(()) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 89081a4b58..1775bd2882 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod check; +mod executor; mod metadata; mod region_request; -mod update_metadata; use std::vec; @@ -29,33 +28,29 @@ use common_procedure::{ Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey, PoisonKeys, Procedure, ProcedureId, Status, StringKey, }; -use common_telemetry::{debug, error, info, warn}; -use futures::future::{self}; +use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; -use store_api::storage::RegionId; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_reference::TableReference; -use crate::cache_invalidator::Context; -use crate::ddl::physical_table_metadata::update_table_info_column_ids; +use crate::ddl::alter_table::executor::AlterTableExecutor; use crate::ddl::utils::{ - add_peer_context_if_needed, extract_column_metadatas, handle_multiple_results, - map_to_procedure_error, sync_follower_regions, MultipleResults, + extract_column_metadatas, handle_multiple_results, map_to_procedure_error, + sync_follower_regions, MultipleResults, }; use crate::ddl::DdlContext; use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu}; -use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::{DeserializedValueWithBytes, RegionDistribution}; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::metrics; use crate::poison_key::table_poison_key; use crate::rpc::ddl::AlterTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute}; +use crate::rpc::router::{find_leaders, region_distribution, RegionRoute}; /// The alter table procedure pub struct AlterTableProcedure { @@ -67,6 +62,24 @@ pub struct AlterTableProcedure { /// If we recover the procedure from json, then the table info value is not cached. /// But we already validated it in the prepare step. new_table_info: Option, + /// The alter table executor. + executor: AlterTableExecutor, +} + +/// Builds the executor from the [`AlterTableData`]. +/// +/// # Panics +/// - If the alter kind is not set. +fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor { + let table_name = alter_data.table_ref().into(); + let table_id = alter_data.table_id; + let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap(); + let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind { + Some(new_table_name.to_string()) + } else { + None + }; + AlterTableExecutor::new(table_name, table_id, new_table_name) } impl AlterTableProcedure { @@ -74,33 +87,42 @@ impl AlterTableProcedure { pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result { task.validate()?; + let data = AlterTableData::new(task, table_id); + let executor = build_executor_from_alter_expr(&data); Ok(Self { context, - data: AlterTableData::new(task, table_id), + data, new_table_info: None, + executor, }) } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let executor = build_executor_from_alter_expr(&data); + Ok(AlterTableProcedure { context, data, new_table_info: None, + executor, }) } // Checks whether the table exists. pub(crate) async fn on_prepare(&mut self) -> Result { - self.check_alter().await?; + self.executor + .on_prepare(&self.context.table_metadata_manager) + .await?; self.fill_table_info().await?; - // Validates the request and builds the new table info. - // We need to build the new table info here because we should ensure the alteration - // is valid in `UpdateMeta` state as we already altered the region. - // Safety: `fill_table_info()` already set it. + // Safety: filled in `fill_table_info`. let table_info_value = self.data.table_info_value.as_ref().unwrap(); - self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?); + let new_table_info = AlterTableExecutor::validate_alter_table_expr( + &table_info_value.table_info, + self.data.task.alter_table.clone(), + )?; + self.new_table_info = Some(new_table_info); // Safety: Checked in `AlterTableProcedure::new`. let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); @@ -143,9 +165,7 @@ impl AlterTableProcedure { self.data.region_distribution = Some(region_distribution(&physical_table_route.region_routes)); - let leaders = find_leaders(&physical_table_route.region_routes); - let mut alter_region_tasks = Vec::with_capacity(leaders.len()); let alter_kind = self.make_region_alter_kind()?; info!( @@ -158,31 +178,14 @@ impl AlterTableProcedure { ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id }); // Puts the poison before submitting alter region requests to datanodes. self.put_poison(ctx_provider, procedure_id).await?; - for datanode in leaders { - let requester = self.context.node_manager.datanode(&datanode).await; - let regions = find_leader_regions(&physical_table_route.region_routes, &datanode); - - for region in regions { - let region_id = RegionId::new(table_id, region); - let request = self.make_alter_region_request(region_id, alter_kind.clone())?; - debug!("Submitting {request:?} to {datanode}"); - - let datanode = datanode.clone(); - let requester = requester.clone(); - - alter_region_tasks.push(async move { - requester - .handle(request) - .await - .map_err(add_peer_context_if_needed(datanode)) - }); - } - } - - let results = future::join_all(alter_region_tasks) - .await - .into_iter() - .collect::>(); + let results = self + .executor + .on_alter_regions( + &self.context.node_manager, + &physical_table_route.region_routes, + alter_kind, + ) + .await; match handle_multiple_results(results) { MultipleResults::PartialRetryable(error) => { @@ -260,43 +263,34 @@ impl AlterTableProcedure { pub(crate) async fn on_update_metadata(&mut self) -> Result { let table_id = self.data.table_id(); let table_ref = self.data.table_ref(); - // Safety: checked before. + // Safety: filled in `fill_table_info`. let table_info_value = self.data.table_info_value.as_ref().unwrap(); + // Safety: Checked in `AlterTableProcedure::new`. + let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); + // Gets the table info from the cache or builds it. - let new_info = match &self.new_table_info { + let new_info = match &self.new_table_info { Some(cached) => cached.clone(), - None => self.build_new_table_info(&table_info_value.table_info) + None => AlterTableExecutor::validate_alter_table_expr( + &table_info_value.table_info, + self.data.task.alter_table.clone(), + ) .inspect_err(|e| { // We already check the table info in the prepare step so this should not happen. error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id); })?, }; - debug!( - "Starting update table: {} metadata, new table info {:?}", - table_ref.to_string(), - new_info - ); - - // Safety: Checked in `AlterTableProcedure::new`. - let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); - if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind { - self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value) - .await?; - } else { - let mut raw_table_info = new_info.into(); - if !self.data.column_metadatas.is_empty() { - update_table_info_column_ids(&mut raw_table_info, &self.data.column_metadatas); - } - // region distribution is set in submit_alter_region_requests - let region_distribution = self.data.region_distribution.as_ref().unwrap().clone(); - self.on_update_metadata_for_alter( - raw_table_info, - region_distribution, + // Safety: region distribution is set in `submit_alter_region_requests`. + self.executor + .on_alter_metadata( + &self.context.table_metadata_manager, table_info_value, + self.data.region_distribution.as_ref(), + new_info.into(), + &self.data.column_metadatas, ) .await?; - } info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}"); self.data.state = AlterTableState::InvalidateTableCache; @@ -305,18 +299,9 @@ impl AlterTableProcedure { /// Broadcasts the invalidating table cache instructions. async fn on_broadcast(&mut self) -> Result { - let cache_invalidator = &self.context.cache_invalidator; - - cache_invalidator - .invalidate( - &Context::default(), - &[ - CacheIdent::TableId(self.data.table_id()), - CacheIdent::TableName(self.data.table_ref().into()), - ], - ) + self.executor + .invalidate_table_cache(&self.context.cache_invalidator) .await?; - Ok(Status::done()) } diff --git a/src/common/meta/src/ddl/alter_table/check.rs b/src/common/meta/src/ddl/alter_table/check.rs deleted file mode 100644 index 5be40ac3e2..0000000000 --- a/src/common/meta/src/ddl/alter_table/check.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::alter_table_expr::Kind; -use api::v1::RenameTable; -use common_catalog::format_full_table_name; -use snafu::ensure; - -use crate::ddl::alter_table::AlterTableProcedure; -use crate::error::{self, Result}; -use crate::key::table_name::TableNameKey; - -impl AlterTableProcedure { - /// Checks: - /// - The new table name doesn't exist (rename). - /// - Table exists. - pub(crate) async fn check_alter(&self) -> Result<()> { - let alter_expr = &self.data.task.alter_table; - let catalog = &alter_expr.catalog_name; - let schema = &alter_expr.schema_name; - let table_name = &alter_expr.table_name; - // Safety: Checked in `AlterTableProcedure::new`. - let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); - - let manager = &self.context.table_metadata_manager; - if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind { - let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name); - let exists = manager - .table_name_manager() - .exists(new_table_name_key) - .await?; - ensure!( - !exists, - error::TableAlreadyExistsSnafu { - table_name: format_full_table_name(catalog, schema, new_table_name), - } - ) - } - - let table_name_key = TableNameKey::new(catalog, schema, table_name); - let exists = manager.table_name_manager().exists(table_name_key).await?; - ensure!( - exists, - error::TableNotFoundSnafu { - table_name: format_full_table_name(catalog, schema, &alter_expr.table_name), - } - ); - - Ok(()) - } -} diff --git a/src/common/meta/src/ddl/alter_table/executor.rs b/src/common/meta/src/ddl/alter_table/executor.rs new file mode 100644 index 0000000000..204851b2f3 --- /dev/null +++ b/src/common/meta/src/ddl/alter_table/executor.rs @@ -0,0 +1,308 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::region::RegionResponse; +use api::v1::region::region_request::Body; +use api::v1::region::{alter_request, AlterRequest, RegionRequest, RegionRequestHeader}; +use api::v1::AlterTableExpr; +use common_catalog::format_full_table_name; +use common_grpc_expr::alter_expr_to_request; +use common_telemetry::debug; +use common_telemetry::tracing_context::TracingContext; +use futures::future; +use snafu::{ensure, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::storage::{RegionId, TableId}; +use table::metadata::{RawTableInfo, TableInfo}; +use table::requests::AlterKind; +use table::table_name::TableName; + +use crate::cache_invalidator::{CacheInvalidatorRef, Context}; +use crate::ddl::utils::{add_peer_context_if_needed, raw_table_info}; +use crate::error::{self, Result, UnexpectedSnafu}; +use crate::instruction::CacheIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; +use crate::key::{DeserializedValueWithBytes, RegionDistribution, TableMetadataManagerRef}; +use crate::node_manager::NodeManagerRef; +use crate::rpc::router::{find_leaders, region_distribution, RegionRoute}; + +/// [AlterTableExecutor] performs: +/// - Alters the metadata of the table. +/// - Alters regions on the datanode nodes. +pub struct AlterTableExecutor { + table: TableName, + table_id: TableId, + /// The new table name if the alter kind is rename table. + new_table_name: Option, +} + +impl AlterTableExecutor { + /// Creates a new [`AlterTableExecutor`]. + pub fn new(table: TableName, table_id: TableId, new_table_name: Option) -> Self { + Self { + table, + table_id, + new_table_name, + } + } + + /// Prepares to alter the table. + /// + /// ## Checks: + /// - The new table name doesn't exist (rename). + /// - Table exists. + pub(crate) async fn on_prepare( + &self, + table_metadata_manager: &TableMetadataManagerRef, + ) -> Result<()> { + let catalog = &self.table.catalog_name; + let schema = &self.table.schema_name; + let table_name = &self.table.table_name; + + let manager = table_metadata_manager; + if let Some(new_table_name) = &self.new_table_name { + let new_table_name_key = TableNameKey::new(catalog, schema, new_table_name); + let exists = manager + .table_name_manager() + .exists(new_table_name_key) + .await?; + ensure!( + !exists, + error::TableAlreadyExistsSnafu { + table_name: format_full_table_name(catalog, schema, new_table_name), + } + ) + } + + let table_name_key = TableNameKey::new(catalog, schema, table_name); + let exists = manager.table_name_manager().exists(table_name_key).await?; + ensure!( + exists, + error::TableNotFoundSnafu { + table_name: format_full_table_name(catalog, schema, table_name), + } + ); + + Ok(()) + } + + /// Validates the alter table expression and builds the new table info. + /// + /// This validation is performed early to ensure the alteration is valid before + /// proceeding to the `on_alter_metadata` state, where regions have already been altered. + /// Building the new table info here allows us to catch any issues with the + /// alteration before committing metadata changes. + pub(crate) fn validate_alter_table_expr( + table_info: &RawTableInfo, + alter_table_expr: AlterTableExpr, + ) -> Result { + build_new_table_info(table_info, alter_table_expr) + } + + /// Updates table metadata for alter table operation. + pub(crate) async fn on_alter_metadata( + &self, + table_metadata_manager: &TableMetadataManagerRef, + current_table_info_value: &DeserializedValueWithBytes, + region_distribution: Option<&RegionDistribution>, + mut raw_table_info: RawTableInfo, + column_metadatas: &[ColumnMetadata], + ) -> Result<()> { + let table_ref = self.table.table_ref(); + let table_id = self.table_id; + + if let Some(new_table_name) = &self.new_table_name { + debug!( + "Starting update table: {} metadata, table_id: {}, new table info: {:?}, new table name: {}", + table_ref, table_id, raw_table_info, new_table_name + ); + + table_metadata_manager + .rename_table(current_table_info_value, new_table_name.to_string()) + .await?; + } else { + debug!( + "Starting update table: {} metadata, table_id: {}, new table info: {:?}", + table_ref, table_id, raw_table_info + ); + + ensure!( + region_distribution.is_some(), + UnexpectedSnafu { + err_msg: "region distribution is not set when updating table metadata", + } + ); + + if !column_metadatas.is_empty() { + raw_table_info::update_table_info_column_ids(&mut raw_table_info, column_metadatas); + } + table_metadata_manager + .update_table_info( + current_table_info_value, + region_distribution.cloned(), + raw_table_info, + ) + .await?; + } + + Ok(()) + } + + /// Alters regions on the datanode nodes. + pub(crate) async fn on_alter_regions( + &self, + node_manager: &NodeManagerRef, + region_routes: &[RegionRoute], + kind: Option, + ) -> Vec> { + let region_distribution = region_distribution(region_routes); + let leaders = find_leaders(region_routes) + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + let total_num_region = region_distribution + .values() + .map(|r| r.leader_regions.len()) + .sum::(); + let mut alter_region_tasks = Vec::with_capacity(total_num_region); + for (datanode_id, region_role_set) in region_distribution { + if region_role_set.leader_regions.is_empty() { + continue; + } + // Safety: must exists. + let peer = leaders.get(&datanode_id).unwrap(); + let requester = node_manager.datanode(peer).await; + + for region_id in region_role_set.leader_regions { + let region_id = RegionId::new(self.table_id, region_id); + let request = make_alter_region_request(region_id, kind.clone()); + + let requester = requester.clone(); + let peer = peer.clone(); + + alter_region_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + } + + future::join_all(alter_region_tasks) + .await + .into_iter() + .collect::>() + } + + /// Invalidates cache for the table. + pub(crate) async fn invalidate_table_cache( + &self, + cache_invalidator: &CacheInvalidatorRef, + ) -> Result<()> { + let ctx = Context { + subject: Some(format!( + "Invalidate table cache by altering table {}, table_id: {}", + self.table.table_ref(), + self.table_id, + )), + }; + + cache_invalidator + .invalidate( + &ctx, + &[ + CacheIdent::TableName(self.table.clone()), + CacheIdent::TableId(self.table_id), + ], + ) + .await?; + + Ok(()) + } +} + +/// Makes alter region request. +pub(crate) fn make_alter_region_request( + region_id: RegionId, + kind: Option, +) -> RegionRequest { + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(Body::Alter(AlterRequest { + region_id: region_id.as_u64(), + kind, + ..Default::default() + })), + } +} + +/// Builds new table info after alteration. +/// +/// This function creates a new table info by applying the alter table expression +/// to the existing table info. For add column operations, it increments the +/// `next_column_id` by the number of columns being added, which may result in gaps +/// in the column id sequence. +fn build_new_table_info( + table_info: &RawTableInfo, + alter_table_expr: AlterTableExpr, +) -> Result { + let table_info = + TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?; + let schema_name = &table_info.schema_name; + let catalog_name = &table_info.catalog_name; + let table_name = &table_info.name; + let table_id = table_info.ident.table_id; + let request = alter_expr_to_request(table_id, alter_table_expr) + .context(error::ConvertAlterTableRequestSnafu)?; + + let new_meta = table_info + .meta + .builder_with_alter_kind(table_name, &request.alter_kind) + .context(error::TableSnafu)? + .build() + .with_context(|_| error::BuildTableMetaSnafu { + table_name: format_full_table_name(catalog_name, schema_name, table_name), + })?; + + let mut new_info = table_info.clone(); + new_info.meta = new_meta; + new_info.ident.version = table_info.ident.version + 1; + match request.alter_kind { + AlterKind::AddColumns { columns } => { + // Bumps the column id for the new columns. + // It may bump more than the actual number of columns added if there are + // existing columns, but it's fine. + new_info.meta.next_column_id += columns.len() as u32; + } + AlterKind::RenameTable { new_table_name } => { + new_info.name = new_table_name.to_string(); + } + AlterKind::DropColumns { .. } + | AlterKind::ModifyColumnTypes { .. } + | AlterKind::SetTableOptions { .. } + | AlterKind::UnsetTableOptions { .. } + | AlterKind::SetIndex { .. } + | AlterKind::UnsetIndex { .. } + | AlterKind::DropDefaults { .. } => {} + } + + Ok(new_info) +} diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index cf8e0d57aa..c4be75b2f6 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -15,43 +15,16 @@ use std::collections::HashSet; use api::v1::alter_table_expr::Kind; -use api::v1::region::region_request::Body; use api::v1::region::{ - alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef, - RegionRequest, RegionRequestHeader, + alter_request, AddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef, }; -use common_telemetry::tracing_context::TracingContext; use snafu::OptionExt; -use store_api::storage::RegionId; use table::metadata::RawTableInfo; use crate::ddl::alter_table::AlterTableProcedure; use crate::error::{InvalidProtoMsgSnafu, Result}; impl AlterTableProcedure { - /// Makes alter region request from existing an alter kind. - /// Region alter request always add columns if not exist. - pub(crate) fn make_alter_region_request( - &self, - region_id: RegionId, - kind: Option, - ) -> Result { - // Safety: checked - let table_info = self.data.table_info().unwrap(); - - Ok(RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(Body::Alter(AlterRequest { - region_id: region_id.as_u64(), - schema_version: table_info.ident.version, - kind, - })), - }) - } - /// Makes alter kind proto that all regions can reuse. /// Region alter request always add columns if not exist. pub(crate) fn make_region_alter_kind(&self) -> Result> { @@ -155,6 +128,7 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use store_api::storage::{RegionId, TableId}; + use crate::ddl::alter_table::executor::make_alter_region_request; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::test_util::columns::TestColumnDefBuilder; use crate::ddl::test_util::create_table::{ @@ -261,15 +235,13 @@ mod tests { let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); let alter_kind = procedure.make_region_alter_kind().unwrap(); - let Some(Body::Alter(alter_region_request)) = procedure - .make_alter_region_request(region_id, alter_kind) - .unwrap() - .body + let Some(Body::Alter(alter_region_request)) = + make_alter_region_request(region_id, alter_kind).body else { unreachable!() }; assert_eq!(alter_region_request.region_id, region_id.as_u64()); - assert_eq!(alter_region_request.schema_version, 1); + assert_eq!(alter_region_request.schema_version, 0); assert_eq!( alter_region_request.kind, Some(region::alter_request::Kind::AddColumns( @@ -319,15 +291,13 @@ mod tests { let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); let alter_kind = procedure.make_region_alter_kind().unwrap(); - let Some(Body::Alter(alter_region_request)) = procedure - .make_alter_region_request(region_id, alter_kind) - .unwrap() - .body + let Some(Body::Alter(alter_region_request)) = + make_alter_region_request(region_id, alter_kind).body else { unreachable!() }; assert_eq!(alter_region_request.region_id, region_id.as_u64()); - assert_eq!(alter_region_request.schema_version, 1); + assert_eq!(alter_region_request.schema_version, 0); assert_eq!( alter_region_request.kind, Some(region::alter_request::Kind::ModifyColumnTypes( diff --git a/src/common/meta/src/ddl/alter_table/update_metadata.rs b/src/common/meta/src/ddl/alter_table/update_metadata.rs deleted file mode 100644 index 08ea63689e..0000000000 --- a/src/common/meta/src/ddl/alter_table/update_metadata.rs +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_grpc_expr::alter_expr_to_request; -use snafu::ResultExt; -use table::metadata::{RawTableInfo, TableInfo}; -use table::requests::AlterKind; - -use crate::ddl::alter_table::AlterTableProcedure; -use crate::error::{self, Result}; -use crate::key::table_info::TableInfoValue; -use crate::key::{DeserializedValueWithBytes, RegionDistribution}; - -impl AlterTableProcedure { - /// Builds new table info after alteration. - /// It bumps the column id of the table by the number of the add column requests. - /// So there may be holes in the column id sequence. - pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result { - let table_info = - TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?; - let table_ref = self.data.table_ref(); - let alter_expr = self.data.task.alter_table.clone(); - let request = alter_expr_to_request(self.data.table_id(), alter_expr) - .context(error::ConvertAlterTableRequestSnafu)?; - - let new_meta = table_info - .meta - .builder_with_alter_kind(table_ref.table, &request.alter_kind) - .context(error::TableSnafu)? - .build() - .with_context(|_| error::BuildTableMetaSnafu { - table_name: table_ref.table, - })?; - - let mut new_info = table_info.clone(); - new_info.meta = new_meta; - new_info.ident.version = table_info.ident.version + 1; - match request.alter_kind { - AlterKind::AddColumns { columns } => { - // Bumps the column id for the new columns. - // It may bump more than the actual number of columns added if there are - // existing columns, but it's fine. - new_info.meta.next_column_id += columns.len() as u32; - } - AlterKind::RenameTable { new_table_name } => { - new_info.name = new_table_name.to_string(); - } - AlterKind::DropColumns { .. } - | AlterKind::ModifyColumnTypes { .. } - | AlterKind::SetTableOptions { .. } - | AlterKind::UnsetTableOptions { .. } - | AlterKind::SetIndex { .. } - | AlterKind::UnsetIndex { .. } - | AlterKind::DropDefaults { .. } => {} - } - - Ok(new_info) - } - - /// Updates table metadata for rename table operation. - pub(crate) async fn on_update_metadata_for_rename( - &self, - new_table_name: String, - current_table_info_value: &DeserializedValueWithBytes, - ) -> Result<()> { - let table_metadata_manager = &self.context.table_metadata_manager; - table_metadata_manager - .rename_table(current_table_info_value, new_table_name) - .await?; - - Ok(()) - } - - /// Updates table metadata for alter table operation. - pub(crate) async fn on_update_metadata_for_alter( - &self, - new_table_info: RawTableInfo, - region_distribution: RegionDistribution, - current_table_info_value: &DeserializedValueWithBytes, - ) -> Result<()> { - let table_metadata_manager = &self.context.table_metadata_manager; - table_metadata_manager - .update_table_info( - current_table_info_value, - Some(region_distribution), - new_table_info, - ) - .await?; - - Ok(()) - } -} diff --git a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs index 75d235812f..c6e50b01d8 100644 --- a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs @@ -22,7 +22,7 @@ use table::table_name::TableName; use crate::cache_invalidator::Context; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::physical_table_metadata; +use crate::ddl::utils::raw_table_info; use crate::error::{Result, TableInfoNotFoundSnafu}; use crate::instruction::CacheIdent; @@ -47,7 +47,7 @@ impl CreateLogicalTablesProcedure { // Generates new table info let raw_table_info = physical_table_info.deref().table_info.clone(); - let new_table_info = physical_table_metadata::build_new_physical_table_info( + let new_table_info = raw_table_info::build_new_physical_table_info( raw_table_info, &self.data.physical_columns, ); diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 8781bbbf19..e1e2649de5 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -35,7 +35,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::physical_table_metadata::update_table_info_column_ids; +use crate::ddl::utils::raw_table_info::update_table_info_column_ids; use crate::ddl::utils::{ add_peer_context_if_needed, convert_region_routes_to_detecting_regions, extract_column_metadatas, map_to_procedure_error, region_storage_path, diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 7aae31b13a..7cc6589f6a 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -185,11 +185,15 @@ impl DropTableExecutor { .await } - /// Invalidates frontend caches + /// Invalidates caches for the table. pub async fn invalidate_table_cache(&self, ctx: &DdlContext) -> Result<()> { let cache_invalidator = &ctx.cache_invalidator; let ctx = Context { - subject: Some("Invalidate table cache by dropping table".to_string()), + subject: Some(format!( + "Invalidate table cache by dropping table {}, table_id: {}", + self.table.table_ref(), + self.table_id, + )), }; cache_invalidator diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 521d1dd817..7ddf5ebba4 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod raw_table_info; + use std::collections::HashMap; use std::fmt::Debug; diff --git a/src/common/meta/src/ddl/physical_table_metadata.rs b/src/common/meta/src/ddl/utils/raw_table_info.rs similarity index 100% rename from src/common/meta/src/ddl/physical_table_metadata.rs rename to src/common/meta/src/ddl/utils/raw_table_info.rs