diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 933b845886..0164662a8d 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -74,7 +74,7 @@ use crate::req_convert::insert::{
 use crate::schema_helper::SchemaHelper;
 
 pub struct Inserter {
-    schema_helper: SchemaHelper,
+    pub(crate) schema_helper: SchemaHelper,
     pub(crate) partition_manager: PartitionRuleManagerRef,
     pub(crate) node_manager: NodeManagerRef,
     pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index dfcd93918d..a4e3d3d092 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -27,14 +27,12 @@ use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
 use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
 use common_meta::instruction::CacheIdent;
 use common_meta::key::schema_name::SchemaNameKey;
-use common_meta::key::{TableMetadataManagerRef, NAME_PATTERN};
+use common_meta::key::TableMetadataManagerRef;
 use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use common_meta::rpc::router::Partition;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_query::Output;
 use common_telemetry::tracing;
-use lazy_static::lazy_static;
-use regex::Regex;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use sql::statements::create::Partitions;
@@ -54,13 +52,7 @@ use crate::error::{
     TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu,
 };
 use crate::insert::build_create_table_expr;
-use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter};
-
-// TODO(yingwen): Replaces operator::statement::ddl::NAME_PATTERN_REG
-lazy_static! {
-    /// Regex to validate table name.
-    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
-}
+use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG};
 
 /// Helper to query and manipulate (CREATE/ALTER) table schemas.
 #[derive(Clone)]
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index b4ea68a718..df04801c65 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -26,7 +26,7 @@ use api::v1::{
 };
 use catalog::CatalogManagerRef;
 use chrono::Utc;
-use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_catalog::consts::is_readonly_schema;
 use common_catalog::{format_full_flow_name, format_full_table_name};
 use common_error::ext::BoxedError;
 use common_meta::cache_invalidator::Context;
@@ -43,7 +43,7 @@ use common_meta::rpc::ddl::{
     CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
     SubmitDdlTaskResponse,
 };
-use common_meta::rpc::router::{Partition, Partition as MetaPartition};
+use common_meta::rpc::router::Partition as MetaPartition;
 use common_query::Output;
 use common_telemetry::{debug, info, tracing, warn};
 use common_time::Timezone;
