operator/statement/ddl.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_base::regex_pattern::NAME_PATTERN_REG;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::DropTriggerTask;
use common_meta::rpc::ddl::{
    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
    SubmitDdlTaskResponse,
};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
#[cfg(feature = "enterprise")]
use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::table_name::TableName;

use crate::error::{
    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
    InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
    InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;

impl StatementExecutor {
    pub fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;

        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
        // Inherit schema-level options into the table options, except
        // `compaction.*` keys, which stay at the schema level. Options already
        // set on the table take precedence.
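        // A sketch of the effect, assuming the database carries, say,
        // `'ttl' = '7d'` in its extra options: a plain `CREATE TABLE` in that
        // database inherits `ttl = '7d'` unless the statement sets its own
        // value, while `compaction.*` keys are never inherited.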
        if let Some(schema_options) = schema_options {
            for (key, value) in schema_options.extra_options.iter() {
                if key.starts_with("compaction.") {
                    continue;
                }
                create_expr
                    .table_options
                    .entry(key.clone())
                    .or_insert(value.clone());
            }
        }

        self.create_table_inner(create_expr, stmt.partitions, ctx)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_table_like(
        &self,
        stmt: CreateTableLike,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
            .map_err(BoxedError::new)
            .context(error::ExternalSnafu)?;
        let table_ref = self
            .catalog_manager
            .table(&catalog, &schema, &table, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu { table_name: &table })?;
        let partitions = self
            .partition_manager
            .find_table_partitions(table_ref.table_info().table_id())
            .await
            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;

        // CREATE TABLE LIKE also inherits database level options.
        let schema_options = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey {
                catalog: &catalog,
                schema: &schema,
            })
            .await
            .context(TableMetadataManagerSnafu)?
            .map(|v| v.into_inner());

        let quote_style = ctx.quote_style();
        let mut create_stmt =
            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
                .context(error::ParseQuerySnafu)?;
        create_stmt.name = stmt.table_name;
        create_stmt.if_not_exists = false;

        let table_info = table_ref.table_info();
        let partitions =
            create_partitions_stmt(&table_info, partitions)?.and_then(|mut partitions| {
                if !partitions.column_list.is_empty() {
                    partitions.set_quote(quote_style);
                    Some(partitions)
                } else {
                    None
                }
            });

        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
        self.create_table_inner(create_expr, partitions, ctx).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_external_table(
        &self,
        create_expr: CreateExternalTable,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
        self.create_table_inner(create_expr, None, ctx).await
    }

    #[tracing::instrument(skip_all)]
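    /// Creates a table from a `CreateTableExpr`. Exprs that target the metric
    /// engine and carry the logical-table metadata key are routed to the
    /// logical-table path; everything else goes through the ordinary
    /// distributed-table path.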
    pub async fn create_table_inner(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !is_readonly_schema(&create_table.schema_name),
            SchemaReadOnlySnafu {
                name: create_table.schema_name.clone()
            }
        );

        if create_table.engine == METRIC_ENGINE_NAME
            && create_table
                .table_options
                .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
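            // A metric-engine expr carrying `LOGICAL_TABLE_METADATA_KEY` describes
            // a logical table backed by a physical metric-engine table.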
            if let Some(partitions) = partitions.as_ref()
                && !partitions.exprs.is_empty()
            {
                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
                    .await?;
            }
            // Create logical tables
            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
                .await?
                .into_iter()
                .next()
                .context(error::UnexpectedSnafu {
                    violated: "expected to create logical tables",
                })
        } else {
            // Create an ordinary (non-logical) table
            self.create_non_logic_table(create_table, partitions, query_ctx)
                .await
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_non_logic_table(
        &self,
        create_table: &mut CreateTableExpr,
        partitions: Option<Partitions>,
        query_ctx: QueryContextRef,
    ) -> Result<TableRef> {
        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();

        // Check if schema exists
        let schema = self
            .table_metadata_manager
            .schema_manager()
            .get(SchemaNameKey::new(
                &create_table.catalog_name,
                &create_table.schema_name,
            ))
            .await
            .context(TableMetadataManagerSnafu)?;
        ensure!(
            schema.is_some(),
            SchemaNotFoundSnafu {
                schema_info: &create_table.schema_name,
            }
        );

        // Check whether the table already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                &create_table.table_name,
                Some(&query_ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            return if create_table.create_if_not_exists {
                Ok(table)
            } else {
                TableAlreadyExistsSnafu {
                    table: format_full_table_name(
                        &create_table.catalog_name,
                        &create_table.schema_name,
                        &create_table.table_name,
                    ),
                }
                .fail()
            };
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&create_table.table_name),
            InvalidTableNameSnafu {
                table_name: &create_table.table_name,
            }
        );

        let table_name = TableName::new(
            &create_table.catalog_name,
            &create_table.schema_name,
            &create_table.table_name,
        );

        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
        let mut table_info = create_table_info(create_table, partition_cols)?;

        let resp = self
            .create_table_procedure(
                create_table.clone(),
                partitions,
                table_info.clone(),
                query_ctx,
            )
            .await?;

        let table_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created table '{table_name}' with table id {table_id}");

        table_info.ident.table_id = table_id;

        let table_info: Arc<TableInfo> =
            Arc::new(table_info.try_into().context(CreateTableInfoSnafu)?);
        create_table.table_id = Some(api::v1::TableId { id: table_id });

        let table = DistTable::table(table_info);

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
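    /// Creates a batch of logical tables in a single DDL procedure and returns
    /// the created tables in the same order as the input exprs.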
    pub async fn create_logical_tables(
        &self,
        create_table_exprs: &[CreateTableExpr],
        query_context: QueryContextRef,
    ) -> Result<Vec<TableRef>> {
        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
        ensure!(
            !create_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "create logical tables"
            }
        );

        // Check table names
        for create_table in create_table_exprs {
            ensure!(
                NAME_PATTERN_REG.is_match(&create_table.table_name),
                InvalidTableNameSnafu {
                    table_name: &create_table.table_name,
                }
            );
        }

        let mut raw_tables_info = create_table_exprs
            .iter()
            .map(|create| create_table_info(create, vec![]))
            .collect::<Result<Vec<_>>>()?;
        let tables_data = create_table_exprs
            .iter()
            .cloned()
            .zip(raw_tables_info.iter().cloned())
            .collect::<Vec<_>>();

        let resp = self
            .create_logical_tables_procedure(tables_data, query_context)
            .await?;

        let table_ids = resp.table_ids;
        ensure!(
            table_ids.len() == raw_tables_info.len(),
            CreateLogicalTablesSnafu {
                reason: format!(
383                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
                    raw_tables_info.len(),
                    table_ids.len()
                )
            }
        );
        info!("Successfully created logical tables: {:?}", table_ids);

        for (i, table_info) in raw_tables_info.iter_mut().enumerate() {
            table_info.ident.table_id = table_ids[i];
        }
        let tables_info = raw_tables_info
            .into_iter()
            .map(|x| x.try_into().context(CreateTableInfoSnafu))
            .collect::<Result<Vec<_>>>()?;

        Ok(tables_info
            .into_iter()
            .map(|x| DistTable::table(Arc::new(x)))
            .collect())
    }

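    /// Checks that a logical table's partition rule matches its physical
    /// table's rule: both sides' partition exprs are sorted and then compared
    /// for equality.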
    async fn validate_logical_table_partition_rule(
        &self,
        create_table: &CreateTableExpr,
        partitions: &Partitions,
        query_ctx: &QueryContextRef,
    ) -> Result<()> {
        let (_, mut logical_partition_exprs) =
            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;

        let physical_table_name = create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .with_context(|| CreateLogicalTablesSnafu {
                reason: format!(
                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
                ),
            })?;

        let physical_table = self
            .catalog_manager
            .table(
                &create_table.catalog_name,
                &create_table.schema_name,
                physical_table_name,
                Some(query_ctx),
            )
            .await
            .context(CatalogSnafu)?
            .context(TableNotFoundSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let physical_table_info = physical_table.table_info();
        let partition_rule = self
            .partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
            .context(error::FindTablePartitionRuleSnafu {
                table_name: physical_table_name.clone(),
            })?;

        let multi_dim_rule = partition_rule
            .as_ref()
            .as_any()
            .downcast_ref::<MultiDimPartitionRule>()
            .context(InvalidPartitionRuleSnafu {
                reason: "physical table partition rule is not range-based",
            })?;

        // TODO(ruihang): project physical partition exprs to logical partition column
        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
        logical_partition_exprs.sort_unstable();
        physical_partition_exprs.sort_unstable();

        ensure!(
            physical_partition_exprs == logical_partition_exprs,
            InvalidPartitionRuleSnafu {
                reason: format!(
                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
                    logical_partition_exprs, physical_partition_exprs
                ),
            }
        );

        Ok(())
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn create_trigger(
        &self,
        stmt: CreateTrigger,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
        self.create_trigger_inner(expr, query_context).await
    }

    #[cfg(feature = "enterprise")]
    pub async fn create_trigger_inner(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_trigger_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn create_trigger_procedure(
        &self,
        expr: PbCreateTriggerExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
            create_trigger: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;

        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_trigger(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_flow(
        &self,
        stmt: CreateFlow,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // TODO(ruihang): do some verification
        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;

        self.create_flow_inner(expr, query_context).await
    }

    pub async fn create_flow_inner(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        self.create_flow_procedure(expr, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    async fn create_flow_procedure(
        &self,
        expr: CreateFlowExpr,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let flow_type = self
            .determine_flow_type(&expr, query_context.clone())
            .await?;
        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);

        let expr = {
            let mut expr = expr;
            expr.flow_options
                .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
            expr
        };

        let task = CreateFlowTask::try_from(PbCreateFlowTask {
            create_flow: Some(expr),
        })
        .context(error::InvalidExprSnafu)?;
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_create_flow(task),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Determines the flow type based on the SQL query.
    ///
    /// If the query contains an aggregation or a distinct, it is a batching flow; otherwise it is a streaming flow.
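    ///
    /// For example, `SELECT count(*) FROM t GROUP BY host` plans to an
    /// `Aggregate` node and becomes a batching flow, while a plain
    /// filter/projection such as `SELECT host, cpu FROM t WHERE cpu > 0.9`
    /// stays streaming. TQL queries are always batching, and a source table
    /// with an instant TTL forces streaming regardless of the query.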
    async fn determine_flow_type(
        &self,
        expr: &CreateFlowExpr,
        query_ctx: QueryContextRef,
    ) -> Result<FlowType> {
        // First check whether any source table's TTL is instant; if so, force streaming mode.
        for src_table_name in &expr.source_table_names {
            let table = self
                .catalog_manager()
                .table(
                    &src_table_name.catalog_name,
                    &src_table_name.schema_name,
                    &src_table_name.table_name,
                    Some(&query_ctx),
                )
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name,
                    ),
                })?;

            // An instant source table can only be handled in streaming mode.
            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
                warn!(
                    "Source table `{}` of flow `{}` has ttl=instant, falling back to streaming mode",
                    format_full_table_name(
                        &src_table_name.catalog_name,
                        &src_table_name.schema_name,
                        &src_table_name.table_name
                    ),
                    expr.flow_name
                );
                return Ok(FlowType::Streaming);
            }
        }

        let engine = &self.query_engine;
        let stmts = ParserContext::create_with_dialect(
            &expr.sql,
            query_ctx.sql_dialect(),
            ParseOptions::default(),
        )
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

        ensure!(
            stmts.len() == 1,
            InvalidSqlSnafu {
                err_msg: format!("Expect only one statement, found {}", stmts.len())
            }
        );
        let stmt = &stmts[0];

        // TQL statements are parsed as well.
        let plan = match stmt {
            // PromQL (TQL) is only supported in batching mode.
            Statement::Tql(_) => return Ok(FlowType::Batching),
            _ => engine
                .planner()
                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?,
        };

        /// Visitor to find aggregation or distinct
        struct FindAggr {
            is_aggr: bool,
        }

        impl TreeNodeVisitor<'_> for FindAggr {
            type Node = LogicalPlan;
            fn f_down(
                &mut self,
                node: &Self::Node,
            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
            {
                match node {
                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
                        self.is_aggr = true;
                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
                    }
                    _ => (),
                }
                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
            }
        }

        let mut find_aggr = FindAggr { is_aggr: false };

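        // `visit_with_subqueries` also descends into subquery plans, so an
        // aggregation hidden inside a subquery still marks the flow as batching.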
        plan.visit_with_subqueries(&mut find_aggr)
            .context(BuildDfLogicalPlanSnafu)?;
        if find_aggr.is_aggr {
            Ok(FlowType::Batching)
        } else {
            Ok(FlowType::Streaming)
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn create_view(
        &self,
        create_view: CreateView,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        // convert input into logical plan
        let logical_plan = match &*create_view.query {
            Statement::Query(query) => {
                self.plan(
                    &QueryStatement::Sql(Statement::Query(query.clone())),
                    ctx.clone(),
                )
                .await?
            }
            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
            _ => {
                return InvalidViewStmtSnafu {}.fail();
            }
        };
        // Save the definition for `show create view`.
        let definition = create_view.to_string();

        // Save the plan columns; they may change when the schemas of the
        // tables referenced by the plan are altered.
        let schema: Schema = logical_plan
            .schema()
            .clone()
            .try_into()
            .context(ConvertSchemaSnafu)?;
        let plan_columns: Vec<_> = schema
            .column_schemas()
            .iter()
            .map(|c| c.name.clone())
            .collect();

        let columns: Vec<_> = create_view
            .columns
            .iter()
            .map(|ident| ident.to_string())
            .collect();

        // Validate columns
        if !columns.is_empty() {
            ensure!(
                columns.len() == plan_columns.len(),
                error::ViewColumnsMismatchSnafu {
                    view_name: create_view.name.to_string(),
                    expected: plan_columns.len(),
                    actual: columns.len(),
                }
            );
        }
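        // E.g. `CREATE VIEW v (a, b) AS SELECT x, y FROM t` passes this check
        // only because the plan produces exactly two columns.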

        // Extract the table names from the original plan
        // and rewrite them as fully qualified names.
        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
            .context(ExtractTableNamesSnafu)?;

        let table_names = table_names.into_iter().map(|t| t.into()).collect();

        // TODO(dennis): we don't save the optimized plan yet,
        // because there are some serialization issues with our own defined plan nodes (such as `MergeScanLogicalPlan`).
        // When the issues are fixed, we can use the `optimized_plan` instead.
        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();

        // encode logical plan
        let encoded_plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .context(SubstraitCodecSnafu)?;

        let expr = expr_helper::to_create_view_expr(
            create_view,
            encoded_plan.to_vec(),
            table_names,
            columns,
            plan_columns,
            definition,
            ctx.clone(),
        )?;

        // TODO(dennis): validate the logical plan
        self.create_view_by_expr(expr, ctx).await
    }

    pub async fn create_view_by_expr(
        &self,
        expr: CreateViewExpr,
        ctx: QueryContextRef,
    ) -> Result<TableRef> {
        ensure!(
            !(expr.create_if_not_exists && expr.or_replace),
            InvalidSqlSnafu {
                err_msg: "syntax error: CREATE OR REPLACE and IF NOT EXISTS cannot be used together",
            }
        );
        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();

        let schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
            .await
            .context(TableMetadataManagerSnafu)?;

        ensure!(
            schema_exists,
            SchemaNotFoundSnafu {
                schema_info: &expr.schema_name,
            }
        );

        // Check whether a view or table with this name already exists.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            let table_type = table.table_info().table_type;

            match (table_type, expr.create_if_not_exists, expr.or_replace) {
                (TableType::View, true, false) => {
                    return Ok(table);
                }
                (TableType::View, false, false) => {
                    return ViewAlreadyExistsSnafu {
                        name: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
                (TableType::View, _, true) => {
                    // Try to replace an existing view.
                }
                _ => {
                    return TableAlreadyExistsSnafu {
                        table: format_full_table_name(
                            &expr.catalog_name,
                            &expr.schema_name,
                            &expr.view_name,
                        ),
                    }
                    .fail();
                }
            }
        }

        ensure!(
            NAME_PATTERN_REG.is_match(&expr.view_name),
            InvalidViewNameSnafu {
                name: expr.view_name.clone(),
            }
        );

        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);

        let mut view_info = RawTableInfo {
            ident: metadata::TableIdent {
                // The view id of a distributed table is assigned by Meta; set 0 here as a placeholder.
                table_id: 0,
                version: 0,
            },
            name: expr.view_name.clone(),
            desc: None,
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            // Table meta doesn't make sense for views, so use a default one.
            meta: RawTableMeta::default(),
            table_type: TableType::View,
        };

        let request = SubmitDdlTaskRequest {
            query_context: ctx,
            task: DdlTask::new_create_view(expr, view_info.clone()),
        };

        let resp = self
            .procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)?;

        debug!(
            "Submit creating view '{view_name}' task response: {:?}",
            resp
        );

        let view_id = resp
            .table_ids
            .into_iter()
            .next()
            .context(error::UnexpectedSnafu {
                violated: "expected table_id",
            })?;
        info!("Successfully created view '{view_name}' with view id {view_id}");

        view_info.ident.table_id = view_id;

        let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);

        let table = DistTable::table(view_info);

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(
                &Context::default(),
                &[
                    CacheIdent::TableId(view_id),
                    CacheIdent::TableName(view_name.clone()),
                ],
            )
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(table)
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_flow(
        &self,
        catalog_name: String,
        flow_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if let Some(flow) = self
            .flow_metadata_manager
            .flow_name_manager()
            .get(&catalog_name, &flow_name)
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let flow_id = flow.flow_id();
            let task = DropFlowTask {
                catalog_name,
                flow_name,
                flow_id,
                drop_if_exists,
            };
            self.drop_flow_procedure(task, query_context).await?;

            Ok(Output::new_with_affected_rows(0))
        } else if drop_if_exists {
            Ok(Output::new_with_affected_rows(0))
        } else {
            FlowNotFoundSnafu {
                flow_name: format_full_flow_name(&catalog_name, &flow_name),
            }
            .fail()
        }
    }

    async fn drop_flow_procedure(
        &self,
        expr: DropFlowTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_flow(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub(super) async fn drop_trigger(
        &self,
        catalog_name: String,
        trigger_name: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let task = DropTriggerTask {
            catalog_name,
            trigger_name,
            drop_if_exists,
        };
        self.drop_trigger_procedure(task, query_context).await?;
        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    async fn drop_trigger_procedure(
        &self,
        expr: DropTriggerTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_trigger(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    /// Drop a view
    #[tracing::instrument(skip_all)]
    pub(crate) async fn drop_view(
        &self,
        catalog: String,
        schema: String,
        view: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let view_info = if let Some(view) = self
            .catalog_manager
            .table(&catalog, &schema, &view, None)
            .await
            .context(CatalogSnafu)?
        {
            view.table_info()
        } else if drop_if_exists {
            // DROP VIEW IF EXISTS meets view not found - ignored
            return Ok(Output::new_with_affected_rows(0));
        } else {
            return TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog, &schema, &view),
            }
            .fail();
        };

        // Ensure the existing object is a view; other table types can't be dropped here.
        ensure!(
            view_info.table_type == TableType::View,
            error::InvalidViewSnafu {
                msg: "not a view",
                view_name: format_full_table_name(&catalog, &schema, &view),
            }
        );

        let view_id = view_info.table_id();

        let task = DropViewTask {
            catalog,
            schema,
            view,
            view_id,
            drop_if_exists,
        };

        self.drop_view_procedure(task, query_context).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    /// Submit [DropViewTask] to procedure executor.
    async fn drop_view_procedure(
        &self,
        expr: DropViewTask,
        query_context: QueryContextRef,
    ) -> Result<SubmitDdlTaskResponse> {
        let request = SubmitDdlTaskRequest {
            query_context,
            task: DdlTask::new_drop_view(expr),
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), request)
            .await
            .context(error::ExecuteDdlSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_logical_tables(
        &self,
        alter_table_exprs: Vec<AlterTableExpr>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
        ensure!(
            !alter_table_exprs.is_empty(),
            EmptyDdlExprSnafu {
                name: "alter logical tables"
            }
        );

        // group by physical table id
        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
        for expr in alter_table_exprs {
            // Get table_id from catalog_manager
            let catalog = if expr.catalog_name.is_empty() {
                query_context.current_catalog()
            } else {
                &expr.catalog_name
            };
            let schema = if expr.schema_name.is_empty() {
                query_context.current_schema()
            } else {
                expr.schema_name.clone()
            };
            let table_name = &expr.table_name;
            let table = self
                .catalog_manager
                .table(catalog, &schema, table_name, Some(&query_context))
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: format_full_table_name(catalog, &schema, table_name),
                })?;
            let table_id = table.table_info().ident.table_id;
            let physical_table_id = self
                .table_metadata_manager
                .table_route_manager()
                .get_physical_table_id(table_id)
                .await
                .context(TableMetadataManagerSnafu)?;
            groups.entry(physical_table_id).or_default().push(expr);
        }
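        // Each group now holds the alter exprs for logical tables sharing one
        // physical table, so each group can be submitted as a single procedure.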

        // Submit procedure for each physical table
        let mut handles = Vec::with_capacity(groups.len());
        for (_physical_table_id, exprs) in groups {
            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
            handles.push(fut);
        }
        let _results = futures::future::try_join_all(handles).await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_table(
        &self,
        table_name: TableName,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        // Reserved for grpc call
        self.drop_tables(&[table_name], drop_if_exists, query_context)
            .await
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_tables(
        &self,
        table_names: &[TableName],
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let mut tables = Vec::with_capacity(table_names.len());
        for table_name in table_names {
            ensure!(
                !is_readonly_schema(&table_name.schema_name),
                SchemaReadOnlySnafu {
                    name: table_name.schema_name.clone()
                }
            );

            if let Some(table) = self
                .catalog_manager
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    Some(&query_context),
                )
                .await
                .context(CatalogSnafu)?
            {
                tables.push(table.table_info().table_id());
            } else if drop_if_exists {
                // DROP TABLE IF EXISTS meets table not found - ignored
                continue;
            } else {
                return TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                }
                .fail();
            }
        }
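        // All table ids are resolved above before any drop is submitted, so a
        // missing table (without IF EXISTS) fails the statement before anything
        // is dropped.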

        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
                .await?;

            // Invalidates local cache ASAP.
            self.cache_invalidator
                .invalidate(
                    &Context::default(),
                    &[
                        CacheIdent::TableId(table_id),
                        CacheIdent::TableName(table_name.clone()),
                    ],
                )
                .await
                .context(error::InvalidateTableCacheSnafu)?;
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn drop_database(
        &self,
        catalog: String,
        schema: String,
        drop_if_exists: bool,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&schema),
            SchemaReadOnlySnafu { name: schema }
        );

        if self
            .catalog_manager
            .schema_exists(&catalog, &schema, None)
            .await
            .context(CatalogSnafu)?
        {
            if schema == query_context.current_schema() {
                SchemaInUseSnafu { name: schema }.fail()
            } else {
                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                    .await?;

                Ok(Output::new_with_affected_rows(0))
            }
        } else if drop_if_exists {
            // DROP DATABASE IF EXISTS meets schema not found - ignored
            Ok(Output::new_with_affected_rows(0))
        } else {
            SchemaNotFoundSnafu {
                schema_info: schema,
            }
            .fail()
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn truncate_table(
        &self,
        table_name: TableName,
        time_ranges: Vec<(Timestamp, Timestamp)>,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&table_name.schema_name),
            SchemaReadOnlySnafu {
                name: table_name.schema_name.clone()
            }
        );

        let table = self
            .catalog_manager
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_name.to_string(),
            })?;
        let table_id = table.table_info().table_id();
        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
            .await?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table(
        &self,
        alter_table: AlterTable,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        if matches!(
            alter_table.alter_operation(),
            AlterTableOperation::Repartition { .. }
        ) {
            let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
            return NotSupportedSnafu {
                feat: "ALTER TABLE REPARTITION",
            }
            .fail();
        }

        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
        self.alter_table_inner(expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_table_inner(
        &self,
        expr: AlterTableExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&expr.schema_name),
            SchemaReadOnlySnafu {
                name: expr.schema_name.clone()
            }
        );

        let catalog_name = if expr.catalog_name.is_empty() {
            DEFAULT_CATALOG_NAME.to_string()
        } else {
            expr.catalog_name.clone()
        };

        let schema_name = if expr.schema_name.is_empty() {
            DEFAULT_SCHEMA_NAME.to_string()
        } else {
            expr.schema_name.clone()
        };

        let table_name = expr.table_name.clone();

        let table = self
            .catalog_manager
            .table(
                &catalog_name,
                &schema_name,
                &table_name,
                Some(&query_context),
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
            })?;

        let table_id = table.table_info().ident.table_id;
        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
        if !need_alter {
            return Ok(Output::new_with_affected_rows(0));
        }
        info!(
            "Table info before alter is {:?}, expr: {:?}",
            table.table_info(),
            expr
        );

        let physical_table_id = self
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_id(table_id)
            .await
            .context(TableMetadataManagerSnafu)?;

        let (req, invalidate_keys) = if physical_table_id == table_id {
            // This is a physical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_table(expr),
            };

            let invalidate_keys = vec![
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            (req, invalidate_keys)
        } else {
            // This is a logical table.
            let req = SubmitDdlTaskRequest {
                query_context,
                task: DdlTask::new_alter_logical_tables(vec![expr]),
            };

            let mut invalidate_keys = vec![
                CacheIdent::TableId(physical_table_id),
                CacheIdent::TableId(table_id),
                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
            ];

            let physical_table = self
                .table_metadata_manager
                .table_info_manager()
                .get(physical_table_id)
                .await
                .context(TableMetadataManagerSnafu)?
                .map(|x| x.into_inner());
            if let Some(physical_table) = physical_table {
                let physical_table_name = TableName::new(
                    physical_table.table_info.catalog_name,
                    physical_table.table_info.schema_name,
                    physical_table.table_info.name,
                );
                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
            }

            (req, invalidate_keys)
        };

        self.procedure_executor
            .submit_ddl_task(&ExecutorContext::default(), req)
            .await
            .context(error::ExecuteDdlSnafu)?;

        // Invalidates local cache ASAP.
        self.cache_invalidator
            .invalidate(&Context::default(), &invalidate_keys)
            .await
            .context(error::InvalidateTableCacheSnafu)?;

        Ok(Output::new_with_affected_rows(0))
    }

    #[cfg(feature = "enterprise")]
    #[tracing::instrument(skip_all)]
    pub async fn alter_trigger(
        &self,
        _alter_expr: AlterTrigger,
        _query_context: QueryContextRef,
    ) -> Result<Output> {
        crate::error::NotSupportedSnafu {
            feat: "alter trigger",
        }
        .fail()
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database(
        &self,
        alter_expr: AlterDatabase,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
        self.alter_database_inner(alter_expr, query_context).await
    }

    #[tracing::instrument(skip_all)]
    pub async fn alter_database_inner(
        &self,
        alter_expr: AlterDatabaseExpr,
        query_context: QueryContextRef,
    ) -> Result<Output> {
        ensure!(
            !is_readonly_schema(&alter_expr.schema_name),
            SchemaReadOnlySnafu {
                name: alter_expr.schema_name.clone()
1427            }
1428        );
1429
1430        let exists = self
1431            .catalog_manager
1432            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1433            .await
1434            .context(CatalogSnafu)?;
1435        ensure!(
1436            exists,
1437            SchemaNotFoundSnafu {
1438                schema_info: alter_expr.schema_name,
1439            }
1440        );
1441
1442        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1443            catalog_name: alter_expr.catalog_name.clone(),
1444            schema_name: alter_expr.schema_name.clone(),
1445        })];
1446
1447        self.alter_database_procedure(alter_expr, query_context)
1448            .await?;
1449
1450        // Invalidates local cache ASAP.
1451        self.cache_invalidator
1452            .invalidate(&Context::default(), &cache_ident)
1453            .await
1454            .context(error::InvalidateTableCacheSnafu)?;
1455
1456        Ok(Output::new_with_affected_rows(0))
1457    }
1458
1459    async fn create_table_procedure(
1460        &self,
1461        create_table: CreateTableExpr,
1462        partitions: Vec<PartitionExpr>,
1463        table_info: RawTableInfo,
1464        query_context: QueryContextRef,
1465    ) -> Result<SubmitDdlTaskResponse> {
1466        let partitions = partitions
1467            .into_iter()
1468            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1469            .collect::<Result<Vec<_>>>()?;
1470
1471        let request = SubmitDdlTaskRequest {
1472            query_context,
1473            task: DdlTask::new_create_table(create_table, partitions, table_info),
1474        };
1475
1476        self.procedure_executor
1477            .submit_ddl_task(&ExecutorContext::default(), request)
1478            .await
1479            .context(error::ExecuteDdlSnafu)
1480    }
1481
1482    async fn create_logical_tables_procedure(
1483        &self,
1484        tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
1485        query_context: QueryContextRef,
1486    ) -> Result<SubmitDdlTaskResponse> {
1487        let request = SubmitDdlTaskRequest {
1488            query_context,
1489            task: DdlTask::new_create_logical_tables(tables_data),
1490        };
1491
1492        self.procedure_executor
1493            .submit_ddl_task(&ExecutorContext::default(), request)
1494            .await
1495            .context(error::ExecuteDdlSnafu)
1496    }
1497
1498    async fn alter_logical_tables_procedure(
1499        &self,
1500        tables_data: Vec<AlterTableExpr>,
1501        query_context: QueryContextRef,
1502    ) -> Result<SubmitDdlTaskResponse> {
1503        let request = SubmitDdlTaskRequest {
1504            query_context,
1505            task: DdlTask::new_alter_logical_tables(tables_data),
1506        };
1507
1508        self.procedure_executor
1509            .submit_ddl_task(&ExecutorContext::default(), request)
1510            .await
1511            .context(error::ExecuteDdlSnafu)
1512    }
1513
1514    async fn drop_table_procedure(
1515        &self,
1516        table_name: &TableName,
1517        table_id: TableId,
1518        drop_if_exists: bool,
1519        query_context: QueryContextRef,
1520    ) -> Result<SubmitDdlTaskResponse> {
1521        let request = SubmitDdlTaskRequest {
1522            query_context,
1523            task: DdlTask::new_drop_table(
1524                table_name.catalog_name.clone(),
1525                table_name.schema_name.clone(),
1526                table_name.table_name.clone(),
1527                table_id,
1528                drop_if_exists,
1529            ),
1530        };
1531
1532        self.procedure_executor
1533            .submit_ddl_task(&ExecutorContext::default(), request)
1534            .await
1535            .context(error::ExecuteDdlSnafu)
1536    }
1537
1538    async fn drop_database_procedure(
1539        &self,
1540        catalog: String,
1541        schema: String,
1542        drop_if_exists: bool,
1543        query_context: QueryContextRef,
1544    ) -> Result<SubmitDdlTaskResponse> {
1545        let request = SubmitDdlTaskRequest {
1546            query_context,
1547            task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1548        };
1549
1550        self.procedure_executor
1551            .submit_ddl_task(&ExecutorContext::default(), request)
1552            .await
1553            .context(error::ExecuteDdlSnafu)
1554    }
1555
1556    async fn alter_database_procedure(
1557        &self,
1558        alter_expr: AlterDatabaseExpr,
1559        query_context: QueryContextRef,
1560    ) -> Result<SubmitDdlTaskResponse> {
1561        let request = SubmitDdlTaskRequest {
1562            query_context,
1563            task: DdlTask::new_alter_database(alter_expr),
1564        };
1565
1566        self.procedure_executor
1567            .submit_ddl_task(&ExecutorContext::default(), request)
1568            .await
1569            .context(error::ExecuteDdlSnafu)
1570    }
1571
1572    async fn truncate_table_procedure(
1573        &self,
1574        table_name: &TableName,
1575        table_id: TableId,
1576        time_ranges: Vec<(Timestamp, Timestamp)>,
1577        query_context: QueryContextRef,
1578    ) -> Result<SubmitDdlTaskResponse> {
1579        let request = SubmitDdlTaskRequest {
1580            query_context,
1581            task: DdlTask::new_truncate_table(
1582                table_name.catalog_name.clone(),
1583                table_name.schema_name.clone(),
1584                table_name.table_name.clone(),
1585                table_id,
1586                time_ranges,
1587            ),
1588        };
1589
1590        self.procedure_executor
1591            .submit_ddl_task(&ExecutorContext::default(), request)
1592            .await
1593            .context(error::ExecuteDdlSnafu)
1594    }
1595
1596    #[tracing::instrument(skip_all)]
1597    pub async fn create_database(
1598        &self,
1599        database: &str,
1600        create_if_not_exists: bool,
1601        options: HashMap<String, String>,
1602        query_context: QueryContextRef,
1603    ) -> Result<Output> {
1604        let catalog = query_context.current_catalog();
1605        ensure!(
1606            NAME_PATTERN_REG.is_match(catalog),
1607            error::UnexpectedSnafu {
1608                violated: format!("Invalid catalog name: {}", catalog)
1609            }
1610        );
1611
1612        ensure!(
1613            NAME_PATTERN_REG.is_match(database),
1614            error::UnexpectedSnafu {
1615                violated: format!("Invalid database name: {}", database)
1616            }
1617        );
1618
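        // Reserved schema names behave like existing schemas here: creation is
        // skipped, and without `create_if_not_exists` the request fails below.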
1619        if !self
1620            .catalog_manager
1621            .schema_exists(catalog, database, None)
1622            .await
1623            .context(CatalogSnafu)?
1624            && !self.catalog_manager.is_reserved_schema_name(database)
1625        {
1626            self.create_database_procedure(
1627                catalog.to_string(),
1628                database.to_string(),
1629                create_if_not_exists,
1630                options,
1631                query_context,
1632            )
1633            .await?;
1634
1635            Ok(Output::new_with_affected_rows(1))
1636        } else if create_if_not_exists {
1637            Ok(Output::new_with_affected_rows(1))
1638        } else {
1639            error::SchemaExistsSnafu { name: database }.fail()
1640        }
1641    }
1642
1643    async fn create_database_procedure(
1644        &self,
1645        catalog: String,
1646        database: String,
1647        create_if_not_exists: bool,
1648        options: HashMap<String, String>,
1649        query_context: QueryContextRef,
1650    ) -> Result<SubmitDdlTaskResponse> {
1651        let request = SubmitDdlTaskRequest {
1652            query_context,
1653            task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
1654        };
1655
1656        self.procedure_executor
1657            .submit_ddl_task(&ExecutorContext::default(), request)
1658            .await
1659            .context(error::ExecuteDdlSnafu)
1660    }
1661}
1662
1663/// Parses the partition statement [Partitions] into [PartitionExpr]s and partition column names.
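///
/// For illustration (the same shape as the SQL in the test module below), a
/// clause like `PARTITION ON COLUMNS (b) (b < 'hz', b >= 'hz')` yields the
/// columns `["b"]` and one [PartitionExpr] per listed expression.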
1664pub fn parse_partitions(
1665    create_table: &CreateTableExpr,
1666    partitions: Option<Partitions>,
1667    query_ctx: &QueryContextRef,
1668) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
1669    // If the user does not define partitions, use the timestamp column (which must
1670    // exist) as the partition column, and create only one partition.
1671    let partition_columns = find_partition_columns(&partitions)?;
1672    let partition_exprs =
1673        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
1674
1675    // Validate the partition exprs by constructing a partition rule from them.
1676    let exprs = partition_exprs.clone();
1677    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
1678        .context(InvalidPartitionSnafu)?;
1679
1680    Ok((partition_exprs, partition_columns))
1681}
1682
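/// Like [parse_partitions], but for validating the partition clause of a
/// logical table: the [Partitions] is required rather than optional, and a
/// partition column missing from the column definitions is reported as an
/// error instead of being assumed present.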
1683fn parse_partitions_for_logical_validation(
1684    create_table: &CreateTableExpr,
1685    partitions: &Partitions,
1686    query_ctx: &QueryContextRef,
1687) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
1688    let partition_columns = partitions
1689        .column_list
1690        .iter()
1691        .map(|ident| ident.value.clone())
1692        .collect::<Vec<_>>();
1693
1694    let column_name_and_type = partition_columns
1695        .iter()
1696        .map(|pc| {
1697            let column = create_table
1698                .column_defs
1699                .iter()
1700                .find(|c| &c.name == pc)
1701                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
1702            let column_name = &column.name;
1703            let data_type = ConcreteDataType::from(
1704                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1705                    .context(ColumnDataTypeSnafu)?,
1706            );
1707            Ok((column_name, data_type))
1708        })
1709        .collect::<Result<HashMap<_, _>>>()?;
1710
1711    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1712    for expr in &partitions.exprs {
1713        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
1714        partition_exprs.push(partition_expr);
1715    }
1716
1717    MultiDimPartitionRule::try_new(
1718        partition_columns.clone(),
1719        vec![],
1720        partition_exprs.clone(),
1721        true,
1722    )
1723    .context(InvalidPartitionSnafu)?;
1724
1725    Ok((partition_columns, partition_exprs))
1726}
1727
1728/// Verifies an alter expression and returns whether it needs to be performed.
1729///
1730/// # Returns
1731///
1732/// Returns `true` if the alter needs to be performed; otherwise `false`.
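///
/// For example, if every column of an `ADD COLUMN IF NOT EXISTS` request
/// already exists in the table, there is nothing to do and this returns
/// `Ok(false)`.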
1733pub fn verify_alter(
1734    table_id: TableId,
1735    table_info: Arc<TableInfo>,
1736    expr: AlterTableExpr,
1737) -> Result<bool> {
1738    let request: AlterTableRequest =
1739        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
1740            .context(AlterExprToRequestSnafu)?;
1741
1742    let AlterTableRequest {
1743        table_name,
1744        alter_kind,
1745        ..
1746    } = &request;
1747
1748    if let AlterKind::RenameTable { new_table_name } = alter_kind {
1749        ensure!(
1750            NAME_PATTERN_REG.is_match(new_table_name),
1751            error::UnexpectedSnafu {
1752                violated: format!("Invalid table name: {}", new_table_name)
1753            }
1754        );
1755    } else if let AlterKind::AddColumns { columns } = alter_kind {
1756        // If all the columns are marked as add_if_not_exists and they already exist in the table,
1757        // there is no need to perform the alter.
1758        let column_names: HashSet<_> = table_info
1759            .meta
1760            .schema
1761            .column_schemas()
1762            .iter()
1763            .map(|schema| &schema.name)
1764            .collect();
1765        if columns.iter().all(|column| {
1766            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
1767        }) {
1768            return Ok(false);
1769        }
1770    }
1771
1772    let _ = table_info
1773        .meta
1774        .builder_with_alter_kind(table_name, &request.alter_kind)
1775        .context(error::TableSnafu)?
1776        .build()
1777        .context(error::BuildTableMetaSnafu { table_name })?;
1778
1779    Ok(true)
1780}
1781
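/// Builds the [RawTableInfo] for a `CREATE TABLE` expression, resolving the
/// time index, primary key indices, and partition key indices from the column
/// definitions. The table id is left as a placeholder for Meta to assign.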
1782pub fn create_table_info(
1783    create_table: &CreateTableExpr,
1784    partition_columns: Vec<String>,
1785) -> Result<RawTableInfo> {
1786    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
1787    let mut column_name_to_index_map = HashMap::new();
1788
1789    for (idx, column) in create_table.column_defs.iter().enumerate() {
1790        let schema =
1791            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
1792                column: &column.name,
1793            })?;
1794        let schema = schema.with_time_index(column.name == create_table.time_index);
1795
1796        column_schemas.push(schema);
1797        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
1798    }
1799
1800    let timestamp_index = column_name_to_index_map
1801        .get(&create_table.time_index)
1802        .cloned();
1803
1804    let raw_schema = RawSchema {
1805        column_schemas: column_schemas.clone(),
1806        timestamp_index,
1807        version: 0,
1808    };
1809
1810    let primary_key_indices = create_table
1811        .primary_keys
1812        .iter()
1813        .map(|name| {
1814            column_name_to_index_map
1815                .get(name)
1816                .cloned()
1817                .context(ColumnNotFoundSnafu { msg: name })
1818        })
1819        .collect::<Result<Vec<_>>>()?;
1820
1821    let partition_key_indices = partition_columns
1822        .into_iter()
1823        .map(|col_name| {
1824            column_name_to_index_map
1825                .get(&col_name)
1826                .cloned()
1827                .context(ColumnNotFoundSnafu { msg: col_name })
1828        })
1829        .collect::<Result<Vec<_>>>()?;
1830
1831    let table_options = TableOptions::try_from_iter(&create_table.table_options)
1832        .context(UnrecognizedTableOptionSnafu)?;
1833
1834    let meta = RawTableMeta {
1835        schema: raw_schema,
1836        primary_key_indices,
1837        value_indices: vec![],
1838        engine: create_table.engine.clone(),
1839        next_column_id: column_schemas.len() as u32,
1840        region_numbers: vec![],
1841        options: table_options,
1842        created_on: Utc::now(),
1843        updated_on: Utc::now(),
1844        partition_key_indices,
1845        column_ids: vec![],
1846    };
1847
1848    let desc = if create_table.desc.is_empty() {
1849        create_table.table_options.get(COMMENT_KEY).cloned()
1850    } else {
1851        Some(create_table.desc.clone())
1852    };
1853
1854    let table_info = RawTableInfo {
1855        ident: metadata::TableIdent {
1856            // The table id of a distributed table is assigned by Meta; set 0 here as a placeholder.
1857            table_id: 0,
1858            version: 0,
1859        },
1860        name: create_table.table_name.clone(),
1861        desc,
1862        catalog_name: create_table.catalog_name.clone(),
1863        schema_name: create_table.schema_name.clone(),
1864        meta,
1865        table_type: TableType::Base,
1866    };
1867    Ok(table_info)
1868}
1869
1870fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
1871    let columns = if let Some(partitions) = partitions {
1872        partitions
1873            .column_list
1874            .iter()
1875            .map(|x| x.value.clone())
1876            .collect::<Vec<_>>()
1877    } else {
1878        vec![]
1879    };
1880    Ok(columns)
1881}
1882
1883/// Parses [Partitions] into a group of partition entries.
1884///
1885/// Returns a list of [PartitionExpr], each of which defines one partition.
1886fn find_partition_entries(
1887    create_table: &CreateTableExpr,
1888    partitions: &Option<Partitions>,
1889    partition_columns: &[String],
1890    query_ctx: &QueryContextRef,
1891) -> Result<Vec<PartitionExpr>> {
1892    let Some(partitions) = partitions else {
1893        return Ok(vec![]);
1894    };
1895
1896    // Extract the concrete data type of each partition column.
1897    let column_name_and_type = partition_columns
1898        .iter()
1899        .map(|pc| {
1900            let column = create_table
1901                .column_defs
1902                .iter()
1903                .find(|c| &c.name == pc)
1904                // unwrap is safe here because we have checked that partition columns are defined
1905                .unwrap();
1906            let column_name = &column.name;
1907            let data_type = ConcreteDataType::from(
1908                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
1909                    .context(ColumnDataTypeSnafu)?,
1910            );
1911            Ok((column_name, data_type))
1912        })
1913        .collect::<Result<HashMap<_, _>>>()?;
1914
1915    // Transform each parser expr into a partition expr.
1916    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
1917    for partition in &partitions.exprs {
1918        let partition_expr =
1919            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
1920        partition_exprs.push(partition_expr);
1921    }
1922
1923    Ok(partition_exprs)
1924}
1925
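/// Recursively converts one parser [Expr] into a [PartitionExpr].
///
/// A leaf comparison pairs a column with a literal (optionally under a unary
/// operator such as a leading `-`), e.g. `b < 'hz'`; a nested expression such
/// as `b >= 'hz' AND b < 'sh'` recurses into both sides.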
1926fn convert_one_expr(
1927    expr: &Expr,
1928    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1929    timezone: &Timezone,
1930) -> Result<PartitionExpr> {
1931    let Expr::BinaryOp { left, op, right } = expr else {
1932        return InvalidPartitionRuleSnafu {
1933            reason: "partition rule must be a binary expression",
1934        }
1935        .fail();
1936    };
1937
1938    let op =
1939        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
1940            reason: format!("unsupported operator in partition expr {op}"),
1941        })?;
1942
1943    // Convert leaf comparisons directly; nested binary exprs recurse below.
1944    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
1945        // col, val
1946        (Expr::Identifier(ident), Expr::Value(value)) => {
1947            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1948            let value = convert_value(&value.value, data_type, timezone, None)?;
1949            (Operand::Column(column_name), op, Operand::Value(value))
1950        }
1951        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
1952            if let Expr::Value(v) = &**expr =>
1953        {
1954            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1955            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1956            (Operand::Column(column_name), op, Operand::Value(value))
1957        }
1958        // val, col
1959        (Expr::Value(value), Expr::Identifier(ident)) => {
1960            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1961            let value = convert_value(&value.value, data_type, timezone, None)?;
1962            (Operand::Value(value), op, Operand::Column(column_name))
1963        }
1964        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
1965            if let Expr::Value(v) = &**expr =>
1966        {
1967            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
1968            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
1969            (Operand::Value(value), op, Operand::Column(column_name))
1970        }
1971        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
1972            // A sub-expression must be compared against another sub-expression.
1973            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
1974            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
1975            (Operand::Expr(lhs), op, Operand::Expr(rhs))
1976        }
1977        _ => {
1978            return InvalidPartitionRuleSnafu {
1979                reason: format!("invalid partition expr {expr}"),
1980            }
1981            .fail();
1982        }
1983    };
1984
1985    Ok(PartitionExpr::new(lhs, op, rhs))
1986}
1987
1988fn convert_identifier(
1989    ident: &Ident,
1990    column_name_and_type: &HashMap<&String, ConcreteDataType>,
1991) -> Result<(String, ConcreteDataType)> {
1992    let column_name = ident.value.clone();
1993    let data_type = column_name_and_type
1994        .get(&column_name)
1995        .cloned()
1996        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
1997    Ok((column_name, data_type))
1998}
1999
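/// Converts a SQL literal into a typed [Value] via [sql_value_to_value], using
/// a throwaway column schema for the target type and honoring the session
/// timezone and an optional unary operator.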
2000fn convert_value(
2001    value: &ParserValue,
2002    data_type: ConcreteDataType,
2003    timezone: &Timezone,
2004    unary_op: Option<UnaryOperator>,
2005) -> Result<Value> {
2006    sql_value_to_value(
2007        &ColumnSchema::new("<NONAME>", data_type, true),
2008        value,
2009        Some(timezone),
2010        unary_op,
2011        false,
2012    )
2013    .context(error::SqlCommonSnafu)
2014}
2015
2016#[cfg(test)]
2017mod test {
2018    use session::context::{QueryContext, QueryContextBuilder};
2019    use sql::dialect::GreptimeDbDialect;
2020    use sql::parser::{ParseOptions, ParserContext};
2021    use sql::statements::statement::Statement;
2022
2023    use super::*;
2024    use crate::expr_helper;
2025
2026    #[test]
2027    fn test_name_is_match() {
2028        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
2029        assert!(!NAME_PATTERN_REG.is_match("🈲"));
2030        assert!(NAME_PATTERN_REG.is_match("hello"));
2031        assert!(NAME_PATTERN_REG.is_match("test@"));
2032        assert!(!NAME_PATTERN_REG.is_match("@test"));
2033        assert!(NAME_PATTERN_REG.is_match("test#"));
2034        assert!(!NAME_PATTERN_REG.is_match("#test"));
2035        assert!(!NAME_PATTERN_REG.is_match("@"));
2036        assert!(!NAME_PATTERN_REG.is_match("#"));
2037    }
2038
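    // A minimal sketch of the no-partition path: without a PARTITION clause,
    // no partition columns are reported.
    #[test]
    fn test_find_partition_columns_without_partitions() {
        let columns = find_partition_columns(&None).unwrap();
        assert!(columns.is_empty());
    }
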
2039    #[tokio::test]
2040    #[ignore = "TODO(ruihang): WIP new partition rule"]
2041    async fn test_parse_partitions() {
2042        common_telemetry::init_default_ut_logging();
2043        let cases = [
2044            (
2045                r"
2046CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2047PARTITION ON COLUMNS (b) (
2048  b < 'hz',
2049  b >= 'hz' AND b < 'sh',
2050  b >= 'sh'
2051)
2052ENGINE=mito",
2053                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2054            ),
2055            (
2056                r"
2057CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2058PARTITION BY RANGE COLUMNS (b, a) (
2059  PARTITION r0 VALUES LESS THAN ('hz', 10),
2060  b < 'hz' AND a < 10,
2061  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2062  b >= 'sh' AND a >= 20
2063)
2064ENGINE=mito",
2065                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2066            ),
2067        ];
2068        let ctx = QueryContextBuilder::default().build().into();
2069        for (sql, expected) in cases {
2070            let result = ParserContext::create_with_dialect(
2071                sql,
2072                &GreptimeDbDialect {},
2073                ParseOptions::default(),
2074            )
2075            .unwrap();
2076            match &result[0] {
2077                Statement::CreateTable(c) => {
2078                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2079                    let (partitions, _) =
2080                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2081                    let json = serde_json::to_string(&partitions).unwrap();
2082                    assert_eq!(json, expected);
2083                }
2084                _ => unreachable!(),
2085            }
2086        }
2087    }
2088}