operator/statement/ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::alter_table_expr::Kind;
21use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
22use api::v1::{
23    AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
24    Repartition, column_def,
25};
26#[cfg(feature = "enterprise")]
27use api::v1::{
28    CreateTriggerExpr as PbCreateTriggerExpr, meta::CreateTriggerTask as PbCreateTriggerTask,
29};
30use catalog::CatalogManagerRef;
31use chrono::Utc;
32use common_base::regex_pattern::NAME_PATTERN_REG;
33use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_readonly_schema};
34use common_catalog::{format_full_flow_name, format_full_table_name};
35use common_error::ext::BoxedError;
36use common_meta::cache_invalidator::Context;
37use common_meta::ddl::create_flow::FlowType;
38use common_meta::instruction::CacheIdent;
39use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
40use common_meta::procedure_executor::ExecutorContext;
41#[cfg(feature = "enterprise")]
42use common_meta::rpc::ddl::trigger::CreateTriggerTask;
43#[cfg(feature = "enterprise")]
44use common_meta::rpc::ddl::trigger::DropTriggerTask;
45use common_meta::rpc::ddl::{
46    CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
47    SubmitDdlTaskResponse,
48};
49use common_query::Output;
50use common_recordbatch::{RecordBatch, RecordBatches};
51use common_sql::convert::sql_value_to_value;
52use common_telemetry::{debug, info, tracing, warn};
53use common_time::{Timestamp, Timezone};
54use datafusion_common::tree_node::TreeNodeVisitor;
55use datafusion_expr::LogicalPlan;
56use datatypes::prelude::ConcreteDataType;
57use datatypes::schema::{ColumnSchema, Schema};
58use datatypes::value::Value;
59use datatypes::vectors::{StringVector, VectorRef};
60use humantime::parse_duration;
61use partition::expr::{Operand, PartitionExpr, RestrictedOp};
62use partition::multi_dim::MultiDimPartitionRule;
63use query::parser::QueryStatement;
64use query::plan::extract_and_rewrite_full_table_names;
65use query::query_engine::DefaultSerializer;
66use query::sql::create_table_stmt;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::{OptionExt, ResultExt, ensure};
70use sql::parser::{ParseOptions, ParserContext};
71use sql::parsers::utils::is_tql;
72use sql::statements::OptionMap;
73#[cfg(feature = "enterprise")]
74use sql::statements::alter::trigger::AlterTrigger;
75use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
76#[cfg(feature = "enterprise")]
77use sql::statements::create::trigger::CreateTrigger;
78use sql::statements::create::{
79    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
80};
81use sql::statements::statement::Statement;
82use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
83use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
84use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
85use table::TableRef;
86use table::dist_table::DistTable;
87use table::metadata::{self, TableId, TableInfo, TableMeta, TableType};
88use table::requests::{
89    AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
90};
91use table::table_name::TableName;
92use table::table_reference::TableReference;
93
94use crate::error::{
95    self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
96    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu,
97    DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
98    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
99    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
100    PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
101    SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
102    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
103    ViewAlreadyExistsSnafu,
104};
105use crate::expr_helper::{self, RepartitionRequest};
106use crate::statement::StatementExecutor;
107use crate::statement::show::create_partitions_stmt;
108use crate::utils::to_meta_query_context;
109
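/// Options controlling how a DDL task is submitted: whether to wait for the
/// procedure to finish and how long to wait.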
110#[derive(Debug, Clone, Copy)]
111struct DdlSubmitOptions {
112    wait: bool,
113    timeout: Duration,
114}
115
116const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
117const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY];
118
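/// Builds a one-row record batch with a single `Procedure ID` column carrying the
/// submitted procedure id, so DDL statements can return it to the client.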
119fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
120    let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
121    let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
122    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
123        "Procedure ID",
124        vector.data_type(),
125        false,
126    )]));
127    let batch =
128        RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
129    let batches =
130        RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
131    Ok(Output::new_with_record_batches(batches))
132}
133
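/// Parses the `wait` and `timeout` DDL options from the statement's option map,
/// falling back to the `SubmitDdlTaskRequest` defaults when an option is absent.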
134fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
135    let wait = match options.get(DDL_WAIT) {
136        Some(value) => value.parse::<bool>().map_err(|_| {
137            InvalidSqlSnafu {
138                err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
139            }
140            .build()
141        })?,
142        None => SubmitDdlTaskRequest::default_wait(),
143    };
144
145    let timeout = match options.get(DDL_TIMEOUT) {
146        Some(value) => parse_duration(value).map_err(|err| {
147            InvalidSqlSnafu {
148                err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
149            }
150            .build()
151        })?,
152        None => SubmitDdlTaskRequest::default_timeout(),
153    };
154
155    Ok(DdlSubmitOptions { wait, timeout })
156}
157
158fn supported_flow_options() -> String {
159    ALLOWED_FLOW_OPTIONS.join(", ")
160}
161
162fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
163    value
164        .trim()
165        .to_ascii_lowercase()
166        .parse::<bool>()
167        .map(|value| value.to_string())
168        .map_err(|_| {
169            InvalidSqlSnafu {
170                err_msg: format!("invalid flow option '{key}': '{value}'"),
171            }
172            .build()
173        })
174}
175
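/// Rejects unknown or reserved flow options and normalizes the accepted ones;
/// currently only `defer_on_missing_source` (a boolean) is allowed.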
176fn validate_and_normalize_flow_options(
177    options: HashMap<String, String>,
178) -> Result<HashMap<String, String>> {
179    options
180        .into_iter()
181        .map(|(key, value)| {
182            if key == FlowType::FLOW_TYPE_KEY {
183                return InvalidSqlSnafu {
184                    err_msg: format!("flow option '{key}' is reserved for internal use"),
185                }
186                .fail();
187            }
188
189            let normalized_value = match key.as_str() {
190                DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
191                _ => {
192                    return InvalidSqlSnafu {
193                        err_msg: format!(
194                            "unknown flow option '{key}', supported options: {}",
195                            supported_flow_options()
196                        ),
197                    }
198                    .fail();
199                }
200            };
201
202            Ok((key, normalized_value))
203        })
204        .collect()
205}
206
207impl StatementExecutor {
208    pub fn catalog_manager(&self) -> CatalogManagerRef {
209        self.catalog_manager.clone()
210    }
211
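    /// Creates a table from a parsed `CREATE TABLE` statement, inheriting applicable
    /// database-level options before submitting the DDL task.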
212    #[tracing::instrument(skip_all)]
213    pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
214        let (catalog, schema, _table) = table_idents_to_full_name(&stmt.name, &ctx)
215            .map_err(BoxedError::new)
216            .context(error::ExternalSnafu)?;
217
218        let schema_options = self
219            .table_metadata_manager
220            .schema_manager()
221            .get(SchemaNameKey {
222                catalog: &catalog,
223                schema: &schema,
224            })
225            .await
226            .context(TableMetadataManagerSnafu)?
227            .map(|v| v.into_inner());
228
229        let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
230        // Don't inherit schema-level TTL/compaction options into table options:
231        // TTL is applied during compaction, and `compaction.*` is handled separately.
232        if let Some(schema_options) = schema_options {
233            for (key, value) in schema_options.extra_options.iter() {
234                if key.starts_with("compaction.") {
235                    continue;
236                }
237                create_expr
238                    .table_options
239                    .entry(key.clone())
240                    .or_insert(value.clone());
241            }
242        }
243
244        self.create_table_inner(create_expr, stmt.partitions, ctx)
245            .await
246    }
247
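    /// Handles `CREATE TABLE ... LIKE`: copies the source table's schema, options,
    /// and partition rule into a new create-table statement.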
248    #[tracing::instrument(skip_all)]
249    pub async fn create_table_like(
250        &self,
251        stmt: CreateTableLike,
252        ctx: QueryContextRef,
253    ) -> Result<TableRef> {
254        let (catalog, schema, table) = table_idents_to_full_name(&stmt.source_name, &ctx)
255            .map_err(BoxedError::new)
256            .context(error::ExternalSnafu)?;
257        let table_ref = self
258            .catalog_manager
259            .table(&catalog, &schema, &table, Some(&ctx))
260            .await
261            .context(CatalogSnafu)?
262            .context(TableNotFoundSnafu { table_name: &table })?;
263        let partition_info = self
264            .partition_manager
265            .find_physical_partition_info(table_ref.table_info().table_id())
266            .await
267            .context(error::FindTablePartitionRuleSnafu { table_name: table })?;
268
269        // CREATE TABLE LIKE also inherits database level options.
270        let schema_options = self
271            .table_metadata_manager
272            .schema_manager()
273            .get(SchemaNameKey {
274                catalog: &catalog,
275                schema: &schema,
276            })
277            .await
278            .context(TableMetadataManagerSnafu)?
279            .map(|v| v.into_inner());
280
281        let quote_style = ctx.quote_style();
282        let mut create_stmt =
283            create_table_stmt(&table_ref.table_info(), schema_options, quote_style)
284                .context(error::ParseQuerySnafu)?;
285        create_stmt.name = stmt.table_name;
286        create_stmt.if_not_exists = false;
287
288        let table_info = table_ref.table_info();
289        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?.and_then(
290            |mut partitions| {
291                if !partitions.column_list.is_empty() {
292                    partitions.set_quote(quote_style);
293                    Some(partitions)
294                } else {
295                    None
296                }
297            },
298        );
299
300        let create_expr = &mut expr_helper::create_to_expr(&create_stmt, &ctx)?;
301        self.create_table_inner(create_expr, partitions, ctx).await
302    }
303
304    #[tracing::instrument(skip_all)]
305    pub async fn create_external_table(
306        &self,
307        create_expr: CreateExternalTable,
308        ctx: QueryContextRef,
309    ) -> Result<TableRef> {
310        let create_expr = &mut expr_helper::create_external_expr(create_expr, &ctx).await?;
311        self.create_table_inner(create_expr, None, ctx).await
312    }
313
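    /// Dispatches table creation: metric-engine logical tables go through the
    /// logical-table procedure, everything else through the regular create-table path.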
314    #[tracing::instrument(skip_all)]
315    pub async fn create_table_inner(
316        &self,
317        create_table: &mut CreateTableExpr,
318        partitions: Option<Partitions>,
319        query_ctx: QueryContextRef,
320    ) -> Result<TableRef> {
321        ensure!(
322            !is_readonly_schema(&create_table.schema_name),
323            SchemaReadOnlySnafu {
324                name: create_table.schema_name.clone()
325            }
326        );
327
328        if create_table.engine == METRIC_ENGINE_NAME
329            && create_table
330                .table_options
331                .contains_key(LOGICAL_TABLE_METADATA_KEY)
332        {
333            if let Some(partitions) = partitions.as_ref()
334                && !partitions.exprs.is_empty()
335            {
336                self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
337                    .await?;
338            }
339            // Create logical tables
340            self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
341                .await?
342                .into_iter()
343                .next()
344                .context(error::UnexpectedSnafu {
345                    violated: "expected to create logical tables",
346                })
347        } else {
348            // Create a regular (non-logical) table
349            self.create_non_logic_table(create_table, partitions, query_ctx)
350                .await
351        }
352    }
353
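    /// Creates a regular (non-logical) table: verifies that the schema exists and the
    /// table name is valid, then submits the create-table procedure and returns the table.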
354    #[tracing::instrument(skip_all)]
355    pub async fn create_non_logic_table(
356        &self,
357        create_table: &mut CreateTableExpr,
358        partitions: Option<Partitions>,
359        query_ctx: QueryContextRef,
360    ) -> Result<TableRef> {
361        let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
362
363        // Check if schema exists
364        let schema = self
365            .table_metadata_manager
366            .schema_manager()
367            .get(SchemaNameKey::new(
368                &create_table.catalog_name,
369                &create_table.schema_name,
370            ))
371            .await
372            .context(TableMetadataManagerSnafu)?;
373        ensure!(
374            schema.is_some(),
375            SchemaNotFoundSnafu {
376                schema_info: &create_table.schema_name,
377            }
378        );
379
380        // Return early if the table already exists.
381        if let Some(table) = self
382            .catalog_manager
383            .table(
384                &create_table.catalog_name,
385                &create_table.schema_name,
386                &create_table.table_name,
387                Some(&query_ctx),
388            )
389            .await
390            .context(CatalogSnafu)?
391        {
392            return if create_table.create_if_not_exists {
393                Ok(table)
394            } else {
395                TableAlreadyExistsSnafu {
396                    table: format_full_table_name(
397                        &create_table.catalog_name,
398                        &create_table.schema_name,
399                        &create_table.table_name,
400                    ),
401                }
402                .fail()
403            };
404        }
405
406        ensure!(
407            NAME_PATTERN_REG.is_match(&create_table.table_name),
408            InvalidTableNameSnafu {
409                table_name: &create_table.table_name,
410            }
411        );
412
413        let table_name = TableName::new(
414            &create_table.catalog_name,
415            &create_table.schema_name,
416            &create_table.table_name,
417        );
418
419        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
420        let mut table_info = create_table_info(create_table, partition_cols)?;
421
422        let resp = self
423            .create_table_procedure(
424                create_table.clone(),
425                partitions,
426                table_info.clone(),
427                query_ctx,
428            )
429            .await?;
430
431        let table_id = resp
432            .table_ids
433            .into_iter()
434            .next()
435            .context(error::UnexpectedSnafu {
436                violated: "expected table_id",
437            })?;
438        info!("Successfully created table '{table_name}' with table id {table_id}");
439
440        table_info.ident.table_id = table_id;
441
442        let table_info = Arc::new(table_info);
443        create_table.table_id = Some(api::v1::TableId { id: table_id });
444
445        let table = DistTable::table(table_info);
446
447        Ok(table)
448    }
449
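    /// Creates a batch of logical tables in a single procedure, then re-reads them from
    /// the catalog so the returned table infos carry the physical table's partition metadata.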
450    #[tracing::instrument(skip_all)]
451    pub async fn create_logical_tables(
452        &self,
453        create_table_exprs: &[CreateTableExpr],
454        query_context: QueryContextRef,
455    ) -> Result<Vec<TableRef>> {
456        let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
457        ensure!(
458            !create_table_exprs.is_empty(),
459            EmptyDdlExprSnafu {
460                name: "create logical tables"
461            }
462        );
463
464        // Check table names
465        for create_table in create_table_exprs {
466            ensure!(
467                NAME_PATTERN_REG.is_match(&create_table.table_name),
468                InvalidTableNameSnafu {
469                    table_name: &create_table.table_name,
470                }
471            );
472        }
473
474        let raw_tables_info = create_table_exprs
475            .iter()
476            .map(|create| create_table_info(create, vec![]))
477            .collect::<Result<Vec<_>>>()?;
478        let tables_data = create_table_exprs
479            .iter()
480            .cloned()
481            .zip(raw_tables_info.iter().cloned())
482            .collect::<Vec<_>>();
483
484        let resp = self
485            .create_logical_tables_procedure(tables_data, query_context.clone())
486            .await?;
487
488        let table_ids = resp.table_ids;
489        ensure!(
490            table_ids.len() == raw_tables_info.len(),
491            CreateLogicalTablesSnafu {
492                reason: format!(
493                    "The number of tables is inconsistent with the expected number to be created, expected: {}, actual: {}",
494                    raw_tables_info.len(),
495                    table_ids.len()
496                )
497            }
498        );
499        info!("Successfully created logical tables: {:?}", table_ids);
500
501        // Reacquire table infos from catalog so logical tables inherit the latest partition
502        // metadata (e.g. partition_key_indices) from their physical tables.
503        // The returned table info also includes extra partition columns that exist in the physical table but not in the logical table's create expression.
504        let mut tables_info = Vec::with_capacity(table_ids.len());
505        for (table_id, create_table) in table_ids.iter().zip(create_table_exprs.iter()) {
506            let table = self
507                .catalog_manager
508                .table(
509                    &create_table.catalog_name,
510                    &create_table.schema_name,
511                    &create_table.table_name,
512                    Some(&query_context),
513                )
514                .await
515                .context(CatalogSnafu)?
516                .with_context(|| TableNotFoundSnafu {
517                    table_name: format_full_table_name(
518                        &create_table.catalog_name,
519                        &create_table.schema_name,
520                        &create_table.table_name,
521                    ),
522                })?;
523
524            let table_info = table.table_info();
525            // Safety check: ensure we are returning the table info that matches the newly created table id.
526            ensure!(
527                table_info.table_id() == *table_id,
528                CreateLogicalTablesSnafu {
529                    reason: format!(
530                        "Table id mismatch after creation, expected {}, got {} for table {}",
531                        table_id,
532                        table_info.table_id(),
533                        format_full_table_name(
534                            &create_table.catalog_name,
535                            &create_table.schema_name,
536                            &create_table.table_name
537                        )
538                    )
539                }
540            );
541
542            tables_info.push(table_info);
543        }
544
545        Ok(tables_info.into_iter().map(DistTable::table).collect())
546    }
547
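    /// Ensures that a partition rule declared on a logical table exactly matches the
    /// partition expressions of its physical (metric engine) table.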
548    async fn validate_logical_table_partition_rule(
549        &self,
550        create_table: &CreateTableExpr,
551        partitions: &Partitions,
552        query_ctx: &QueryContextRef,
553    ) -> Result<()> {
554        let (_, mut logical_partition_exprs) =
555            parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
556
557        let physical_table_name = create_table
558            .table_options
559            .get(LOGICAL_TABLE_METADATA_KEY)
560            .with_context(|| CreateLogicalTablesSnafu {
561                reason: format!(
562                    "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
563                ),
564            })?;
565
566        let physical_table = self
567            .catalog_manager
568            .table(
569                &create_table.catalog_name,
570                &create_table.schema_name,
571                physical_table_name,
572                Some(query_ctx),
573            )
574            .await
575            .context(CatalogSnafu)?
576            .context(TableNotFoundSnafu {
577                table_name: physical_table_name.clone(),
578            })?;
579
580        let physical_table_info = physical_table.table_info();
581        let (partition_rule, _) = self
582            .partition_manager
583            .find_table_partition_rule(&physical_table_info)
584            .await
585            .context(error::FindTablePartitionRuleSnafu {
586                table_name: physical_table_name.clone(),
587            })?;
588
589        let multi_dim_rule = partition_rule
590            .as_ref()
591            .as_any()
592            .downcast_ref::<MultiDimPartitionRule>()
593            .context(InvalidPartitionRuleSnafu {
594                reason: "physical table partition rule is not range-based",
595            })?;
596
597        // TODO(ruihang): project physical partition exprs to logical partition column
598        let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
599        logical_partition_exprs.sort_unstable();
600        physical_partition_exprs.sort_unstable();
601
602        ensure!(
603            physical_partition_exprs == logical_partition_exprs,
604            InvalidPartitionRuleSnafu {
605                reason: format!(
606                    "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
607                    logical_partition_exprs, physical_partition_exprs
608                ),
609            }
610        );
611
612        Ok(())
613    }
614
615    #[cfg(feature = "enterprise")]
616    #[tracing::instrument(skip_all)]
617    pub async fn create_trigger(
618        &self,
619        stmt: CreateTrigger,
620        query_context: QueryContextRef,
621    ) -> Result<Output> {
622        let expr = expr_helper::to_create_trigger_task_expr(stmt, &query_context)?;
623        self.create_trigger_inner(expr, query_context).await
624    }
625
626    #[cfg(feature = "enterprise")]
627    pub async fn create_trigger_inner(
628        &self,
629        expr: PbCreateTriggerExpr,
630        query_context: QueryContextRef,
631    ) -> Result<Output> {
632        self.create_trigger_procedure(expr, query_context).await?;
633        Ok(Output::new_with_affected_rows(0))
634    }
635
636    #[cfg(feature = "enterprise")]
637    async fn create_trigger_procedure(
638        &self,
639        expr: PbCreateTriggerExpr,
640        query_context: QueryContextRef,
641    ) -> Result<SubmitDdlTaskResponse> {
642        let task = CreateTriggerTask::try_from(PbCreateTriggerTask {
643            create_trigger: Some(expr),
644        })
645        .context(error::InvalidExprSnafu)?;
646
647        let request = SubmitDdlTaskRequest::new(
648            to_meta_query_context(query_context),
649            DdlTask::new_create_trigger(task),
650        );
651
652        self.procedure_executor
653            .submit_ddl_task(&ExecutorContext::default(), request)
654            .await
655            .context(error::ExecuteDdlSnafu)
656    }
657
658    #[tracing::instrument(skip_all)]
659    pub async fn create_flow(
660        &self,
661        stmt: CreateFlow,
662        query_context: QueryContextRef,
663    ) -> Result<Output> {
664        // TODO(ruihang): do some verification
665        let expr = expr_helper::to_create_flow_task_expr(stmt, &query_context)?;
666
667        self.create_flow_inner(expr, query_context).await
668    }
669
670    pub async fn create_flow_inner(
671        &self,
672        expr: CreateFlowExpr,
673        query_context: QueryContextRef,
674    ) -> Result<Output> {
675        self.create_flow_procedure(expr, query_context).await?;
676        Ok(Output::new_with_affected_rows(0))
677    }
678
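    /// Validates the flow options, determines whether the flow is batching or streaming,
    /// and submits the create-flow DDL task.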
679    async fn create_flow_procedure(
680        &self,
681        mut expr: CreateFlowExpr,
682        query_context: QueryContextRef,
683    ) -> Result<SubmitDdlTaskResponse> {
684        expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
685
686        let flow_type = self
687            .determine_flow_type(&expr, query_context.clone())
688            .await?;
689        info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
690
691        expr.flow_options
692            .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
693
694        let task = CreateFlowTask::try_from(PbCreateFlowTask {
695            create_flow: Some(expr),
696        })
697        .context(error::InvalidExprSnafu)?;
698        let request = SubmitDdlTaskRequest::new(
699            to_meta_query_context(query_context),
700            DdlTask::new_create_flow(task),
701        );
702
703        self.procedure_executor
704            .submit_ddl_task(&ExecutorContext::default(), request)
705            .await
706            .context(error::ExecuteDdlSnafu)
707    }
708
709    /// Determine the flow type based on the SQL query
710    ///
711    /// If the query contains an aggregation or DISTINCT, it is a batching flow; otherwise it is a streaming flow.
712    async fn determine_flow_type(
713        &self,
714        expr: &CreateFlowExpr,
715        query_ctx: QueryContextRef,
716    ) -> Result<FlowType> {
717        // First check whether any source table's TTL is instant; if so, force streaming mode.
718        for src_table_name in &expr.source_table_names {
719            let table = self
720                .catalog_manager()
721                .table(
722                    &src_table_name.catalog_name,
723                    &src_table_name.schema_name,
724                    &src_table_name.table_name,
725                    Some(&query_ctx),
726                )
727                .await
728                .map_err(BoxedError::new)
729                .context(ExternalSnafu)?
730                .with_context(|| TableNotFoundSnafu {
731                    table_name: format_full_table_name(
732                        &src_table_name.catalog_name,
733                        &src_table_name.schema_name,
734                        &src_table_name.table_name,
735                    ),
736                })?;
737
738            // An instant source table can only be handled in streaming mode.
739            if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
740                warn!(
741                    "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode",
742                    format_full_table_name(
743                        &src_table_name.catalog_name,
744                        &src_table_name.schema_name,
745                        &src_table_name.table_name
746                    ),
747                    expr.flow_name
748                );
749                return Ok(FlowType::Streaming);
750            }
751        }
752
753        let engine = &self.query_engine;
754        let stmts = ParserContext::create_with_dialect(
755            &expr.sql,
756            query_ctx.sql_dialect(),
757            ParseOptions::default(),
758        )
759        .map_err(BoxedError::new)
760        .context(ExternalSnafu)?;
761
762        ensure!(
763            stmts.len() == 1,
764            InvalidSqlSnafu {
765                err_msg: format!("Expect only one statement, found {}", stmts.len())
766            }
767        );
768        let stmt = &stmts[0];
769
770        if is_tql(query_ctx.sql_dialect(), &expr.sql)
771            .map_err(BoxedError::new)
772            .context(ExternalSnafu)?
773        {
774            return Ok(FlowType::Batching);
775        }
776
777        // Plan the statement; TQL is handled separately since it only runs in batching mode.
778        let plan = match stmt {
779            // prom ql is only supported in batching mode
780            Statement::Tql(_) => return Ok(FlowType::Batching),
781            _ => engine
782                .planner()
783                .plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
784                .await
785                .map_err(BoxedError::new)
786                .context(ExternalSnafu)?,
787        };
788
789        /// Visitor to find aggregation or distinct
790        struct FindAggr {
791            is_aggr: bool,
792        }
793
794        impl TreeNodeVisitor<'_> for FindAggr {
795            type Node = LogicalPlan;
796            fn f_down(
797                &mut self,
798                node: &Self::Node,
799            ) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
800            {
801                match node {
802                    LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
803                        self.is_aggr = true;
804                        return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
805                    }
806                    _ => (),
807                }
808                Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
809            }
810        }
811
812        let mut find_aggr = FindAggr { is_aggr: false };
813
814        plan.visit_with_subqueries(&mut find_aggr)
815            .context(BuildDfLogicalPlanSnafu)?;
816        if find_aggr.is_aggr {
817            Ok(FlowType::Batching)
818        } else {
819            Ok(FlowType::Streaming)
820        }
821    }
822
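    /// Creates a view: plans the defining query, records its column names and definition,
    /// encodes the (unoptimized) logical plan via substrait, and submits the create-view expression.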
823    #[tracing::instrument(skip_all)]
824    pub async fn create_view(
825        &self,
826        create_view: CreateView,
827        ctx: QueryContextRef,
828    ) -> Result<TableRef> {
829        // convert input into logical plan
830        let logical_plan = match &*create_view.query {
831            Statement::Query(query) => {
832                self.plan(
833                    &QueryStatement::Sql(Statement::Query(query.clone())),
834                    ctx.clone(),
835                )
836                .await?
837            }
838            Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
839            _ => {
840                return InvalidViewStmtSnafu {}.fail();
841            }
842        };
843        // Save the definition for `show create view`.
844        let definition = create_view.to_string();
845
846        // Save the plan's column names; they may change when the schemas of the tables
847        // referenced by the plan are altered.
848        let schema: Schema = logical_plan
849            .schema()
850            .clone()
851            .try_into()
852            .context(ConvertSchemaSnafu)?;
853        let plan_columns: Vec<_> = schema
854            .column_schemas()
855            .iter()
856            .map(|c| c.name.clone())
857            .collect();
858
859        let columns: Vec<_> = create_view
860            .columns
861            .iter()
862            .map(|ident| ident.to_string())
863            .collect();
864
865        // Validate columns
866        if !columns.is_empty() {
867            ensure!(
868                columns.len() == plan_columns.len(),
869                error::ViewColumnsMismatchSnafu {
870                    view_name: create_view.name.to_string(),
871                    expected: plan_columns.len(),
872                    actual: columns.len(),
873                }
874            );
875        }
876
877        // Extract the table names from the original plan
878        // and rewrite them as fully qualified names.
879        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
880            .context(ExtractTableNamesSnafu)?;
881
882        let table_names = table_names.into_iter().map(|t| t.into()).collect();
883
884        // TODO(dennis): we don't save the optimized plan yet,
885        // because there are serialization issues with our custom plan nodes (such as `MergeScanLogicalPlan`).
886        // When the issues are fixed, we can use the `optimized_plan` instead.
887        // let optimized_plan = self.optimize_logical_plan(logical_plan)?.unwrap_df_plan();
888
889        // encode logical plan
890        let encoded_plan = DFLogicalSubstraitConvertor
891            .encode(&plan, DefaultSerializer)
892            .context(SubstraitCodecSnafu)?;
893
894        let expr = expr_helper::to_create_view_expr(
895            create_view,
896            encoded_plan.to_vec(),
897            table_names,
898            columns,
899            plan_columns,
900            definition,
901            ctx.clone(),
902        )?;
903
904        // TODO(dennis): validate the logical plan
905        self.create_view_by_expr(expr, ctx).await
906    }
907
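    /// Submits a `CreateViewExpr` to the procedure executor, honoring
    /// `IF NOT EXISTS` / `OR REPLACE` semantics and invalidating stale table caches.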
908    pub async fn create_view_by_expr(
909        &self,
910        expr: CreateViewExpr,
911        ctx: QueryContextRef,
912    ) -> Result<TableRef> {
913        ensure! {
914            !(expr.create_if_not_exists && expr.or_replace),
915            InvalidSqlSnafu {
916                err_msg: "syntax error: `OR REPLACE` and `IF NOT EXISTS` cannot be used together",
917            }
918        };
919        let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
920
921        let schema_exists = self
922            .table_metadata_manager
923            .schema_manager()
924            .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
925            .await
926            .context(TableMetadataManagerSnafu)?;
927
928        ensure!(
929            schema_exists,
930            SchemaNotFoundSnafu {
931                schema_info: &expr.schema_name,
932            }
933        );
934
935        // Check whether a view or table with the same name already exists.
936        if let Some(table) = self
937            .catalog_manager
938            .table(
939                &expr.catalog_name,
940                &expr.schema_name,
941                &expr.view_name,
942                Some(&ctx),
943            )
944            .await
945            .context(CatalogSnafu)?
946        {
947            let table_type = table.table_info().table_type;
948
949            match (table_type, expr.create_if_not_exists, expr.or_replace) {
950                (TableType::View, true, false) => {
951                    return Ok(table);
952                }
953                (TableType::View, false, false) => {
954                    return ViewAlreadyExistsSnafu {
955                        name: format_full_table_name(
956                            &expr.catalog_name,
957                            &expr.schema_name,
958                            &expr.view_name,
959                        ),
960                    }
961                    .fail();
962                }
963                (TableType::View, _, true) => {
964                    // Fall through to replace the existing view
965                }
966                _ => {
967                    return TableAlreadyExistsSnafu {
968                        table: format_full_table_name(
969                            &expr.catalog_name,
970                            &expr.schema_name,
971                            &expr.view_name,
972                        ),
973                    }
974                    .fail();
975                }
976            }
977        }
978
979        ensure!(
980            NAME_PATTERN_REG.is_match(&expr.view_name),
981            InvalidViewNameSnafu {
982                name: expr.view_name.clone(),
983            }
984        );
985
986        let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
987
988        let mut view_info = TableInfo {
989            ident: metadata::TableIdent {
990                // The view id of a distributed table is assigned by Meta; set 0 here as a placeholder.
991                table_id: 0,
992                version: 0,
993            },
994            name: expr.view_name.clone(),
995            desc: None,
996            catalog_name: expr.catalog_name.clone(),
997            schema_name: expr.schema_name.clone(),
998            // Table meta doesn't make sense for views, so use an empty default.
999            meta: TableMeta::empty(),
1000            table_type: TableType::View,
1001        };
1002
1003        let request = SubmitDdlTaskRequest::new(
1004            to_meta_query_context(ctx),
1005            DdlTask::new_create_view(expr, view_info.clone()),
1006        );
1007
1008        let resp = self
1009            .procedure_executor
1010            .submit_ddl_task(&ExecutorContext::default(), request)
1011            .await
1012            .context(error::ExecuteDdlSnafu)?;
1013
1014        debug!(
1015            "Submit creating view '{view_name}' task response: {:?}",
1016            resp
1017        );
1018
1019        let view_id = resp
1020            .table_ids
1021            .into_iter()
1022            .next()
1023            .context(error::UnexpectedSnafu {
1024                violated: "expected table_id",
1025            })?;
1026        info!("Successfully created view '{view_name}' with view id {view_id}");
1027
1028        view_info.ident.table_id = view_id;
1029
1030        let view_info = Arc::new(view_info);
1031
1032        let table = DistTable::table(view_info);
1033
1034        // Invalidates local cache ASAP.
1035        self.cache_invalidator
1036            .invalidate(
1037                &Context::default(),
1038                &[
1039                    CacheIdent::TableId(view_id),
1040                    CacheIdent::TableName(view_name.clone()),
1041                ],
1042            )
1043            .await
1044            .context(error::InvalidateTableCacheSnafu)?;
1045
1046        Ok(table)
1047    }
1048
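    /// Drops a flow by name; with `drop_if_exists`, a missing flow is silently ignored.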
1049    #[tracing::instrument(skip_all)]
1050    pub async fn drop_flow(
1051        &self,
1052        catalog_name: String,
1053        flow_name: String,
1054        drop_if_exists: bool,
1055        query_context: QueryContextRef,
1056    ) -> Result<Output> {
1057        if let Some(flow) = self
1058            .flow_metadata_manager
1059            .flow_name_manager()
1060            .get(&catalog_name, &flow_name)
1061            .await
1062            .context(error::TableMetadataManagerSnafu)?
1063        {
1064            let flow_id = flow.flow_id();
1065            let task = DropFlowTask {
1066                catalog_name,
1067                flow_name,
1068                flow_id,
1069                drop_if_exists,
1070            };
1071            self.drop_flow_procedure(task, query_context).await?;
1072
1073            Ok(Output::new_with_affected_rows(0))
1074        } else if drop_if_exists {
1075            Ok(Output::new_with_affected_rows(0))
1076        } else {
1077            FlowNotFoundSnafu {
1078                flow_name: format_full_flow_name(&catalog_name, &flow_name),
1079            }
1080            .fail()
1081        }
1082    }
1083
1084    async fn drop_flow_procedure(
1085        &self,
1086        expr: DropFlowTask,
1087        query_context: QueryContextRef,
1088    ) -> Result<SubmitDdlTaskResponse> {
1089        let request = SubmitDdlTaskRequest::new(
1090            to_meta_query_context(query_context),
1091            DdlTask::new_drop_flow(expr),
1092        );
1093
1094        self.procedure_executor
1095            .submit_ddl_task(&ExecutorContext::default(), request)
1096            .await
1097            .context(error::ExecuteDdlSnafu)
1098    }
1099
1100    #[cfg(feature = "enterprise")]
1101    #[tracing::instrument(skip_all)]
1102    pub(super) async fn drop_trigger(
1103        &self,
1104        catalog_name: String,
1105        trigger_name: String,
1106        drop_if_exists: bool,
1107        query_context: QueryContextRef,
1108    ) -> Result<Output> {
1109        let task = DropTriggerTask {
1110            catalog_name,
1111            trigger_name,
1112            drop_if_exists,
1113        };
1114        self.drop_trigger_procedure(task, query_context).await?;
1115        Ok(Output::new_with_affected_rows(0))
1116    }
1117
1118    #[cfg(feature = "enterprise")]
1119    async fn drop_trigger_procedure(
1120        &self,
1121        expr: DropTriggerTask,
1122        query_context: QueryContextRef,
1123    ) -> Result<SubmitDdlTaskResponse> {
1124        let request = SubmitDdlTaskRequest::new(
1125            to_meta_query_context(query_context),
1126            DdlTask::new_drop_trigger(expr),
1127        );
1128
1129        self.procedure_executor
1130            .submit_ddl_task(&ExecutorContext::default(), request)
1131            .await
1132            .context(error::ExecuteDdlSnafu)
1133    }
1134
1135    /// Drop a view
1136    #[tracing::instrument(skip_all)]
1137    pub(crate) async fn drop_view(
1138        &self,
1139        catalog: String,
1140        schema: String,
1141        view: String,
1142        drop_if_exists: bool,
1143        query_context: QueryContextRef,
1144    ) -> Result<Output> {
1145        let view_info = if let Some(view) = self
1146            .catalog_manager
1147            .table(&catalog, &schema, &view, None)
1148            .await
1149            .context(CatalogSnafu)?
1150        {
1151            view.table_info()
1152        } else if drop_if_exists {
1153            // DROP VIEW IF EXISTS on a missing view is a no-op.
1154            return Ok(Output::new_with_affected_rows(0));
1155        } else {
1156            return TableNotFoundSnafu {
1157                table_name: format_full_table_name(&catalog, &schema, &view),
1158            }
1159            .fail();
1160        };
1161
1162        // Ensure the existing object is a view; other table types can't be dropped here.
1163        ensure!(
1164            view_info.table_type == TableType::View,
1165            error::InvalidViewSnafu {
1166                msg: "not a view",
1167                view_name: format_full_table_name(&catalog, &schema, &view),
1168            }
1169        );
1170
1171        let view_id = view_info.table_id();
1172
1173        let task = DropViewTask {
1174            catalog,
1175            schema,
1176            view,
1177            view_id,
1178            drop_if_exists,
1179        };
1180
1181        self.drop_view_procedure(task, query_context).await?;
1182
1183        Ok(Output::new_with_affected_rows(0))
1184    }
1185
1186    /// Submit [DropViewTask] to procedure executor.
1187    async fn drop_view_procedure(
1188        &self,
1189        expr: DropViewTask,
1190        query_context: QueryContextRef,
1191    ) -> Result<SubmitDdlTaskResponse> {
1192        let request = SubmitDdlTaskRequest::new(
1193            to_meta_query_context(query_context),
1194            DdlTask::new_drop_view(expr),
1195        );
1196
1197        self.procedure_executor
1198            .submit_ddl_task(&ExecutorContext::default(), request)
1199            .await
1200            .context(error::ExecuteDdlSnafu)
1201    }
1202
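    /// Alters a batch of logical tables, grouping the expressions by physical table id
    /// so each physical table receives a single alter procedure.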
1203    #[tracing::instrument(skip_all)]
1204    pub async fn alter_logical_tables(
1205        &self,
1206        alter_table_exprs: Vec<AlterTableExpr>,
1207        query_context: QueryContextRef,
1208    ) -> Result<Output> {
1209        let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
1210        ensure!(
1211            !alter_table_exprs.is_empty(),
1212            EmptyDdlExprSnafu {
1213                name: "alter logical tables"
1214            }
1215        );
1216
1217        // group by physical table id
1218        let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
1219        for expr in alter_table_exprs {
1220            // Get table_id from catalog_manager
1221            let catalog = if expr.catalog_name.is_empty() {
1222                query_context.current_catalog()
1223            } else {
1224                &expr.catalog_name
1225            };
1226            let schema = if expr.schema_name.is_empty() {
1227                query_context.current_schema()
1228            } else {
1229                expr.schema_name.clone()
1230            };
1231            let table_name = &expr.table_name;
1232            let table = self
1233                .catalog_manager
1234                .table(catalog, &schema, table_name, Some(&query_context))
1235                .await
1236                .context(CatalogSnafu)?
1237                .with_context(|| TableNotFoundSnafu {
1238                    table_name: format_full_table_name(catalog, &schema, table_name),
1239                })?;
1240            let table_id = table.table_info().ident.table_id;
1241            let physical_table_id = self
1242                .table_metadata_manager
1243                .table_route_manager()
1244                .get_physical_table_id(table_id)
1245                .await
1246                .context(TableMetadataManagerSnafu)?;
1247            groups.entry(physical_table_id).or_default().push(expr);
1248        }
1249
1250        // Submit procedure for each physical table
1251        let mut handles = Vec::with_capacity(groups.len());
1252        for (_physical_table_id, exprs) in groups {
1253            let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
1254            handles.push(fut);
1255        }
1256        let _results = futures::future::try_join_all(handles).await?;
1257
1258        Ok(Output::new_with_affected_rows(0))
1259    }
1260
1261    #[tracing::instrument(skip_all)]
1262    pub async fn drop_table(
1263        &self,
1264        table_name: TableName,
1265        drop_if_exists: bool,
1266        query_context: QueryContextRef,
1267    ) -> Result<Output> {
1268        // Kept as a single-table entry point for the gRPC call path.
1269        self.drop_tables(&[table_name], drop_if_exists, query_context)
1270            .await
1271    }
1272
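    /// Drops the given tables and invalidates their cache entries; with `drop_if_exists`,
    /// missing tables are skipped.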
1273    #[tracing::instrument(skip_all)]
1274    pub async fn drop_tables(
1275        &self,
1276        table_names: &[TableName],
1277        drop_if_exists: bool,
1278        query_context: QueryContextRef,
1279    ) -> Result<Output> {
1280        let mut tables = Vec::with_capacity(table_names.len());
1281        for table_name in table_names {
1282            ensure!(
1283                !is_readonly_schema(&table_name.schema_name),
1284                SchemaReadOnlySnafu {
1285                    name: table_name.schema_name.clone()
1286                }
1287            );
1288
1289            if let Some(table) = self
1290                .catalog_manager
1291                .table(
1292                    &table_name.catalog_name,
1293                    &table_name.schema_name,
1294                    &table_name.table_name,
1295                    Some(&query_context),
1296                )
1297                .await
1298                .context(CatalogSnafu)?
1299            {
1300                tables.push(table.table_info().table_id());
1301            } else if drop_if_exists {
1302                // DROP TABLE IF EXISTS on a missing table is a no-op; skip it.
1303                continue;
1304            } else {
1305                return TableNotFoundSnafu {
1306                    table_name: table_name.to_string(),
1307                }
1308                .fail();
1309            }
1310        }
1311
1312        for (table_name, table_id) in table_names.iter().zip(tables.into_iter()) {
1313            self.drop_table_procedure(table_name, table_id, drop_if_exists, query_context.clone())
1314                .await?;
1315
1316            // Invalidates local cache ASAP.
1317            self.cache_invalidator
1318                .invalidate(
1319                    &Context::default(),
1320                    &[
1321                        CacheIdent::TableId(table_id),
1322                        CacheIdent::TableName(table_name.clone()),
1323                    ],
1324                )
1325                .await
1326                .context(error::InvalidateTableCacheSnafu)?;
1327        }
1328        Ok(Output::new_with_affected_rows(0))
1329    }
1330
1331    #[tracing::instrument(skip_all)]
1332    pub async fn drop_database(
1333        &self,
1334        catalog: String,
1335        schema: String,
1336        drop_if_exists: bool,
1337        query_context: QueryContextRef,
1338    ) -> Result<Output> {
1339        ensure!(
1340            !is_readonly_schema(&schema),
1341            SchemaReadOnlySnafu { name: schema }
1342        );
1343
1344        if self
1345            .catalog_manager
1346            .schema_exists(&catalog, &schema, None)
1347            .await
1348            .context(CatalogSnafu)?
1349        {
1350            if schema == query_context.current_schema() {
1351                SchemaInUseSnafu { name: schema }.fail()
1352            } else {
1353                self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
1354                    .await?;
1355
1356                Ok(Output::new_with_affected_rows(0))
1357            }
1358        } else if drop_if_exists {
1359            // DROP DATABASE IF EXISTS on a missing schema is a no-op.
1360            Ok(Output::new_with_affected_rows(0))
1361        } else {
1362            SchemaNotFoundSnafu {
1363                schema_info: schema,
1364            }
1365            .fail()
1366        }
1367    }
1368
1369    #[tracing::instrument(skip_all)]
1370    pub async fn truncate_table(
1371        &self,
1372        table_name: TableName,
1373        time_ranges: Vec<(Timestamp, Timestamp)>,
1374        query_context: QueryContextRef,
1375    ) -> Result<Output> {
1376        ensure!(
1377            !is_readonly_schema(&table_name.schema_name),
1378            SchemaReadOnlySnafu {
1379                name: table_name.schema_name.clone()
1380            }
1381        );
1382
1383        let table = self
1384            .catalog_manager
1385            .table(
1386                &table_name.catalog_name,
1387                &table_name.schema_name,
1388                &table_name.table_name,
1389                Some(&query_context),
1390            )
1391            .await
1392            .context(CatalogSnafu)?
1393            .with_context(|| TableNotFoundSnafu {
1394                table_name: table_name.to_string(),
1395            })?;
1396        let table_id = table.table_info().table_id();
1397        self.truncate_table_procedure(&table_name, table_id, time_ranges, query_context)
1398            .await?;
1399
1400        Ok(Output::new_with_affected_rows(0))
1401    }
1402
1403    #[tracing::instrument(skip_all)]
1404    pub async fn alter_table(
1405        &self,
1406        alter_table: AlterTable,
1407        query_context: QueryContextRef,
1408    ) -> Result<Output> {
1409        if matches!(
1410            alter_table.alter_operation(),
1411            AlterTableOperation::Repartition { .. }
1412        ) {
1413            let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
1414            return self.repartition_table(request, &query_context).await;
1415        }
1416
1417        let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
1418        self.alter_table_inner(expr, query_context).await
1419    }
1420
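    /// Handles `ALTER TABLE ... REPARTITION`: converts the requested source and target
    /// partition expressions, verifies the source expressions exist on the physical table,
    /// validates the resulting partition set, and submits the repartition procedure.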
1421    #[tracing::instrument(skip_all)]
1422    pub async fn repartition_table(
1423        &self,
1424        request: RepartitionRequest,
1425        query_context: &QueryContextRef,
1426    ) -> Result<Output> {
1427        // Check if the schema is read-only.
1428        ensure!(
1429            !is_readonly_schema(&request.schema_name),
1430            SchemaReadOnlySnafu {
1431                name: request.schema_name.clone()
1432            }
1433        );
1434
1435        let table_ref = TableReference::full(
1436            &request.catalog_name,
1437            &request.schema_name,
1438            &request.table_name,
1439        );
1440        // Get the table from the catalog.
1441        let table = self
1442            .catalog_manager
1443            .table(
1444                &request.catalog_name,
1445                &request.schema_name,
1446                &request.table_name,
1447                Some(query_context),
1448            )
1449            .await
1450            .context(CatalogSnafu)?
1451            .with_context(|| TableNotFoundSnafu {
1452                table_name: table_ref.to_string(),
1453            })?;
1454        let table_id = table.table_info().ident.table_id;
1455        // Get existing partition expressions from the table route.
1456        let (physical_table_id, physical_table_route) = self
1457            .table_metadata_manager
1458            .table_route_manager()
1459            .get_physical_table_route(table_id)
1460            .await
1461            .context(TableMetadataManagerSnafu)?;
1462
1463        ensure!(
1464            physical_table_id == table_id,
1465            NotSupportedSnafu {
1466                feat: "REPARTITION on logical tables"
1467            }
1468        );
1469
1470        let table_info = table.table_info();
1471        // Get partition column names from the table metadata.
1472        let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
1473        // Repartition requires the table to have partition columns.
1474        ensure!(
1475            !existing_partition_columns.is_empty(),
1476            InvalidPartitionRuleSnafu {
1477                reason: format!(
1478                    "table {} does not have partition columns, cannot repartition",
1479                    table_ref
1480                )
1481            }
1482        );
1483
1484        // Repartition operations involving columns outside the existing partition columns are not supported.
1485        // This restriction ensures repartition only applies to current partition columns.
1486        let column_name_and_type = existing_partition_columns
1487            .iter()
1488            .map(|column| (&column.name, column.data_type.clone()))
1489            .collect();
1490        let timezone = query_context.timezone();
1491        // Convert SQL Exprs to PartitionExprs.
1492        let from_partition_exprs = request
1493            .from_exprs
1494            .iter()
1495            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1496            .collect::<Result<Vec<_>>>()?;
1497
1498        let mut into_partition_exprs = request
1499            .into_exprs
1500            .iter()
1501            .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
1502            .collect::<Result<Vec<_>>>()?;
1503
1504        // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
1505        // multiple source partitions; try to simplify it for better readability and stability.
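        // Illustrative example: merging `a < 10` and `a >= 10 AND a < 20` first yields
        // `a < 10 OR (a >= 10 AND a < 20)`, which the simplifier may reduce to the
        // equivalent `a < 20`.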
1506        if from_partition_exprs.len() > 1
1507            && into_partition_exprs.len() == 1
1508            && let Some(expr) = into_partition_exprs.pop()
1509        {
1510            into_partition_exprs.push(partition::simplify::simplify_merged_partition_expr(expr));
1511        }
1512
1513        // Parse existing partition expressions from region routes.
1514        let mut existing_partition_exprs =
1515            Vec::with_capacity(physical_table_route.region_routes.len());
1516        for route in &physical_table_route.region_routes {
1517            let expr_json = route.region.partition_expr();
1518            if !expr_json.is_empty() {
1519                match PartitionExpr::from_json_str(&expr_json) {
1520                    Ok(Some(expr)) => existing_partition_exprs.push(expr),
1521                    Ok(None) => {
1522                        // Empty
1523                    }
1524                    Err(e) => {
1525                        return Err(e).context(DeserializePartitionExprSnafu);
1526                    }
1527                }
1528            }
1529        }
1530
1531        // Validate that from_partition_exprs are a subset of existing partition exprs.
1532        // We compare PartitionExpr directly since it implements Eq.
1533        for from_expr in &from_partition_exprs {
1534            ensure!(
1535                existing_partition_exprs.contains(from_expr),
1536                InvalidPartitionRuleSnafu {
1537                    reason: format!(
1538                        "partition expression '{}' does not exist in table {}",
1539                        from_expr, table_ref
1540                    )
1541                }
1542            );
1543        }
1544
1545        // Build the new partition expressions:
1546        // new_exprs = existing_exprs - from_exprs + into_exprs
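        // Illustrative example: splitting `a < 20` into `a < 10` and `a >= 10 AND a < 20`
        // removes `a < 20` from the existing expressions and appends the two new ones.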
1547        let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
1548            .into_iter()
1549            .filter(|expr| !from_partition_exprs.contains(expr))
1550            .chain(into_partition_exprs.clone().into_iter())
1551            .collect();
1552        let new_partition_exprs_len = new_partition_exprs.len();
1553        let from_partition_exprs_len = from_partition_exprs.len();
1554
1555        // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
1556        let _ = MultiDimPartitionRule::try_new(
1557            existing_partition_columns
1558                .iter()
1559                .map(|c| c.name.clone())
1560                .collect(),
1561            vec![],
1562            new_partition_exprs,
1563            true,
1564        )
1565        .context(InvalidPartitionSnafu)?;
1566
1567        let ddl_options = parse_ddl_options(&request.options)?;
1568        let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
1569            let mut json_exprs = Vec::with_capacity(exprs.len());
1570            for expr in exprs {
1571                json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
1572            }
1573            Ok(json_exprs)
1574        };
1575        let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
1576        let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
1577        let mut req = SubmitDdlTaskRequest::new(
1578            to_meta_query_context(query_context.clone()),
1579            DdlTask::new_alter_table(AlterTableExpr {
1580                catalog_name: request.catalog_name.clone(),
1581                schema_name: request.schema_name.clone(),
1582                table_name: request.table_name.clone(),
1583                kind: Some(Kind::Repartition(Repartition {
1584                    from_partition_exprs: from_partition_exprs_json,
1585                    into_partition_exprs: into_partition_exprs_json,
1586                })),
1587            }),
1588        );
1589        req.wait = ddl_options.wait;
1590        req.timeout = ddl_options.timeout;
1591
1592        info!(
1593            "Submitting repartition task for table {} (table_id={}), replacing {} partition expr(s) for a new total of {} partitions, timeout: {:?}, wait: {}",
1594            table_ref,
1595            table_id,
1596            from_partition_exprs_len,
1597            new_partition_exprs_len,
1598            ddl_options.timeout,
1599            ddl_options.wait
1600        );
1601
1602        let response = self
1603            .procedure_executor
1604            .submit_ddl_task(&ExecutorContext::default(), req)
1605            .await
1606            .context(error::ExecuteDdlSnafu)?;
1607
1608        if !ddl_options.wait {
1609            return build_procedure_id_output(response.key);
1610        }
1611
1612        // Only invalidate cache if wait is true.
1613        let invalidate_keys = vec![
1614            CacheIdent::TableId(table_id),
1615            CacheIdent::TableName(TableName::new(
1616                request.catalog_name,
1617                request.schema_name,
1618                request.table_name,
1619            )),
1620        ];
1621
1622        // Invalidates local cache ASAP.
1623        self.cache_invalidator
1624            .invalidate(&Context::default(), &invalidate_keys)
1625            .await
1626            .context(error::InvalidateTableCacheSnafu)?;
1627
1628        Ok(Output::new_with_affected_rows(0))
1629    }
1630
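    /// Applies an [AlterTableExpr] to an existing table.
    ///
    /// No-op alters (e.g. all columns of an `ADD COLUMN ... IF NOT EXISTS` already
    /// exist) return early. Physical and logical tables are submitted as different
    /// DDL tasks, and the relevant table caches are invalidated afterwards.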
1631    #[tracing::instrument(skip_all)]
1632    pub async fn alter_table_inner(
1633        &self,
1634        expr: AlterTableExpr,
1635        query_context: QueryContextRef,
1636    ) -> Result<Output> {
1637        ensure!(
1638            !is_readonly_schema(&expr.schema_name),
1639            SchemaReadOnlySnafu {
1640                name: expr.schema_name.clone()
1641            }
1642        );
1643
1644        let catalog_name = if expr.catalog_name.is_empty() {
1645            DEFAULT_CATALOG_NAME.to_string()
1646        } else {
1647            expr.catalog_name.clone()
1648        };
1649
1650        let schema_name = if expr.schema_name.is_empty() {
1651            DEFAULT_SCHEMA_NAME.to_string()
1652        } else {
1653            expr.schema_name.clone()
1654        };
1655
1656        let table_name = expr.table_name.clone();
1657
1658        let table = self
1659            .catalog_manager
1660            .table(
1661                &catalog_name,
1662                &schema_name,
1663                &table_name,
1664                Some(&query_context),
1665            )
1666            .await
1667            .context(CatalogSnafu)?
1668            .with_context(|| TableNotFoundSnafu {
1669                table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
1670            })?;
1671
1672        let table_id = table.table_info().ident.table_id;
1673        let need_alter = verify_alter(table_id, table.table_info(), expr.clone())?;
1674        if !need_alter {
1675            return Ok(Output::new_with_affected_rows(0));
1676        }
1677        info!(
1678            "Table info before alter is {:?}, expr: {:?}",
1679            table.table_info(),
1680            expr
1681        );
1682
1683        let physical_table_id = self
1684            .table_metadata_manager
1685            .table_route_manager()
1686            .get_physical_table_id(table_id)
1687            .await
1688            .context(TableMetadataManagerSnafu)?;
1689
1690        let (req, invalidate_keys) = if physical_table_id == table_id {
1691            // This is a physical table.
1692            let req = SubmitDdlTaskRequest::new(
1693                to_meta_query_context(query_context),
1694                DdlTask::new_alter_table(expr),
1695            );
1696
1697            let invalidate_keys = vec![
1698                CacheIdent::TableId(table_id),
1699                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1700            ];
1701
1702            (req, invalidate_keys)
1703        } else {
1704            // This is a logical table.
1705            let req = SubmitDdlTaskRequest::new(
1706                to_meta_query_context(query_context),
1707                DdlTask::new_alter_logical_tables(vec![expr]),
1708            );
1709
1710            let mut invalidate_keys = vec![
1711                CacheIdent::TableId(physical_table_id),
1712                CacheIdent::TableId(table_id),
1713                CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
1714            ];
1715
1716            let physical_table = self
1717                .table_metadata_manager
1718                .table_info_manager()
1719                .get(physical_table_id)
1720                .await
1721                .context(TableMetadataManagerSnafu)?
1722                .map(|x| x.into_inner());
1723            if let Some(physical_table) = physical_table {
1724                let physical_table_name = TableName::new(
1725                    physical_table.table_info.catalog_name,
1726                    physical_table.table_info.schema_name,
1727                    physical_table.table_info.name,
1728                );
1729                invalidate_keys.push(CacheIdent::TableName(physical_table_name));
1730            }
1731
1732            (req, invalidate_keys)
1733        };
1734
1735        self.procedure_executor
1736            .submit_ddl_task(&ExecutorContext::default(), req)
1737            .await
1738            .context(error::ExecuteDdlSnafu)?;
1739
1740        // Invalidates local cache ASAP.
1741        self.cache_invalidator
1742            .invalidate(&Context::default(), &invalidate_keys)
1743            .await
1744            .context(error::InvalidateTableCacheSnafu)?;
1745
1746        Ok(Output::new_with_affected_rows(0))
1747    }
1748
1749    #[cfg(feature = "enterprise")]
1750    #[tracing::instrument(skip_all)]
1751    pub async fn alter_trigger(
1752        &self,
1753        _alter_expr: AlterTrigger,
1754        _query_context: QueryContextRef,
1755    ) -> Result<Output> {
1756        crate::error::NotSupportedSnafu {
1757            feat: "alter trigger",
1758        }
1759        .fail()
1760    }
1761
1762    #[tracing::instrument(skip_all)]
1763    pub async fn alter_database(
1764        &self,
1765        alter_expr: AlterDatabase,
1766        query_context: QueryContextRef,
1767    ) -> Result<Output> {
1768        let alter_expr = expr_helper::to_alter_database_expr(alter_expr, &query_context)?;
1769        self.alter_database_inner(alter_expr, query_context).await
1770    }
1771
1772    #[tracing::instrument(skip_all)]
1773    pub async fn alter_database_inner(
1774        &self,
1775        alter_expr: AlterDatabaseExpr,
1776        query_context: QueryContextRef,
1777    ) -> Result<Output> {
1778        ensure!(
1779            !is_readonly_schema(&alter_expr.schema_name),
1780            SchemaReadOnlySnafu {
1781                name: query_context.current_schema().clone()
1782            }
1783        );
1784
1785        let exists = self
1786            .catalog_manager
1787            .schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
1788            .await
1789            .context(CatalogSnafu)?;
1790        ensure!(
1791            exists,
1792            SchemaNotFoundSnafu {
1793                schema_info: alter_expr.schema_name,
1794            }
1795        );
1796
1797        let cache_ident = [CacheIdent::SchemaName(SchemaName {
1798            catalog_name: alter_expr.catalog_name.clone(),
1799            schema_name: alter_expr.schema_name.clone(),
1800        })];
1801
1802        self.alter_database_procedure(alter_expr, query_context)
1803            .await?;
1804
1805        // Invalidates local cache ASAP.
1806        self.cache_invalidator
1807            .invalidate(&Context::default(), &cache_ident)
1808            .await
1809            .context(error::InvalidateTableCacheSnafu)?;
1810
1811        Ok(Output::new_with_affected_rows(0))
1812    }
1813
1814    async fn create_table_procedure(
1815        &self,
1816        create_table: CreateTableExpr,
1817        partitions: Vec<PartitionExpr>,
1818        table_info: TableInfo,
1819        query_context: QueryContextRef,
1820    ) -> Result<SubmitDdlTaskResponse> {
1821        let partitions = partitions
1822            .into_iter()
1823            .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
1824            .collect::<Result<Vec<_>>>()?;
1825
1826        let request = SubmitDdlTaskRequest::new(
1827            to_meta_query_context(query_context),
1828            DdlTask::new_create_table(create_table, partitions, table_info),
1829        );
1830
1831        self.procedure_executor
1832            .submit_ddl_task(&ExecutorContext::default(), request)
1833            .await
1834            .context(error::ExecuteDdlSnafu)
1835    }
1836
1837    async fn create_logical_tables_procedure(
1838        &self,
1839        tables_data: Vec<(CreateTableExpr, TableInfo)>,
1840        query_context: QueryContextRef,
1841    ) -> Result<SubmitDdlTaskResponse> {
1842        let request = SubmitDdlTaskRequest::new(
1843            to_meta_query_context(query_context),
1844            DdlTask::new_create_logical_tables(tables_data),
1845        );
1846
1847        self.procedure_executor
1848            .submit_ddl_task(&ExecutorContext::default(), request)
1849            .await
1850            .context(error::ExecuteDdlSnafu)
1851    }
1852
1853    async fn alter_logical_tables_procedure(
1854        &self,
1855        tables_data: Vec<AlterTableExpr>,
1856        query_context: QueryContextRef,
1857    ) -> Result<SubmitDdlTaskResponse> {
1858        let request = SubmitDdlTaskRequest::new(
1859            to_meta_query_context(query_context),
1860            DdlTask::new_alter_logical_tables(tables_data),
1861        );
1862
1863        self.procedure_executor
1864            .submit_ddl_task(&ExecutorContext::default(), request)
1865            .await
1866            .context(error::ExecuteDdlSnafu)
1867    }
1868
1869    async fn drop_table_procedure(
1870        &self,
1871        table_name: &TableName,
1872        table_id: TableId,
1873        drop_if_exists: bool,
1874        query_context: QueryContextRef,
1875    ) -> Result<SubmitDdlTaskResponse> {
1876        let request = SubmitDdlTaskRequest::new(
1877            to_meta_query_context(query_context),
1878            DdlTask::new_drop_table(
1879                table_name.catalog_name.clone(),
1880                table_name.schema_name.clone(),
1881                table_name.table_name.clone(),
1882                table_id,
1883                drop_if_exists,
1884            ),
1885        );
1886
1887        self.procedure_executor
1888            .submit_ddl_task(&ExecutorContext::default(), request)
1889            .await
1890            .context(error::ExecuteDdlSnafu)
1891    }
1892
1893    async fn drop_database_procedure(
1894        &self,
1895        catalog: String,
1896        schema: String,
1897        drop_if_exists: bool,
1898        query_context: QueryContextRef,
1899    ) -> Result<SubmitDdlTaskResponse> {
1900        let request = SubmitDdlTaskRequest::new(
1901            to_meta_query_context(query_context),
1902            DdlTask::new_drop_database(catalog, schema, drop_if_exists),
1903        );
1904
1905        self.procedure_executor
1906            .submit_ddl_task(&ExecutorContext::default(), request)
1907            .await
1908            .context(error::ExecuteDdlSnafu)
1909    }
1910
1911    async fn alter_database_procedure(
1912        &self,
1913        alter_expr: AlterDatabaseExpr,
1914        query_context: QueryContextRef,
1915    ) -> Result<SubmitDdlTaskResponse> {
1916        let request = SubmitDdlTaskRequest::new(
1917            to_meta_query_context(query_context),
1918            DdlTask::new_alter_database(alter_expr),
1919        );
1920
1921        self.procedure_executor
1922            .submit_ddl_task(&ExecutorContext::default(), request)
1923            .await
1924            .context(error::ExecuteDdlSnafu)
1925    }
1926
1927    async fn truncate_table_procedure(
1928        &self,
1929        table_name: &TableName,
1930        table_id: TableId,
1931        time_ranges: Vec<(Timestamp, Timestamp)>,
1932        query_context: QueryContextRef,
1933    ) -> Result<SubmitDdlTaskResponse> {
1934        let request = SubmitDdlTaskRequest::new(
1935            to_meta_query_context(query_context),
1936            DdlTask::new_truncate_table(
1937                table_name.catalog_name.clone(),
1938                table_name.schema_name.clone(),
1939                table_name.table_name.clone(),
1940                table_id,
1941                time_ranges,
1942            ),
1943        );
1944
1945        self.procedure_executor
1946            .submit_ddl_task(&ExecutorContext::default(), request)
1947            .await
1948            .context(error::ExecuteDdlSnafu)
1949    }
1950
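    /// Creates a database (schema) under the current catalog.
    ///
    /// Reserved schema names are treated as already existing, so for them the call
    /// only succeeds when `create_if_not_exists` is true.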
1951    #[tracing::instrument(skip_all)]
1952    pub async fn create_database(
1953        &self,
1954        database: &str,
1955        create_if_not_exists: bool,
1956        options: HashMap<String, String>,
1957        query_context: QueryContextRef,
1958    ) -> Result<Output> {
1959        let catalog = query_context.current_catalog();
1960        ensure!(
1961            NAME_PATTERN_REG.is_match(catalog),
1962            error::UnexpectedSnafu {
1963                violated: format!("Invalid catalog name: {}", catalog)
1964            }
1965        );
1966
1967        ensure!(
1968            NAME_PATTERN_REG.is_match(database),
1969            error::UnexpectedSnafu {
1970                violated: format!("Invalid database name: {}", database)
1971            }
1972        );
1973
1974        if !self
1975            .catalog_manager
1976            .schema_exists(catalog, database, None)
1977            .await
1978            .context(CatalogSnafu)?
1979            && !self.catalog_manager.is_reserved_schema_name(database)
1980        {
1981            self.create_database_procedure(
1982                catalog.to_string(),
1983                database.to_string(),
1984                create_if_not_exists,
1985                options,
1986                query_context,
1987            )
1988            .await?;
1989
1990            Ok(Output::new_with_affected_rows(1))
1991        } else if create_if_not_exists {
1992            Ok(Output::new_with_affected_rows(1))
1993        } else {
1994            error::SchemaExistsSnafu { name: database }.fail()
1995        }
1996    }
1997
1998    async fn create_database_procedure(
1999        &self,
2000        catalog: String,
2001        database: String,
2002        create_if_not_exists: bool,
2003        options: HashMap<String, String>,
2004        query_context: QueryContextRef,
2005    ) -> Result<SubmitDdlTaskResponse> {
2006        let request = SubmitDdlTaskRequest::new(
2007            to_meta_query_context(query_context),
2008            DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
2009        );
2010
2011        self.procedure_executor
2012            .submit_ddl_task(&ExecutorContext::default(), request)
2013            .await
2014            .context(error::ExecuteDdlSnafu)
2015    }
2016}
2017
2018/// Parse partition statement [Partitions] into [PartitionExpr] and partition columns.
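///
/// For example (illustrative), a definition like
/// `PARTITION ON COLUMNS (b) (b < 'hz', b >= 'hz' AND b < 'sh', b >= 'sh')`
/// produces three [PartitionExpr]s and the partition column list `["b"]`.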
2019pub fn parse_partitions(
2020    create_table: &CreateTableExpr,
2021    partitions: Option<Partitions>,
2022    query_ctx: &QueryContextRef,
2023) -> Result<(Vec<PartitionExpr>, Vec<String>)> {
2024    // If partitions are not defined by the user, use the timestamp column (which must exist)
2025    // as the partition column, and create only one partition.
2026    let partition_columns = find_partition_columns(&partitions)?;
2027    let partition_exprs =
2028        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
2029
2030    // Validates the partition expressions.
2031    let exprs = partition_exprs.clone();
2032    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs, true)
2033        .context(InvalidPartitionSnafu)?;
2034
2035    Ok((partition_exprs, partition_columns))
2036}
2037
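/// Parses and validates the partition definition of a logical table against its
/// column definitions, returning the partition columns and the parsed [PartitionExpr]s.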
2038fn parse_partitions_for_logical_validation(
2039    create_table: &CreateTableExpr,
2040    partitions: &Partitions,
2041    query_ctx: &QueryContextRef,
2042) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
2043    let partition_columns = partitions
2044        .column_list
2045        .iter()
2046        .map(|ident| ident.value.clone())
2047        .collect::<Vec<_>>();
2048
2049    let column_name_and_type = partition_columns
2050        .iter()
2051        .map(|pc| {
2052            let column = create_table
2053                .column_defs
2054                .iter()
2055                .find(|c| &c.name == pc)
2056                .context(ColumnNotFoundSnafu { msg: pc.clone() })?;
2057            let column_name = &column.name;
2058            let data_type = ConcreteDataType::from(
2059                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2060                    .context(ColumnDataTypeSnafu)?,
2061            );
2062            Ok((column_name, data_type))
2063        })
2064        .collect::<Result<HashMap<_, _>>>()?;
2065
2066    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2067    for expr in &partitions.exprs {
2068        let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
2069        partition_exprs.push(partition_expr);
2070    }
2071
2072    MultiDimPartitionRule::try_new(
2073        partition_columns.clone(),
2074        vec![],
2075        partition_exprs.clone(),
2076        true,
2077    )
2078    .context(InvalidPartitionSnafu)?;
2079
2080    Ok((partition_columns, partition_exprs))
2081}
2082
2083/// Verifies an alter and returns whether it is necessary to perform the alter.
2084///
2085/// # Returns
2086///
2087/// Returns true if the alter needs to be performed; otherwise, it returns false.
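///
/// For example, an `ADD COLUMN ... IF NOT EXISTS` alter whose columns all already
/// exist in the table returns `Ok(false)`, so no DDL task needs to be submitted.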
2088pub fn verify_alter(
2089    table_id: TableId,
2090    table_info: Arc<TableInfo>,
2091    expr: AlterTableExpr,
2092) -> Result<bool> {
2093    let request: AlterTableRequest =
2094        common_grpc_expr::alter_expr_to_request(table_id, expr, Some(&table_info.meta))
2095            .context(AlterExprToRequestSnafu)?;
2096
2097    let AlterTableRequest {
2098        table_name,
2099        alter_kind,
2100        ..
2101    } = &request;
2102
2103    if let AlterKind::RenameTable { new_table_name } = alter_kind {
2104        ensure!(
2105            NAME_PATTERN_REG.is_match(new_table_name),
2106            error::UnexpectedSnafu {
2107                violated: format!("Invalid table name: {}", new_table_name)
2108            }
2109        );
2110    } else if let AlterKind::AddColumns { columns } = alter_kind {
2111        // If all the columns are marked as add_if_not_exists and they already exist in the table,
2112        // there is no need to perform the alter.
2113        let column_names: HashSet<_> = table_info
2114            .meta
2115            .schema
2116            .column_schemas()
2117            .iter()
2118            .map(|schema| &schema.name)
2119            .collect();
2120        if columns.iter().all(|column| {
2121            column_names.contains(&column.column_schema.name) && column.add_if_not_exists
2122        }) {
2123            return Ok(false);
2124        }
2125    }
2126
2127    let _ = table_info
2128        .meta
2129        .builder_with_alter_kind(table_name, &request.alter_kind)
2130        .context(error::TableSnafu)?
2131        .build()
2132        .context(error::BuildTableMetaSnafu { table_name })?;
2133
2134    Ok(true)
2135}
2136
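/// Builds a [TableInfo] from a [CreateTableExpr] and the resolved partition columns.
/// The table id is left as a placeholder and assigned later by Meta.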
2137pub fn create_table_info(
2138    create_table: &CreateTableExpr,
2139    partition_columns: Vec<String>,
2140) -> Result<TableInfo> {
2141    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
2142    let mut column_name_to_index_map = HashMap::new();
2143
2144    for (idx, column) in create_table.column_defs.iter().enumerate() {
2145        let schema =
2146            column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
2147                column: &column.name,
2148            })?;
2149        let schema = schema.with_time_index(column.name == create_table.time_index);
2150
2151        column_schemas.push(schema);
2152        let _ = column_name_to_index_map.insert(column.name.clone(), idx);
2153    }
2154
2155    let next_column_id = column_schemas.len() as u32;
2156    let schema = Arc::new(Schema::new(column_schemas));
2157
2158    let primary_key_indices = create_table
2159        .primary_keys
2160        .iter()
2161        .map(|name| {
2162            column_name_to_index_map
2163                .get(name)
2164                .cloned()
2165                .context(ColumnNotFoundSnafu { msg: name })
2166        })
2167        .collect::<Result<Vec<_>>>()?;
2168
2169    let partition_key_indices = partition_columns
2170        .into_iter()
2171        .map(|col_name| {
2172            column_name_to_index_map
2173                .get(&col_name)
2174                .cloned()
2175                .context(ColumnNotFoundSnafu { msg: col_name })
2176        })
2177        .collect::<Result<Vec<_>>>()?;
2178
2179    let table_options = TableOptions::try_from_iter(&create_table.table_options)
2180        .context(UnrecognizedTableOptionSnafu)?;
2181
2182    let meta = TableMeta {
2183        schema,
2184        primary_key_indices,
2185        value_indices: vec![],
2186        engine: create_table.engine.clone(),
2187        next_column_id,
2188        options: table_options,
2189        created_on: Utc::now(),
2190        updated_on: Utc::now(),
2191        partition_key_indices,
2192        column_ids: vec![],
2193    };
2194
2195    let desc = if create_table.desc.is_empty() {
2196        create_table.table_options.get(COMMENT_KEY).cloned()
2197    } else {
2198        Some(create_table.desc.clone())
2199    };
2200
2201    let table_info = TableInfo {
2202        ident: metadata::TableIdent {
2203            // The table id of a distributed table is assigned by Meta; set "0" here as a placeholder.
2204            table_id: 0,
2205            version: 0,
2206        },
2207        name: create_table.table_name.clone(),
2208        desc,
2209        catalog_name: create_table.catalog_name.clone(),
2210        schema_name: create_table.schema_name.clone(),
2211        meta,
2212        table_type: TableType::Base,
2213    };
2214    Ok(table_info)
2215}
2216
2217fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
2218    let columns = if let Some(partitions) = partitions {
2219        partitions
2220            .column_list
2221            .iter()
2222            .map(|x| x.value.clone())
2223            .collect::<Vec<_>>()
2224    } else {
2225        vec![]
2226    };
2227    Ok(columns)
2228}
2229
2230/// Parse [Partitions] into a group of partition entries.
2231///
2232/// Returns a list of [PartitionExpr], each of which defines a partition.
2233fn find_partition_entries(
2234    create_table: &CreateTableExpr,
2235    partitions: &Option<Partitions>,
2236    partition_columns: &[String],
2237    query_ctx: &QueryContextRef,
2238) -> Result<Vec<PartitionExpr>> {
2239    let Some(partitions) = partitions else {
2240        return Ok(vec![]);
2241    };
2242
2243    // Extract the concrete data types of the partition columns.
2244    let column_name_and_type = partition_columns
2245        .iter()
2246        .map(|pc| {
2247            let column = create_table
2248                .column_defs
2249                .iter()
2250                .find(|c| &c.name == pc)
2251                // unwrap is safe here because we have checked that partition columns are defined
2252                .unwrap();
2253            let column_name = &column.name;
2254            let data_type = ConcreteDataType::from(
2255                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
2256                    .context(ColumnDataTypeSnafu)?,
2257            );
2258            Ok((column_name, data_type))
2259        })
2260        .collect::<Result<HashMap<_, _>>>()?;
2261
2262    // Transform parser expr to partition expr
2263    let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
2264    for partition in &partitions.exprs {
2265        let partition_expr =
2266            convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
2267        partition_exprs.push(partition_expr);
2268    }
2269
2270    Ok(partition_exprs)
2271}
2272
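/// Converts a parsed SQL binary expression (e.g. `device_id < 100`) into a
/// [PartitionExpr]. Leaf comparisons may have the column on either side of the
/// operator, and nested binary expressions are converted recursively.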
2273fn convert_one_expr(
2274    expr: &Expr,
2275    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2276    timezone: &Timezone,
2277) -> Result<PartitionExpr> {
2278    let Expr::BinaryOp { left, op, right } = expr else {
2279        return InvalidPartitionRuleSnafu {
2280            reason: "partition rule must be a binary expression",
2281        }
2282        .fail();
2283    };
2284
2285    let op =
2286        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
2287            reason: format!("unsupported operator in partition expr {op}"),
2288        })?;
2289
2290    // convert leaf node.
2291    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
2292        // col, val
2293        (Expr::Identifier(ident), Expr::Value(value)) => {
2294            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2295            let value = convert_value(&value.value, data_type, timezone, None)?;
2296            (Operand::Column(column_name), op, Operand::Value(value))
2297        }
2298        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
2299            if let Expr::Value(v) = &**expr =>
2300        {
2301            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2302            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2303            (Operand::Column(column_name), op, Operand::Value(value))
2304        }
2305        // val, col
2306        (Expr::Value(value), Expr::Identifier(ident)) => {
2307            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2308            let value = convert_value(&value.value, data_type, timezone, None)?;
2309            (Operand::Value(value), op, Operand::Column(column_name))
2310        }
2311        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
2312            if let Expr::Value(v) = &**expr =>
2313        {
2314            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
2315            let value = convert_value(&v.value, data_type, timezone, Some(*unary_op))?;
2316            (Operand::Value(value), op, Operand::Column(column_name))
2317        }
2318        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
2319            // A sub-expr must be compared against another sub-expr.
2320            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
2321            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
2322            (Operand::Expr(lhs), op, Operand::Expr(rhs))
2323        }
2324        _ => {
2325            return InvalidPartitionRuleSnafu {
2326                reason: format!("invalid partition expr {expr}"),
2327            }
2328            .fail();
2329        }
2330    };
2331
2332    Ok(PartitionExpr::new(lhs, op, rhs))
2333}
2334
2335fn convert_identifier(
2336    ident: &Ident,
2337    column_name_and_type: &HashMap<&String, ConcreteDataType>,
2338) -> Result<(String, ConcreteDataType)> {
2339    let column_name = ident.value.clone();
2340    let data_type = column_name_and_type
2341        .get(&column_name)
2342        .cloned()
2343        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
2344    Ok((column_name, data_type))
2345}
2346
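/// Converts a SQL literal into a typed [Value] for the given partition column
/// data type, applying the given timezone and an optional unary operator
/// (e.g. a leading minus).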
2347fn convert_value(
2348    value: &ParserValue,
2349    data_type: ConcreteDataType,
2350    timezone: &Timezone,
2351    unary_op: Option<UnaryOperator>,
2352) -> Result<Value> {
2353    sql_value_to_value(
2354        &ColumnSchema::new("<NONAME>", data_type, true),
2355        value,
2356        Some(timezone),
2357        unary_op,
2358        false,
2359    )
2360    .context(error::SqlCommonSnafu)
2361}
2362
2363#[cfg(test)]
2364mod test {
2365    use std::time::Duration;
2366
2367    use session::context::{QueryContext, QueryContextBuilder};
2368    use sql::dialect::GreptimeDbDialect;
2369    use sql::parser::{ParseOptions, ParserContext};
2370    use sql::statements::statement::Statement;
2371    use sqlparser::parser::Parser;
2372
2373    use super::*;
2374    use crate::expr_helper;
2375
2376    #[test]
2377    fn test_parse_ddl_options() {
2378        let options = OptionMap::from([
2379            ("timeout".to_string(), "5m".to_string()),
2380            ("wait".to_string(), "false".to_string()),
2381        ]);
2382        let ddl_options = parse_ddl_options(&options).unwrap();
2383        assert!(!ddl_options.wait);
2384        assert_eq!(Duration::from_secs(300), ddl_options.timeout);
2385    }
2386
2387    #[test]
2388    fn test_validate_and_normalize_flow_options_empty() {
2389        assert!(
2390            validate_and_normalize_flow_options(HashMap::new())
2391                .unwrap()
2392                .is_empty()
2393        );
2394    }
2395
2396    #[test]
2397    fn test_validate_and_normalize_flow_options_valid() {
2398        let options =
2399            HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]);
2400
2401        assert_eq!(
2402            validate_and_normalize_flow_options(options).unwrap(),
2403            HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)])
2404        );
2405    }
2406
2407    #[test]
2408    fn test_validate_and_normalize_flow_options_unknown_option() {
2409        let err = validate_and_normalize_flow_options(HashMap::from([(
2410            "foo".to_string(),
2411            "bar".to_string(),
2412        )]))
2413        .unwrap_err();
2414
2415        assert!(
2416            err.to_string()
2417                .contains("unknown flow option 'foo', supported options: defer_on_missing_source")
2418        );
2419    }
2420
2421    #[test]
2422    fn test_validate_and_normalize_flow_options_reserved_option() {
2423        let err = validate_and_normalize_flow_options(HashMap::from([(
2424            FlowType::FLOW_TYPE_KEY.to_string(),
2425            FlowType::BATCHING.to_string(),
2426        )]))
2427        .unwrap_err();
2428
2429        assert!(
2430            err.to_string()
2431                .contains("flow option 'flow_type' is reserved for internal use")
2432        );
2433    }
2434
2435    #[test]
2436    fn test_validate_and_normalize_flow_options_invalid_bool() {
2437        let err = validate_and_normalize_flow_options(HashMap::from([(
2438            DEFER_ON_MISSING_SOURCE_KEY.to_string(),
2439            "not-a-bool".to_string(),
2440        )]))
2441        .unwrap_err();
2442
2443        assert!(
2444            err.to_string()
2445                .contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'")
2446        );
2447    }
2448
2449    #[test]
2450    fn test_validate_and_normalize_flow_options_rejects_redacted_invalid_input() {
2451        let sql = r"
2452CREATE FLOW task_6
2453SINK TO schema_1.table_1
2454WITH (access_key_id = [true])
2455AS
2456SELECT max(c1), min(c2) FROM schema_2.table_2;";
2457        let stmt =
2458            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
2459                .unwrap()
2460                .pop()
2461                .unwrap();
2462
2463        let Statement::CreateFlow(create_flow) = stmt else {
2464            unreachable!()
2465        };
2466        let expr =
2467            expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
2468        let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err();
2469
2470        assert!(err.to_string().contains(
2471            "unknown flow option 'access_key_id', supported options: defer_on_missing_source"
2472        ));
2473    }
2474
2475    #[test]
2476    fn test_name_is_match() {
2477        assert!(!NAME_PATTERN_REG.is_match("/adaf"));
2478        assert!(!NAME_PATTERN_REG.is_match("🈲"));
2479        assert!(NAME_PATTERN_REG.is_match("hello"));
2480        assert!(NAME_PATTERN_REG.is_match("test@"));
2481        assert!(!NAME_PATTERN_REG.is_match("@test"));
2482        assert!(NAME_PATTERN_REG.is_match("test#"));
2483        assert!(!NAME_PATTERN_REG.is_match("#test"));
2484        assert!(!NAME_PATTERN_REG.is_match("@"));
2485        assert!(!NAME_PATTERN_REG.is_match("#"));
2486    }
2487
2488    #[test]
2489    fn test_partition_expr_equivalence_with_swapped_operands() {
2490        let column_name = "device_id".to_string();
2491        let column_name_and_type =
2492            HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
2493        let timezone = Timezone::from_tz_string("UTC").unwrap();
2494        let dialect = GreptimeDbDialect {};
2495
2496        let mut parser = Parser::new(&dialect)
2497            .try_with_sql("device_id < 100")
2498            .unwrap();
2499        let expr_left = parser.parse_expr().unwrap();
2500
2501        let mut parser = Parser::new(&dialect)
2502            .try_with_sql("100 > device_id")
2503            .unwrap();
2504        let expr_right = parser.parse_expr().unwrap();
2505
2506        let partition_left =
2507            convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
2508        let partition_right =
2509            convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
2510
2511        assert_eq!(partition_left, partition_right);
2512        assert!([partition_left.clone()].contains(&partition_right));
2513
2514        let mut physical_partition_exprs = vec![partition_left];
2515        let mut logical_partition_exprs = vec![partition_right];
2516        physical_partition_exprs.sort_unstable();
2517        logical_partition_exprs.sort_unstable();
2518        assert_eq!(physical_partition_exprs, logical_partition_exprs);
2519    }
2520
2521    #[tokio::test]
2522    #[ignore = "TODO(ruihang): WIP new partition rule"]
2523    async fn test_parse_partitions() {
2524        common_telemetry::init_default_ut_logging();
2525        let cases = [
2526            (
2527                r"
2528CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2529PARTITION ON COLUMNS (b) (
2530  b < 'hz',
2531  b >= 'hz' AND b < 'sh',
2532  b >= 'sh'
2533)
2534ENGINE=mito",
2535                r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
2536            ),
2537            (
2538                r"
2539CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
2540PARTITION BY RANGE COLUMNS (b, a) (
2541  PARTITION r0 VALUES LESS THAN ('hz', 10),
2542  b < 'hz' AND a < 10,
2543  b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
2544  b >= 'sh' AND a >= 20
2545)
2546ENGINE=mito",
2547                r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
2548            ),
2549        ];
2550        let ctx = QueryContextBuilder::default().build().into();
2551        for (sql, expected) in cases {
2552            let result = ParserContext::create_with_dialect(
2553                sql,
2554                &GreptimeDbDialect {},
2555                ParseOptions::default(),
2556            )
2557            .unwrap();
2558            match &result[0] {
2559                Statement::CreateTable(c) => {
2560                    let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
2561                    let (partitions, _) =
2562                        parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
2563                    let json = serde_json::to_string(&partitions).unwrap();
2564                    assert_eq!(json, expected);
2565                }
2566                _ => unreachable!(),
2567            }
2568        }
2569    }
2570}