From 14f3a4ab05370b18318e7fa4cd0daf4b54a945cc Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 19 Jun 2025 21:10:20 +0800
Subject: [PATCH] refactor: copy alter logic to SchemaHelper

Signed-off-by: evenyag
---
 src/operator/src/schema_helper.rs | 225 +++++++++++++++++++++++++++++-
 src/operator/src/statement/ddl.rs | 109 +++++++--------
 2 files changed, 273 insertions(+), 61 deletions(-)

diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index 6fc13c81f3..b4d146465f 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -14,18 +14,24 @@
 //! Utilities to deal with table schemas.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
-use api::v1::{ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
+use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
 use catalog::CatalogManagerRef;
-use common_catalog::consts::{default_engine, is_readonly_schema};
+use common_catalog::consts::{
+    default_engine, is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
+};
 use common_catalog::format_full_table_name;
+use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
 use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
+use common_meta::instruction::CacheIdent;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_meta::key::{TableMetadataManagerRef, NAME_PATTERN};
 use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use common_meta::rpc::router::Partition;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use common_query::Output;
 use common_telemetry::tracing;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -36,18 +42,19 @@ use store_api::metric_engine_consts::{
     LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
 };
 use table::dist_table::DistTable;
-use table::metadata::{RawTableInfo, TableInfo};
+use table::metadata::{RawTableInfo, TableId, TableInfo};
 use table::table_name::TableName;
 use table::table_reference::TableReference;
 use table::TableRef;
 
 use crate::error::{
     CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu,
-    ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu,
-    SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, UnexpectedSnafu,
+    ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, InvalidateTableCacheSnafu,
+    Result, SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu,
+    TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu,
 };
 use crate::insert::build_create_table_expr;
-use crate::statement::ddl::{create_table_info, parse_partitions};
+use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter};
 
 // TODO(yingwen): Replaces operator::statement::ddl::NAME_PATTERN_REG
 lazy_static! {
@@ -56,10 +63,12 @@ lazy_static! {
 }
 
 /// Helper to query and manipulate table schemas.
+#[derive(Clone)]
 pub struct SchemaHelper {
     catalog_manager: CatalogManagerRef,
     table_metadata_manager: TableMetadataManagerRef,
     procedure_executor: ProcedureExecutorRef,
+    cache_invalidator: CacheInvalidatorRef,
 }
 
 impl SchemaHelper {
@@ -68,11 +77,13 @@
         catalog_manager: CatalogManagerRef,
         table_metadata_manager: TableMetadataManagerRef,
         procedure_executor: ProcedureExecutorRef,
+        cache_invalidator: CacheInvalidatorRef,
     ) -> Self {
         Self {
             catalog_manager,
             table_metadata_manager,
             procedure_executor,
+            cache_invalidator,
         }
     }
 
@@ -360,6 +371,190 @@ impl SchemaHelper {
             .collect())
     }
 
+    // TODO(yingwen): Replaces StatementExecutor::alter_table_inner
+    /// Alters a table by [AlterTableExpr].
+    #[tracing::instrument(skip_all)]
+    pub async fn alter_table_by_expr(
+        &self,
+        expr: AlterTableExpr,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
+        ensure!(
+            !is_readonly_schema(&expr.schema_name),
+            SchemaReadOnlySnafu {
+                name: expr.schema_name.clone()
+            }
+        );
+
+        let catalog_name = if expr.catalog_name.is_empty() {
+            DEFAULT_CATALOG_NAME.to_string()
+        } else {
+            expr.catalog_name.clone()
+        };
+
+        let schema_name = if expr.schema_name.is_empty() {
+            DEFAULT_SCHEMA_NAME.to_string()
+        } else {
+            expr.schema_name.clone()
+        };
+
+        let table_name = expr.table_name.clone();
+
+        let table = self
+            .catalog_manager
+            .table(
+                &catalog_name,
+                &schema_name,
+                &table_name,
+                Some(&query_context),
+            )
+            .await
+            .context(CatalogSnafu)?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
+            })?;
+
+        let table_id = table.table_info().ident.table_id;
+        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
+        if !need_alter {
+            return Ok(Output::new_with_affected_rows(0));
+        }
+        common_telemetry::info!(
+            "Table info before alter is {:?}, expr: {:?}",
+            table.table_info(),
+            expr
+        );
+
+        let physical_table_id = self
+            .table_metadata_manager
+            .table_route_manager()
+            .get_physical_table_id(table_id)
+            .await
+            .context(TableMetadataManagerSnafu)?;
+
+        let (req, invalidate_keys) = if physical_table_id == table_id {
+            // This is a physical table
+            let req = SubmitDdlTaskRequest {
+                query_context,
+                task: DdlTask::new_alter_table(expr),
+            };
+
+            let invalidate_keys = vec![
+                CacheIdent::TableId(table_id),
+                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
+            ];
+
+            (req, invalidate_keys)
+        } else {
+            // This is a logical table
+            let req = SubmitDdlTaskRequest {
+                query_context,
+                task: DdlTask::new_alter_logical_tables(vec![expr]),
+            };
+
+            let mut invalidate_keys = vec![
+                CacheIdent::TableId(physical_table_id),
+                CacheIdent::TableId(table_id),
+                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
+            ];
+
+            let physical_table = self
+                .table_metadata_manager
+                .table_info_manager()
+                .get(physical_table_id)
+                .await
+                .context(TableMetadataManagerSnafu)?
+                .map(|x| x.into_inner());
+            if let Some(physical_table) = physical_table {
+                let physical_table_name = TableName::new(
+                    physical_table.table_info.catalog_name,
+                    physical_table.table_info.schema_name,
+                    physical_table.table_info.name,
+                );
+                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
+            }
+
+            (req, invalidate_keys)
+        };
+
+        self.procedure_executor
+            .submit_ddl_task(&ExecutorContext::default(), req)
+            .await
+            .context(ExecuteDdlSnafu)?;
+
+        // Invalidates local cache ASAP.
+        self.cache_invalidator
+            .invalidate(&Context::default(), &invalidate_keys)
+            .await
+            .context(InvalidateTableCacheSnafu)?;
+
+        Ok(Output::new_with_affected_rows(0))
+    }
+
+    // TODO(yingwen): Replaces StatementExecutor::alter_logical_tables
+    /// Alters logical tables.
+    pub async fn alter_logical_tables(
+        &self,
+        alter_table_exprs: Vec<AlterTableExpr>,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
+        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
+        ensure!(
+            !alter_table_exprs.is_empty(),
+            EmptyDdlExprSnafu {
+                name: "alter logical tables"
+            }
+        );
+
+        // Group by physical table id
+        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
+        for expr in alter_table_exprs {
+            // Get table_id from catalog_manager
+            let catalog = if expr.catalog_name.is_empty() {
+                query_context.current_catalog()
+            } else {
+                &expr.catalog_name
+            };
+            let schema = if expr.schema_name.is_empty() {
+                query_context.current_schema()
+            } else {
+                expr.schema_name.to_string()
+            };
+            let table_name = &expr.table_name;
+            let table = self
+                .catalog_manager
+                .table(catalog, &schema, table_name, Some(&query_context))
+                .await
+                .context(CatalogSnafu)?
+                .with_context(|| TableNotFoundSnafu {
+                    table_name: format_full_table_name(catalog, &schema, table_name),
+                })?;
+            let table_id = table.table_info().ident.table_id;
+            let physical_table_id = self
+                .table_metadata_manager
+                .table_route_manager()
+                .get_physical_table_id(table_id)
+                .await
+                .context(TableMetadataManagerSnafu)?;
+            groups.entry(physical_table_id).or_default().push(expr);
+        }
+
+        // Submit procedure for each physical table
+        let mut handles = Vec::with_capacity(groups.len());
+        for (_physical_table_id, exprs) in groups {
+            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
+            handles.push(fut);
+        }
+        let _results = futures::future::try_join_all(handles).await?;
+
+        Ok(Output::new_with_affected_rows(0))
+    }
+
+    /// Returns the catalog manager.
+    pub(crate) fn catalog_manager(&self) -> &CatalogManagerRef {
+        &self.catalog_manager
+    }
+
     // TODO(yingwen): Replace StatementExecutor::create_table_procedure
     /// Submits a procedure to create a non-logical table.
     async fn create_table_procedure(
@@ -399,4 +594,22 @@
             .await
             .context(ExecuteDdlSnafu)
     }
+
+    // TODO(yingwen): Replace StatementExecutor::alter_logical_tables_procedure
+    /// Submits a procedure to alter logical tables.
+    async fn alter_logical_tables_procedure(
+        &self,
+        tables_data: Vec<AlterTableExpr>,
+        query_context: QueryContextRef,
+    ) -> Result<SubmitDdlTaskResponse> {
+        let request = SubmitDdlTaskRequest {
+            query_context,
+            task: DdlTask::new_alter_logical_tables(tables_data),
+        };
+
+        self.procedure_executor
+            .submit_ddl_task(&ExecutorContext::default(), request)
+            .await
+            .context(ExecuteDdlSnafu)
+    }
 }
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 817cc4f4d2..b4ea68a718 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -1152,60 +1152,6 @@ impl StatementExecutor {
         Ok(Output::new_with_affected_rows(0))
     }
 
-    /// Verifies an alter and returns whether it is necessary to perform the alter.
-    ///
-    /// # Returns
-    ///
-    /// Returns true if the alter need to be porformed; otherwise, it returns false.
-    fn verify_alter(
-        &self,
-        table_id: TableId,
-        table_info: Arc<TableInfo>,
-        expr: AlterTableExpr,
-    ) -> Result<bool> {
-        let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
-            .context(AlterExprToRequestSnafu)?;
-
-        let AlterTableRequest {
-            table_name,
-            alter_kind,
-            ..
-        } = &request;
-
-        if let AlterKind::RenameTable { new_table_name } = alter_kind {
-            ensure!(
-                NAME_PATTERN_REG.is_match(new_table_name),
-                error::UnexpectedSnafu {
-                    violated: format!("Invalid table name: {}", new_table_name)
-                }
-            );
-        } else if let AlterKind::AddColumns { columns } = alter_kind {
-            // If all the columns are marked as add_if_not_exists and they already exist in the table,
-            // there is no need to perform the alter.
-            let column_names: HashSet<_> = table_info
-                .meta
-                .schema
-                .column_schemas()
-                .iter()
-                .map(|schema| &schema.name)
-                .collect();
-            if columns.iter().all(|column| {
-                column_names.contains(&column.column_schema.name) && column.add_if_not_exists
-            }) {
-                return Ok(false);
-            }
-        }
-
-        let _ = table_info
-            .meta
-            .builder_with_alter_kind(table_name, &request.alter_kind)
-            .context(error::TableSnafu)?
-            .build()
-            .context(error::BuildTableMetaSnafu { table_name })?;
-
-        Ok(true)
-    }
-
     #[tracing::instrument(skip_all)]
     pub async fn alter_table(
         &self,
@@ -1258,7 +1204,7 @@
         })?;
 
         let table_id = table.table_info().ident.table_id;
-        let need_alter = self.verify_alter(table_id, table.table_info(), expr.clone())?;
+        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
         if !need_alter {
             return Ok(Output::new_with_affected_rows(0));
         }
@@ -1585,6 +1531,59 @@
     }
 }
 
+/// Verifies an alter and returns whether it is necessary to perform the alter.
+///
+/// # Returns
+///
+/// Returns true if the alter needs to be performed; otherwise, it returns false.
+pub(crate) fn verify_alter(
+    table_id: TableId,
+    table_info: Arc<TableInfo>,
+    expr: AlterTableExpr,
+) -> Result<bool> {
+    let request: AlterTableRequest =
+        common_grpc_expr::alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
+
+    let AlterTableRequest {
+        table_name,
+        alter_kind,
+        ..
+    } = &request;
+
+    if let AlterKind::RenameTable { new_table_name } = alter_kind {
+        ensure!(
+            NAME_PATTERN_REG.is_match(new_table_name),
+            error::UnexpectedSnafu {
+                violated: format!("Invalid table name: {}", new_table_name)
+            }
+        );
+    } else if let AlterKind::AddColumns { columns } = alter_kind {
+        // If all the columns are marked as add_if_not_exists and they already exist in the table,
+        // there is no need to perform the alter.
+        let column_names: HashSet<_> = table_info
+            .meta
+            .schema
+            .column_schemas()
+            .iter()
+            .map(|schema| &schema.name)
+            .collect();
+        if columns.iter().all(|column| {
+            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
+        }) {
+            return Ok(false);
+        }
+    }
+
+    let _ = table_info
+        .meta
+        .builder_with_alter_kind(table_name, &request.alter_kind)
+        .context(error::TableSnafu)?
+        .build()
+        .context(error::BuildTableMetaSnafu { table_name })?;
+
+    Ok(true)
+}
+
 /// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
 pub(crate) fn parse_partitions(
     create_table: &CreateTableExpr,
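
Note (not part of the patch): a minimal caller-side sketch of how the refactored helper might be wired and invoked. The constructor arguments and the alter_table_by_expr signature mirror the diff above; the session::context::QueryContextRef import path, the public visibility of SchemaHelper::new, and the alter_via_helper wrapper itself are assumptions made purely for illustration.

    use api::v1::AlterTableExpr;
    use catalog::CatalogManagerRef;
    use common_meta::cache_invalidator::CacheInvalidatorRef;
    use common_meta::ddl::ProcedureExecutorRef;
    use common_meta::key::TableMetadataManagerRef;
    use common_query::Output;
    use session::context::QueryContextRef;

    use crate::error::Result;
    use crate::schema_helper::SchemaHelper;

    /// Hypothetical wrapper that owns the helper's four dependencies and forwards
    /// an ALTER TABLE expression to it.
    async fn alter_via_helper(
        catalog_manager: CatalogManagerRef,
        table_metadata_manager: TableMetadataManagerRef,
        procedure_executor: ProcedureExecutorRef,
        cache_invalidator: CacheInvalidatorRef,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // The constructor now takes the cache invalidator as its fourth argument.
        let helper = SchemaHelper::new(
            catalog_manager,
            table_metadata_manager,
            procedure_executor,
            cache_invalidator,
        );
        // Verifies the alter, submits the DDL procedure, and invalidates the local cache.
        helper.alter_table_by_expr(expr, query_context).await
    }

Because the cache invalidation now lives inside SchemaHelper::alter_table_by_expr, any caller wired this way gets the local cache refresh without extra plumbing, which is the point of routing the alter path through SchemaHelper.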