@@ -74,7 +74,6 @@ use sql::statements::create::{
 use sql::statements::sql_value_to_value;
 use sql::statements::statement::Statement;
 use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
-use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 use table::dist_table::DistTable;
 use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
@@ -84,12 +83,11 @@ use table::TableRef;
 
 use crate::error::{
     self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
-    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
-    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
-    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
-    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
-    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
-    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
+    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
+    ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
+    InvalidPartitionSnafu, InvalidSqlSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
+    ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
+    SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
     UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
 };
 use crate::expr_helper;
@@ -97,7 +95,8 @@ use crate::statement::show::create_partitions_stmt;
 use crate::statement::StatementExecutor;
 
 lazy_static! {
-    static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
+    /// Regex to validate table name.
+    pub(crate) static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
 }
 
 impl StatementExecutor {
@@ -182,192 +181,10 @@ impl StatementExecutor {
         partitions: Option<Partitions>,
         query_ctx: QueryContextRef,
     ) -> Result<TableRef> {
-        ensure!(
-            !is_readonly_schema(&create_table.schema_name),
-            SchemaReadOnlySnafu {
-                name: create_table.schema_name.clone()
-            }
-        );
-
-        if create_table.engine == METRIC_ENGINE_NAME
-            && create_table
-                .table_options
-                .contains_key(LOGICAL_TABLE_METADATA_KEY)
-        {
-            // Create logical tables
-            ensure!(
-                partitions.is_none(),
-                InvalidPartitionRuleSnafu {
-                    reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
-                }
-            );
-            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
-                .await?
-                .into_iter()
-                .next()
-                .context(error::UnexpectedSnafu {
-                    violated: "expected to create logical tables",
-                })
-        } else {
-            // Create other normal table
-            self.create_non_logic_table(create_table, partitions, query_ctx)
-                .await
-        }
-    }
-
-    #[tracing::instrument(skip_all)]
-    pub async fn create_non_logic_table(
-        &self,
-        create_table: &mut CreateTableExpr,
-        partitions: Option<Partitions>,
-        query_ctx: QueryContextRef,
-    ) -> Result<TableRef> {
-        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
-
-        // Check if schema exists
-        let schema = self
-            .table_metadata_manager
-            .schema_manager()
-            .get(SchemaNameKey::new(
-                &create_table.catalog_name,
-                &create_table.schema_name,
-            ))
+        self.inserter
+            .schema_helper
+            .create_table_by_expr(create_table, partitions, query_ctx)
             .await
-            .context(TableMetadataManagerSnafu)?;
-        ensure!(
-            schema.is_some(),
-            SchemaNotFoundSnafu {
-                schema_info: &create_table.schema_name,
-            }
-        );
-
-        // if table exists.
-        if let Some(table) = self
-            .catalog_manager
-            .table(
-                &create_table.catalog_name,
-                &create_table.schema_name,
-                &create_table.table_name,
-                Some(&query_ctx),
-            )
-            .await
-            .context(CatalogSnafu)?
-        {
-            return if create_table.create_if_not_exists {
-                Ok(table)
-            } else {
-                TableAlreadyExistsSnafu {
-                    table: format_full_table_name(
-                        &create_table.catalog_name,
-                        &create_table.schema_name,
-                        &create_table.table_name,
-                    ),
-                }
-                .fail()
-            };
-        }
-
-        ensure!(
-            NAME_PATTERN_REG.is_match(&create_table.table_name),
-            InvalidTableNameSnafu {
-                table_name: &create_table.table_name,
-            }
-        );
-
-        let table_name = TableName::new(
-            &create_table.catalog_name,
-            &create_table.schema_name,
-            &create_table.table_name,
-        );
-
-        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
-        let mut table_info = create_table_info(create_table, partition_cols)?;
-
-        let resp = self
-            .create_table_procedure(
-                create_table.clone(),
-                partitions,
-                table_info.clone(),
-                query_ctx,
-            )
-            .await?;
-
-        let table_id = resp
-            .table_ids
-            .into_iter()
-            .next()
-            .context(error::UnexpectedSnafu {
-                violated: "expected table_id",
-            })?;
-        info!("Successfully created table '{table_name}' with table id {table_id}");
-
-        table_info.ident.table_id = table_id;
-
-        let table_info: Arc<TableInfo> =
-            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
-        create_table.table_id = Some(api::v1::TableId { id: table_id });
-
-        let table = DistTable::table(table_info);
-
-        Ok(table)
-    }
-
-    #[tracing::instrument(skip_all)]
-    pub async fn create_logical_tables(
-        &self,
-        create_table_exprs: &[CreateTableExpr],
-        query_context: QueryContextRef,
-    ) -> Result<Vec<TableRef>> {
-        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
-        ensure!(
-            !create_table_exprs.is_empty(),
-            EmptyDdlExprSnafu {
-                name: "create logic tables"
-            }
-        );
-
-        // Check table names
-        for create_table in create_table_exprs {
-            ensure!(
-                NAME_PATTERN_REG.is_match(&create_table.table_name),
-                InvalidTableNameSnafu {
-                    table_name: &create_table.table_name,
-                }
-            );
-        }
-
-        let mut raw_tables_info = create_table_exprs
-            .iter()
-            .map(|create| create_table_info(create, vec![]))
-            .collect::<Result<Vec<_>>>()?;
-        let tables_data = create_table_exprs
-            .iter()
-            .cloned()
-            .zip(raw_tables_info.iter().cloned())
-            .collect::<Vec<_>>();
-
-        let resp = self
-            .create_logical_tables_procedure(tables_data, query_context)
-            .await?;
-
-        let table_ids = resp.table_ids;
-        ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
-            reason: format!("The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}", raw_tables_info.len(), table_ids.len())
-        });
-        info!("Successfully created logical tables: {:?}", table_ids);
-
-        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
-            table_info.ident.table_id = table_ids[i];
-        }
-        let tables_info = raw_tables_info
-            .into_iter()
-            .map(|x| x.try_into().context(CreateTableInfoSnafu))
-            .collect::<Result<Vec<TableInfo>>>()?;
-
-        Ok(tables_info
-            .into_iter()
-            .map(|x| DistTable::table(Arc::new(x)))
-            .collect())
     }
 
     #[cfg(feature = "enterprise")]
@@ -953,64 +770,6 @@ impl StatementExecutor {
             .context(error::ExecuteDdlSnafu)
     }
 
-    #[tracing::instrument(skip_all)]
-    pub async fn alter_logical_tables(
-        &self,
-        alter_table_exprs: Vec<AlterTableExpr>,
-        query_context: QueryContextRef,
-    ) -> Result<Output> {
-        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
-        ensure!(
-            !alter_table_exprs.is_empty(),
-            EmptyDdlExprSnafu {
-                name: "alter logical tables"
-            }
-        );
-
-        // group by physical table id
-        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
-        for expr in alter_table_exprs {
-            // Get table_id from catalog_manager
-            let catalog = if expr.catalog_name.is_empty() {
-                query_context.current_catalog()
-            } else {
-                &expr.catalog_name
-            };
-            let schema = if expr.schema_name.is_empty() {
-                query_context.current_schema()
-            } else {
-                expr.schema_name.to_string()
-            };
-            let table_name = &expr.table_name;
-            let table = self
-                .catalog_manager
-                .table(catalog, &schema, table_name, Some(&query_context))
-                .await
-                .context(CatalogSnafu)?
-                .with_context(|| TableNotFoundSnafu {
-                    table_name: format_full_table_name(catalog, &schema, table_name),
-                })?;
-            let table_id = table.table_info().ident.table_id;
-            let physical_table_id = self
-                .table_metadata_manager
-                .table_route_manager()
-                .get_physical_table_id(table_id)
-                .await
-                .context(TableMetadataManagerSnafu)?;
-            groups.entry(physical_table_id).or_default().push(expr);
-        }
-
-        // Submit procedure for each physical table
-        let mut handles = Vec::with_capacity(groups.len());
-        for (_physical_table_id, exprs) in groups {
-            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
-            handles.push(fut);
-        }
-        let _results = futures::future::try_join_all(handles).await?;
-
-        Ok(Output::new_with_affected_rows(0))
-    }
-
     #[tracing::instrument(skip_all)]
     pub async fn drop_table(
         &self,
@@ -1168,116 +927,10 @@ impl StatementExecutor {
         expr: AlterTableExpr,
         query_context: QueryContextRef,
     ) -> Result<Output> {
-        ensure!(
-            !is_readonly_schema(&expr.schema_name),
-            SchemaReadOnlySnafu {
-                name: expr.schema_name.clone()
-            }
-        );
-
-        let catalog_name = if expr.catalog_name.is_empty() {
-            DEFAULT_CATALOG_NAME.to_string()
-        } else {
-            expr.catalog_name.clone()
-        };
-
-        let schema_name = if expr.schema_name.is_empty() {
-            DEFAULT_SCHEMA_NAME.to_string()
-        } else {
-            expr.schema_name.clone()
-        };
-
-        let table_name = expr.table_name.clone();
-
-        let table = self
-            .catalog_manager
-            .table(
-                &catalog_name,
-                &schema_name,
-                &table_name,
-                Some(&query_context),
-            )
+        self.inserter
+            .schema_helper
+            .alter_table_by_expr(expr, query_context)
            .await
-            .context(CatalogSnafu)?
-            .with_context(|| TableNotFoundSnafu {
-                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
-            })?;
-
-        let table_id = table.table_info().ident.table_id;
-        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
-        if !need_alter {
-            return Ok(Output::new_with_affected_rows(0));
-        }
-        info!(
-            "Table info before alter is {:?}, expr: {:?}",
-            table.table_info(),
-            expr
-        );
-
-        let physical_table_id = self
-            .table_metadata_manager
-            .table_route_manager()
-            .get_physical_table_id(table_id)
-            .await
-            .context(TableMetadataManagerSnafu)?;
-
-        let (req, invalidate_keys) = if physical_table_id == table_id {
-            // This is physical table
-            let req = SubmitDdlTaskRequest {
-                query_context,
-                task: DdlTask::new_alter_table(expr),
-            };
-
-            let invalidate_keys = vec![
-                CacheIdent::TableId(table_id),
-                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
-            ];
-
-            (req, invalidate_keys)
-        } else {
-            // This is logical table
-            let req = SubmitDdlTaskRequest {
-                query_context,
-                task: DdlTask::new_alter_logical_tables(vec![expr]),
-            };
-
-            let mut invalidate_keys = vec![
-                CacheIdent::TableId(physical_table_id),
-                CacheIdent::TableId(table_id),
-                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
-            ];
-
-            let physical_table = self
-                .table_metadata_manager
-                .table_info_manager()
-                .get(physical_table_id)
-                .await
-                .context(TableMetadataManagerSnafu)?
-                .map(|x| x.into_inner());
-            if let Some(physical_table) = physical_table {
-                let physical_table_name = TableName::new(
-                    physical_table.table_info.catalog_name,
-                    physical_table.table_info.schema_name,
-                    physical_table.table_info.name,
-                );
-                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
-            }
-
-            (req, invalidate_keys)
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), req)
-            .await
-            .context(error::ExecuteDdlSnafu)?;
-
-        // Invalidates local cache ASAP.
-        self.cache_invalidator
-            .invalidate(&Context::default(), &invalidate_keys)
-            .await
-            .context(error::InvalidateTableCacheSnafu)?;
-
-        Ok(Output::new_with_affected_rows(0))
     }
 
     #[tracing::instrument(skip_all)]
@@ -1332,58 +985,6 @@ impl StatementExecutor {
         Ok(Output::new_with_affected_rows(0))
     }
 
-    async fn create_table_procedure(
-        &self,
-        create_table: CreateTableExpr,
-        partitions: Vec<Partition>,
-        table_info: RawTableInfo,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let partitions = partitions.into_iter().map(Into::into).collect();
-
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_create_table(create_table, partitions, table_info),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
-    async fn create_logical_tables_procedure(
-        &self,
-        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_create_logical_tables(tables_data),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
-    async fn alter_logical_tables_procedure(
-        &self,
-        tables_data: Vec<AlterTableExpr>,
-        query_context: QueryContextRef,
-    ) -> Result<SubmitDdlTaskResponse> {
-        let request = SubmitDdlTaskRequest {
-            query_context,
-            task: DdlTask::new_alter_logical_tables(tables_data),
-        };
-
-        self.procedure_executor
-            .submit_ddl_task(&ExecutorContext::default(), request)
-            .await
-            .context(error::ExecuteDdlSnafu)
-    }
-
     async fn drop_table_procedure(
         &self,
         table_name: &TableName,