Skip to main content

operator/
expr_helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19
20use api::helper::ColumnDataTypeWrapper;
21use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
22use api::v1::alter_table_expr::Kind as AlterTableKind;
23use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
24use api::v1::{
25    AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
26    ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
27    DropColumns, DropDefaults, ExpireAfter, FulltextBackend as PbFulltextBackend, ModifyColumnType,
28    ModifyColumnTypes, RenameTable, SemanticType, SetDatabaseOptions, SetDefaults, SetFulltext,
29    SetIndex, SetIndexes, SetInverted, SetSkipping, SetTableOptions,
30    SkippingIndexType as PbSkippingIndexType, TableName, UnsetDatabaseOptions, UnsetFulltext,
31    UnsetIndex, UnsetIndexes, UnsetInverted, UnsetSkipping, UnsetTableOptions, set_index,
32    unset_index,
33};
34use common_error::ext::BoxedError;
35use common_grpc_expr::util::ColumnExpr;
36use common_time::Timezone;
37use datafusion::sql::planner::object_name_to_table_reference;
38use datatypes::schema::{
39    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
40    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
41    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
42    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
43    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
44    SkippingIndexType,
45};
46use file_engine::FileOptions;
47use query::sql::{
48    check_file_to_table_schema_compatibility, file_column_schemas_to_table,
49    infer_file_table_schema, prepare_file_table_files,
50};
51use session::context::QueryContextRef;
52use session::table_name::table_idents_to_full_name;
53use snafu::{OptionExt, ResultExt, ensure};
54use sql::ast::{
55    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
56};
57use sql::dialect::GreptimeDbDialect;
58use sql::parser::ParserContext;
59use sql::statements::alter::{
60    AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
61};
62use sql::statements::create::{
63    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
64    CreateView, TableConstraint,
65};
66use sql::statements::{
67    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
68    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
69};
70use sql::util::extract_tables_from_query;
71use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
72use table::requests::{FILE_TABLE_META_KEY, TableOptions};
73use table::table_reference::TableReference;
74#[cfg(feature = "enterprise")]
75pub use trigger::to_create_trigger_task_expr;
76
77use crate::error::{
78    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
79    ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
80    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
81    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
82    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
83};
84
85pub fn create_table_expr_by_column_schemas(
86    table_name: &TableReference<'_>,
87    column_schemas: &[api::v1::ColumnSchema],
88    engine: &str,
89    desc: Option<&str>,
90) -> Result<CreateTableExpr> {
91    let column_exprs = ColumnExpr::from_column_schemas(column_schemas);
92    let expr = common_grpc_expr::util::build_create_table_expr(
93        None,
94        table_name,
95        column_exprs,
96        engine,
97        desc.unwrap_or("Created on insertion"),
98    )
99    .context(BuildCreateExprOnInsertionSnafu)?;
100
101    validate_create_expr(&expr)?;
102    Ok(expr)
103}
104
105pub fn extract_add_columns_expr(
106    schema: &Schema,
107    column_exprs: Vec<ColumnExpr>,
108) -> Result<Option<AddColumns>> {
109    let add_columns = common_grpc_expr::util::extract_new_columns(schema, column_exprs)
110        .context(FindNewColumnsOnInsertionSnafu)?;
111    if let Some(add_columns) = &add_columns {
112        validate_add_columns_expr(add_columns)?;
113    }
114    Ok(add_columns)
115}
116
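// When the `CREATE EXTERNAL TABLE` statement is in expanded form, like
// ```sql
// CREATE EXTERNAL TABLE city (
//   host string,
//   ts timestamp,
//   cpu float64,
//   memory float64,
//   TIME INDEX (ts),
//   PRIMARY KEY(host)
// ) WITH (location='/var/data/city.csv', format='csv');
// ```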
123// The user needs to specify the TIME INDEX column. If there is no suitable
124// column in the file to use as TIME INDEX, an additional placeholder column
125// needs to be created as the TIME INDEX, and a `DEFAULT <value>` constraint
126// should be added.
127//
128//
129// When the `CREATE EXTERNAL TABLE` statement is in inferred form, like
130// ```sql
131// CREATE EXTERNAL TABLE IF NOT EXISTS city WITH (location='/var/data/city.csv',format='csv');
132// ```
133// 1. If the TIME INDEX column can be inferred from metadata, use that column
134//    as the TIME INDEX. Otherwise,
135// 2. If a column named `greptime_timestamp` exists (with the requirement that
136//    the column is with type TIMESTAMP, otherwise an error is thrown), use
137//    that column as the TIME INDEX. Otherwise,
138// 3. Automatically create the `greptime_timestamp` column and add a `DEFAULT 0`
139//    constraint.
140pub(crate) async fn create_external_expr(
141    create: CreateExternalTable,
142    query_ctx: &QueryContextRef,
143) -> Result<CreateTableExpr> {
144    let (catalog_name, schema_name, table_name) =
145        table_idents_to_full_name(&create.name, query_ctx)
146            .map_err(BoxedError::new)
147            .context(ExternalSnafu)?;
148
149    let mut table_options = create.options.into_map();
150
151    let (object_store, files) = prepare_file_table_files(&table_options)
152        .await
153        .context(PrepareFileTableSnafu)?;
154
155    let file_column_schemas = infer_file_table_schema(&object_store, &files, &table_options)
156        .await
157        .context(InferFileTableSchemaSnafu)?
158        .column_schemas()
159        .to_vec();
160
161    let (time_index, primary_keys, table_column_schemas) = if !create.columns.is_empty() {
162        // expanded form
163        let time_index = find_time_index(&create.constraints)?;
164        let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
165        let column_schemas =
166            columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
167        (time_index, primary_keys, column_schemas)
168    } else {
169        // inferred form
170        let (column_schemas, time_index) = file_column_schemas_to_table(&file_column_schemas);
171        let primary_keys = vec![];
172        (time_index, primary_keys, column_schemas)
173    };
174
175    check_file_to_table_schema_compatibility(&file_column_schemas, &table_column_schemas)
176        .context(SchemaIncompatibleSnafu)?;
177
178    let meta = FileOptions {
179        files,
180        file_column_schemas,
181    };
182    table_options.insert(
183        FILE_TABLE_META_KEY.to_string(),
184        serde_json::to_string(&meta).context(EncodeJsonSnafu)?,
185    );
186
187    let column_defs = column_schemas_to_defs(table_column_schemas, &primary_keys)?;
188    let expr = CreateTableExpr {
189        catalog_name,
190        schema_name,
191        table_name,
192        desc: String::default(),
193        column_defs,
194        time_index,
195        primary_keys,
196        create_if_not_exists: create.if_not_exists,
197        table_options,
198        table_id: None,
199        engine: create.engine.clone(),
200    };
201
202    Ok(expr)
203}
204
205/// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
206pub fn create_to_expr(
207    create: &CreateTable,
208    query_ctx: &QueryContextRef,
209) -> Result<CreateTableExpr> {
210    let (catalog_name, schema_name, table_name) =
211        table_idents_to_full_name(&create.name, query_ctx)
212            .map_err(BoxedError::new)
213            .context(ExternalSnafu)?;
214
215    let time_index = find_time_index(&create.constraints)?;
216    let table_options = HashMap::from(
217        &TableOptions::try_from_iter(create.options.to_str_map())
218            .context(UnrecognizedTableOptionSnafu)?,
219    );
220
221    let mut table_options = table_options;
222    if table_options.contains_key(COMPACTION_TYPE) {
223        table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
224    }
225
226    let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
227
228    let expr = CreateTableExpr {
229        catalog_name,
230        schema_name,
231        table_name,
232        desc: String::default(),
233        column_defs: columns_to_expr(
234            &create.columns,
235            &time_index,
236            &primary_keys,
237            Some(&query_ctx.timezone()),
238        )?,
239        time_index,
240        primary_keys,
241        create_if_not_exists: create.if_not_exists,
242        table_options,
243        table_id: None,
244        engine: create.engine.clone(),
245    };
246
247    validate_create_expr(&expr)?;
248    Ok(expr)
249}
250
251/// Convert gRPC's [`CreateTableExpr`] back to `CreateTable` statement.
252/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
253///
254/// # Parameters
255///
256/// * `expr` - The `CreateTableExpr` to convert
257/// * `quote_style` - Optional quote style for identifiers (defaults to MySQL style ` backtick)
258pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
259    let quote_style = quote_style.unwrap_or('`');
260
261    // Convert table name
262    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
263        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
264    )]);
265
266    // Convert columns
267    let mut columns = Vec::with_capacity(expr.column_defs.len());
268    for column_def in &expr.column_defs {
269        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
270            column: &column_def.name,
271        })?;
272
273        let mut options = Vec::new();
274
275        // Add NULL/NOT NULL constraint
276        if column_def.is_nullable {
277            options.push(ColumnOptionDef {
278                name: None,
279                option: ColumnOption::Null,
280            });
281        } else {
282            options.push(ColumnOptionDef {
283                name: None,
284                option: ColumnOption::NotNull,
285            });
286        }
287
288        // Add DEFAULT constraint if present
289        if let Some(default_constraint) = column_schema.default_constraint() {
290            let expr = match default_constraint {
291                ColumnDefaultConstraint::Value(v) => {
292                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
293                }
294                ColumnDefaultConstraint::Function(func_expr) => {
295                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
296                        .context(ParseSqlSnafu)?
297                }
298            };
299            options.push(ColumnOptionDef {
300                name: None,
301                option: ColumnOption::Default(expr),
302            });
303        }
304
305        // Add COMMENT if present
306        if !column_def.comment.is_empty() {
307            options.push(ColumnOptionDef {
308                name: None,
309                option: ColumnOption::Comment(column_def.comment.clone()),
310            });
311        }
312
313        // Note: We don't add inline PRIMARY KEY options here,
314        // we'll handle all primary keys as constraints instead for consistency
315
316        // Handle column extensions (fulltext, inverted index, skipping index)
317        let mut extensions = ColumnExtensions::default();
318
319        // Add fulltext index options if present
320        if let Ok(Some(opt)) = column_schema.fulltext_options()
321            && opt.enable
322        {
323            let mut map = HashMap::from([
324                (
325                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
326                    opt.analyzer.to_string(),
327                ),
328                (
329                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
330                    opt.case_sensitive.to_string(),
331                ),
332                (
333                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
334                    opt.backend.to_string(),
335                ),
336            ]);
337            if opt.backend == FulltextBackend::Bloom {
338                map.insert(
339                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
340                    opt.granularity.to_string(),
341                );
342                map.insert(
343                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
344                    opt.false_positive_rate().to_string(),
345                );
346            }
347            extensions.fulltext_index_options = Some(map.into());
348        }
349
350        // Add skipping index options if present
351        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
352            let map = HashMap::from([
353                (
354                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
355                    opt.granularity.to_string(),
356                ),
357                (
358                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
359                    opt.false_positive_rate().to_string(),
360                ),
361                (
362                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
363                    opt.index_type.to_string(),
364                ),
365            ]);
366            extensions.skipping_index_options = Some(map.into());
367        }
368
369        // Add inverted index options if present
370        if column_schema.is_inverted_indexed() {
371            extensions.inverted_index_options = Some(HashMap::new().into());
372        }
373
374        let sql_column = SqlColumn {
375            column_def: ColumnDef {
376                name: Ident::with_quote(quote_style, &column_def.name),
377                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
378                    .context(ParseSqlSnafu)?,
379                options,
380            },
381            extensions,
382        };
383
384        columns.push(sql_column);
385    }
386
387    // Convert constraints
388    let mut constraints = Vec::new();
389
390    // Add TIME INDEX constraint
391    constraints.push(TableConstraint::TimeIndex {
392        column: Ident::with_quote(quote_style, &expr.time_index),
393    });
394
395    // Add PRIMARY KEY constraint (always add as constraint for consistency)
396    if !expr.primary_keys.is_empty() {
397        let primary_key_columns: Vec<Ident> = expr
398            .primary_keys
399            .iter()
400            .map(|pk| Ident::with_quote(quote_style, pk))
401            .collect();
402
403        constraints.push(TableConstraint::PrimaryKey {
404            columns: primary_key_columns,
405        });
406    }
407
408    // Convert table options
409    let mut options = OptionMap::default();
410    for (key, value) in &expr.table_options {
411        options.insert(key.clone(), value.clone());
412    }
413
414    Ok(CreateTable {
415        if_not_exists: expr.create_if_not_exists,
416        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
417        name: table_name,
418        columns,
419        engine: expr.engine.clone(),
420        constraints,
421        options,
422        partitions: None,
423    })
424}
425
426/// Validate the [`CreateTableExpr`] request.
427pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
428    // construct column list
429    let mut column_to_indices = HashMap::with_capacity(create.column_defs.len());
430    for (idx, column) in create.column_defs.iter().enumerate() {
431        if let Some(indices) = column_to_indices.get(&column.name) {
432            return InvalidSqlSnafu {
433                err_msg: format!(
434                    "column name `{}` is duplicated at index {} and {}",
435                    column.name, indices, idx
436                ),
437            }
438            .fail();
439        }
440        column_to_indices.insert(&column.name, idx);
441    }
442
443    // verify time_index exists
444    let _ = column_to_indices
445        .get(&create.time_index)
446        .with_context(|| InvalidSqlSnafu {
447            err_msg: format!(
448                "column name `{}` is not found in column list",
449                create.time_index
450            ),
451        })?;
452
453    // verify primary_key exists
454    for pk in &create.primary_keys {
455        let _ = column_to_indices
456            .get(&pk)
457            .with_context(|| InvalidSqlSnafu {
458                err_msg: format!("column name `{}` is not found in column list", pk),
459            })?;
460    }
461
462    // construct primary_key set
463    let mut pk_set = HashSet::new();
464    for pk in &create.primary_keys {
465        if !pk_set.insert(pk) {
466            return InvalidSqlSnafu {
467                err_msg: format!("column name `{}` is duplicated in primary keys", pk),
468            }
469            .fail();
470        }
471    }
472
473    // verify time index is not primary key
474    if pk_set.contains(&create.time_index) {
475        return InvalidSqlSnafu {
476            err_msg: format!(
477                "column name `{}` is both primary key and time index",
478                create.time_index
479            ),
480        }
481        .fail();
482    }
483
484    for column in &create.column_defs {
485        // verify do not contain interval type column issue #3235
486        if is_interval_type(&column.data_type()) {
487            return InvalidSqlSnafu {
488                err_msg: format!(
489                    "column name `{}` is interval type, which is not supported",
490                    column.name
491                ),
492            }
493            .fail();
494        }
495        // verify do not contain datetime type column issue #5489
496        if is_date_time_type(&column.data_type()) {
497            return InvalidSqlSnafu {
498                err_msg: format!(
499                    "column name `{}` is datetime type, which is not supported, please use `timestamp` type instead",
500                    column.name
501                ),
502            }
503            .fail();
504        }
505    }
506    Ok(())
507}
508
509fn validate_add_columns_expr(add_columns: &AddColumns) -> Result<()> {
510    for add_column in &add_columns.add_columns {
511        let Some(column_def) = &add_column.column_def else {
512            continue;
513        };
514        if is_date_time_type(&column_def.data_type()) {
515            return InvalidSqlSnafu {
516                    err_msg: format!("column name `{}` is datetime type, which is not supported, please use `timestamp` type instead", column_def.name),
517                }
518                .fail();
519        }
520        if is_interval_type(&column_def.data_type()) {
521            return InvalidSqlSnafu {
522                err_msg: format!(
523                    "column name `{}` is interval type, which is not supported",
524                    column_def.name
525                ),
526            }
527            .fail();
528        }
529    }
530    Ok(())
531}
532
533fn is_date_time_type(data_type: &ColumnDataType) -> bool {
534    matches!(data_type, ColumnDataType::Datetime)
535}
536
537fn is_interval_type(data_type: &ColumnDataType) -> bool {
538    matches!(
539        data_type,
540        ColumnDataType::IntervalYearMonth
541            | ColumnDataType::IntervalDayTime
542            | ColumnDataType::IntervalMonthDayNano
543    )
544}
545
546fn find_primary_keys(
547    columns: &[SqlColumn],
548    constraints: &[TableConstraint],
549) -> Result<Vec<String>> {
550    let columns_pk = columns
551        .iter()
552        .filter_map(|x| {
553            if x.options()
554                .iter()
555                .any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
556            {
557                Some(x.name().value.clone())
558            } else {
559                None
560            }
561        })
562        .collect::<Vec<String>>();
563
564    ensure!(
565        columns_pk.len() <= 1,
566        IllegalPrimaryKeysDefSnafu {
567            msg: "not allowed to inline multiple primary keys in columns options"
568        }
569    );
570
571    let constraints_pk = constraints
572        .iter()
573        .filter_map(|constraint| match constraint {
574            TableConstraint::PrimaryKey { columns, .. } => {
575                Some(columns.iter().map(|ident| ident.value.clone()))
576            }
577            _ => None,
578        })
579        .flatten()
580        .collect::<Vec<String>>();
581
582    ensure!(
583        columns_pk.is_empty() || constraints_pk.is_empty(),
584        IllegalPrimaryKeysDefSnafu {
585            msg: "found definitions of primary keys in multiple places"
586        }
587    );
588
589    let mut primary_keys = Vec::with_capacity(columns_pk.len() + constraints_pk.len());
590    primary_keys.extend(columns_pk);
591    primary_keys.extend(constraints_pk);
592    Ok(primary_keys)
593}
594
595pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
596    let time_index = constraints
597        .iter()
598        .filter_map(|constraint| match constraint {
599            TableConstraint::TimeIndex { column, .. } => Some(&column.value),
600            _ => None,
601        })
602        .collect::<Vec<&String>>();
603    ensure!(
604        time_index.len() == 1,
605        InvalidSqlSnafu {
606            err_msg: "must have one and only one TimeIndex columns",
607        }
608    );
609    Ok(time_index[0].clone())
610}
611
612fn columns_to_expr(
613    column_defs: &[SqlColumn],
614    time_index: &str,
615    primary_keys: &[String],
616    timezone: Option<&Timezone>,
617) -> Result<Vec<api::v1::ColumnDef>> {
618    let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
619    column_schemas_to_defs(column_schemas, primary_keys)
620}
621
622fn columns_to_column_schemas(
623    columns: &[SqlColumn],
624    time_index: &str,
625    timezone: Option<&Timezone>,
626) -> Result<Vec<ColumnSchema>> {
627    columns
628        .iter()
629        .map(|c| column_to_schema(c, time_index, timezone).context(ParseSqlSnafu))
630        .collect::<Result<Vec<ColumnSchema>>>()
631}
632
633// TODO(weny): refactor this function to use `try_as_column_def`
634pub fn column_schemas_to_defs(
635    column_schemas: Vec<ColumnSchema>,
636    primary_keys: &[String],
637) -> Result<Vec<api::v1::ColumnDef>> {
638    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
639        .iter()
640        .map(|c| {
641            ColumnDataTypeWrapper::try_from(c.data_type.clone())
642                .map(|w| w.to_parts())
643                .context(ColumnDataTypeSnafu)
644        })
645        .collect::<Result<Vec<_>>>()?;
646
647    column_schemas
648        .iter()
649        .zip(column_datatypes)
650        .map(|(schema, datatype)| {
651            let semantic_type = if schema.is_time_index() {
652                SemanticType::Timestamp
653            } else if primary_keys.contains(&schema.name) {
654                SemanticType::Tag
655            } else {
656                SemanticType::Field
657            } as i32;
658            let comment = schema
659                .metadata()
660                .get(COMMENT_KEY)
661                .cloned()
662                .unwrap_or_default();
663
664            Ok(api::v1::ColumnDef {
665                name: schema.name.clone(),
666                data_type: datatype.0 as i32,
667                is_nullable: schema.is_nullable(),
668                default_constraint: match schema.default_constraint() {
669                    None => vec![],
670                    Some(v) => {
671                        v.clone()
672                            .try_into()
673                            .context(ConvertColumnDefaultConstraintSnafu {
674                                column_name: &schema.name,
675                            })?
676                    }
677                },
678                semantic_type,
679                comment,
680                datatype_extension: datatype.1,
681                options: options_from_column_schema(schema),
682            })
683        })
684        .collect()
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
688pub struct RepartitionRequest {
689    pub catalog_name: String,
690    pub schema_name: String,
691    pub table_name: String,
692    pub source: RepartitionSource,
693    pub into_exprs: Vec<Expr>,
694    pub options: OptionMap,
695}
696
697#[derive(Debug, Clone, PartialEq, Eq)]
698pub enum RepartitionSource {
699    Partitions { from_exprs: Vec<Expr> },
700    Unpartitioned { partition_columns: Vec<String> },
701}
702
703pub(crate) fn to_repartition_request(
704    alter_table: AlterTable,
705    query_ctx: &QueryContextRef,
706) -> Result<RepartitionRequest> {
707    let AlterTable {
708        table_name,
709        alter_operation,
710        options,
711    } = alter_table;
712
713    let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
714        .map_err(BoxedError::new)
715        .context(ExternalSnafu)?;
716
717    let (source, into_exprs) = match alter_operation {
718        AlterTableOperation::Repartition { operation } => (
719            RepartitionSource::Partitions {
720                from_exprs: operation.from_exprs,
721            },
722            operation.into_exprs,
723        ),
724        AlterTableOperation::Partition { partitions } => (
725            RepartitionSource::Unpartitioned {
726                partition_columns: partitions
727                    .column_list
728                    .into_iter()
729                    .map(|ident| ident.value)
730                    .collect(),
731            },
732            partitions.exprs,
733        ),
734        _ => {
735            return InvalidSqlSnafu {
736                err_msg: "expected REPARTITION or PARTITION operation",
737            }
738            .fail();
739        }
740    };
741
742    Ok(RepartitionRequest {
743        catalog_name,
744        schema_name,
745        table_name,
746        source,
747        into_exprs,
748        options,
749    })
750}
751
752/// Converts a SQL alter table statement into a gRPC alter table expression.
753pub(crate) fn to_alter_table_expr(
754    alter_table: AlterTable,
755    query_ctx: &QueryContextRef,
756) -> Result<AlterTableExpr> {
757    let (catalog_name, schema_name, table_name) =
758        table_idents_to_full_name(alter_table.table_name(), query_ctx)
759            .map_err(BoxedError::new)
760            .context(ExternalSnafu)?;
761
762    let kind = match alter_table.alter_operation {
763        AlterTableOperation::AddConstraint(_) => {
764            return NotSupportedSnafu {
765                feat: "ADD CONSTRAINT",
766            }
767            .fail();
768        }
769        AlterTableOperation::AddColumns { add_columns } => AlterTableKind::AddColumns(AddColumns {
770            add_columns: add_columns
771                .into_iter()
772                .map(|add_column| {
773                    let column_def = sql_column_def_to_grpc_column_def(
774                        &add_column.column_def,
775                        Some(&query_ctx.timezone()),
776                    )
777                    .map_err(BoxedError::new)
778                    .context(ExternalSnafu)?;
779                    if is_interval_type(&column_def.data_type()) {
780                        return NotSupportedSnafu {
781                            feat: "Add column with interval type",
782                        }
783                        .fail();
784                    }
785                    Ok(AddColumn {
786                        column_def: Some(column_def),
787                        location: add_column.location.as_ref().map(From::from),
788                        add_if_not_exists: add_column.add_if_not_exists,
789                    })
790                })
791                .collect::<Result<Vec<AddColumn>>>()?,
792        }),
793        AlterTableOperation::ModifyColumnType {
794            column_name,
795            target_type,
796        } => {
797            let target_type =
798                sql_data_type_to_concrete_data_type(&target_type, &Default::default())
799                    .context(ParseSqlSnafu)?;
800            let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
801                .map(|w| w.to_parts())
802                .context(ColumnDataTypeSnafu)?;
803            if is_interval_type(&target_type) {
804                return NotSupportedSnafu {
805                    feat: "Modify column type to interval type",
806                }
807                .fail();
808            }
809            AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
810                modify_column_types: vec![ModifyColumnType {
811                    column_name: column_name.value,
812                    target_type: target_type as i32,
813                    target_type_extension,
814                }],
815            })
816        }
817        AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
818            drop_columns: vec![DropColumn {
819                name: name.value.clone(),
820            }],
821        }),
822        AlterTableOperation::RenameTable { new_table_name } => {
823            AlterTableKind::RenameTable(RenameTable {
824                new_table_name: new_table_name.clone(),
825            })
826        }
827        AlterTableOperation::SetTableOptions { options } => {
828            AlterTableKind::SetTableOptions(SetTableOptions {
829                table_options: options.into_iter().map(Into::into).collect(),
830            })
831        }
832        AlterTableOperation::UnsetTableOptions { keys } => {
833            AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
834        }
835        AlterTableOperation::Repartition { .. } => {
836            return NotSupportedSnafu {
837                feat: "ALTER TABLE ... REPARTITION",
838            }
839            .fail();
840        }
841        AlterTableOperation::Partition { .. } => {
842            return NotSupportedSnafu {
843                feat: "ALTER TABLE ... PARTITION ON COLUMNS",
844            }
845            .fail();
846        }
847        AlterTableOperation::SetIndex { options } => {
848            let option = match options {
849                sql::statements::alter::SetIndexOperation::Fulltext {
850                    column_name,
851                    options,
852                } => SetIndex {
853                    options: Some(set_index::Options::Fulltext(SetFulltext {
854                        column_name: column_name.value,
855                        enable: options.enable,
856                        analyzer: match options.analyzer {
857                            FulltextAnalyzer::English => Analyzer::English.into(),
858                            FulltextAnalyzer::Chinese => Analyzer::Chinese.into(),
859                        },
860                        case_sensitive: options.case_sensitive,
861                        backend: match options.backend {
862                            FulltextBackend::Bloom => PbFulltextBackend::Bloom.into(),
863                            FulltextBackend::Tantivy => PbFulltextBackend::Tantivy.into(),
864                        },
865                        granularity: options.granularity as u64,
866                        false_positive_rate: options.false_positive_rate(),
867                    })),
868                },
869                sql::statements::alter::SetIndexOperation::Inverted { column_name } => SetIndex {
870                    options: Some(set_index::Options::Inverted(SetInverted {
871                        column_name: column_name.value,
872                    })),
873                },
874                sql::statements::alter::SetIndexOperation::Skipping {
875                    column_name,
876                    options,
877                } => SetIndex {
878                    options: Some(set_index::Options::Skipping(SetSkipping {
879                        column_name: column_name.value,
880                        enable: true,
881                        granularity: options.granularity as u64,
882                        false_positive_rate: options.false_positive_rate(),
883                        skipping_index_type: match options.index_type {
884                            SkippingIndexType::BloomFilter => {
885                                PbSkippingIndexType::BloomFilter.into()
886                            }
887                        },
888                    })),
889                },
890            };
891            AlterTableKind::SetIndexes(SetIndexes {
892                set_indexes: vec![option],
893            })
894        }
895        AlterTableOperation::UnsetIndex { options } => {
896            let option = match options {
897                sql::statements::alter::UnsetIndexOperation::Fulltext { column_name } => {
898                    UnsetIndex {
899                        options: Some(unset_index::Options::Fulltext(UnsetFulltext {
900                            column_name: column_name.value,
901                        })),
902                    }
903                }
904                sql::statements::alter::UnsetIndexOperation::Inverted { column_name } => {
905                    UnsetIndex {
906                        options: Some(unset_index::Options::Inverted(UnsetInverted {
907                            column_name: column_name.value,
908                        })),
909                    }
910                }
911                sql::statements::alter::UnsetIndexOperation::Skipping { column_name } => {
912                    UnsetIndex {
913                        options: Some(unset_index::Options::Skipping(UnsetSkipping {
914                            column_name: column_name.value,
915                        })),
916                    }
917                }
918            };
919
920            AlterTableKind::UnsetIndexes(UnsetIndexes {
921                unset_indexes: vec![option],
922            })
923        }
924        AlterTableOperation::DropDefaults { columns } => {
925            AlterTableKind::DropDefaults(DropDefaults {
926                drop_defaults: columns
927                    .into_iter()
928                    .map(|col| {
929                        let column_name = col.0.to_string();
930                        Ok(api::v1::DropDefault { column_name })
931                    })
932                    .collect::<Result<Vec<_>>>()?,
933            })
934        }
935        AlterTableOperation::SetDefaults { defaults } => AlterTableKind::SetDefaults(SetDefaults {
936            set_defaults: defaults
937                .into_iter()
938                .map(|col| {
939                    let column_name = col.column_name.to_string();
940                    let default_constraint = serde_json::to_string(&col.default_constraint)
941                        .context(EncodeJsonSnafu)?
942                        .into_bytes();
943                    Ok(api::v1::SetDefault {
944                        column_name,
945                        default_constraint,
946                    })
947                })
948                .collect::<Result<Vec<_>>>()?,
949        }),
950    };
951
952    Ok(AlterTableExpr {
953        catalog_name,
954        schema_name,
955        table_name,
956        kind: Some(kind),
957    })
958}
959
960/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
961pub fn to_alter_database_expr(
962    alter_database: AlterDatabase,
963    query_ctx: &QueryContextRef,
964) -> Result<AlterDatabaseExpr> {
965    let catalog = query_ctx.current_catalog();
966    let schema = alter_database.database_name;
967
968    let kind = match alter_database.alter_operation {
969        AlterDatabaseOperation::SetDatabaseOption { options } => {
970            let options = options.into_iter().map(Into::into).collect();
971            AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
972                set_database_options: options,
973            })
974        }
975        AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
976            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
977        }
978    };
979
980    Ok(AlterDatabaseExpr {
981        catalog_name: catalog.to_string(),
982        schema_name: schema.to_string(),
983        kind: Some(kind),
984    })
985}
986
987/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
988pub fn to_create_view_expr(
989    stmt: CreateView,
990    logical_plan: Vec<u8>,
991    table_names: Vec<TableName>,
992    columns: Vec<String>,
993    plan_columns: Vec<String>,
994    definition: String,
995    query_ctx: QueryContextRef,
996) -> Result<CreateViewExpr> {
997    let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
998        .map_err(BoxedError::new)
999        .context(ExternalSnafu)?;
1000
1001    let expr = CreateViewExpr {
1002        catalog_name,
1003        schema_name,
1004        view_name,
1005        logical_plan,
1006        create_if_not_exists: stmt.if_not_exists,
1007        or_replace: stmt.or_replace,
1008        table_names,
1009        columns,
1010        plan_columns,
1011        definition,
1012    };
1013
1014    Ok(expr)
1015}
1016
1017pub fn to_create_flow_task_expr(
1018    create_flow: CreateFlow,
1019    query_ctx: &QueryContextRef,
1020) -> Result<CreateFlowExpr> {
1021    // retrieve sink table name
1022    let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
1023        .with_context(|_| ConvertIdentifierSnafu {
1024            ident: create_flow.sink_table_name.to_string(),
1025        })?;
1026    let catalog = sink_table_ref
1027        .catalog()
1028        .unwrap_or(query_ctx.current_catalog())
1029        .to_string();
1030    let schema = sink_table_ref
1031        .schema()
1032        .map(|s| s.to_owned())
1033        .unwrap_or(query_ctx.current_schema());
1034
1035    let sink_table_name = TableName {
1036        catalog_name: catalog,
1037        schema_name: schema,
1038        table_name: sink_table_ref.table().to_string(),
1039    };
1040
1041    let source_table_names = extract_tables_from_query(&create_flow.query)
1042        .map(|name| {
1043            let reference =
1044                object_name_to_table_reference(name.clone(), true).with_context(|_| {
1045                    ConvertIdentifierSnafu {
1046                        ident: name.to_string(),
1047                    }
1048                })?;
1049            let catalog = reference
1050                .catalog()
1051                .unwrap_or(query_ctx.current_catalog())
1052                .to_string();
1053            let schema = reference
1054                .schema()
1055                .map(|s| s.to_string())
1056                .unwrap_or(query_ctx.current_schema());
1057
1058            let table_name = TableName {
1059                catalog_name: catalog,
1060                schema_name: schema,
1061                table_name: reference.table().to_string(),
1062            };
1063            Ok(table_name)
1064        })
1065        .collect::<Result<Vec<_>>>()?;
1066
1067    let eval_interval = create_flow.eval_interval;
1068
1069    Ok(CreateFlowExpr {
1070        catalog_name: query_ctx.current_catalog().to_string(),
1071        flow_name: sanitize_flow_name(create_flow.flow_name)?,
1072        source_table_names,
1073        sink_table_name: Some(sink_table_name),
1074        or_replace: create_flow.or_replace,
1075        create_if_not_exists: create_flow.if_not_exists,
1076        expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
1077        eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
1078        comment: create_flow.comment.unwrap_or_default(),
1079        sql: create_flow.query.to_string(),
1080        flow_options: stringify_flow_options(create_flow.flow_options)?,
1081    })
1082}
1083
1084fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
1085    let options_len = flow_options.len();
1086    let flow_options = flow_options.into_map();
1087    ensure!(
1088        flow_options.len() == options_len,
1089        InvalidSqlSnafu {
1090            err_msg: "flow options only support scalar string-compatible values".to_string(),
1091        }
1092    );
1093    Ok(flow_options)
1094}
1095
1096/// sanitize the flow name, remove possible quotes
1097fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
1098    ensure!(
1099        flow_name.0.len() == 1,
1100        InvalidFlowNameSnafu {
1101            name: flow_name.to_string(),
1102        }
1103    );
1104    // safety: we've checked flow_name.0 has exactly one element.
1105    Ok(flow_name.0.swap_remove(0).to_string_unquoted())
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use std::collections::HashMap;
1111
1112    use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
1113    use datatypes::value::Value;
1114    use session::context::{QueryContext, QueryContextBuilder};
1115    use sql::dialect::GreptimeDbDialect;
1116    use sql::parser::{ParseOptions, ParserContext};
1117    use sql::statements::statement::Statement;
1118    use store_api::storage::ColumnDefaultConstraint;
1119
1120    use super::*;
1121
1122    #[test]
1123    fn test_create_flow_tql_expr() {
1124        let sql = r#"
1125CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1126TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
1127        let stmt =
1128            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
1129
1130        assert!(
1131            stmt.is_err(),
1132            "Expected error for invalid TQL EVAL parameters: {:#?}",
1133            stmt
1134        );
1135
1136        let sql = r#"
1137CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1138TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#;
1139        let stmt =
1140            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1141                .unwrap()
1142                .pop()
1143                .unwrap();
1144
1145        let Statement::CreateFlow(create_flow) = stmt else {
1146            unreachable!()
1147        };
1148        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1149
1150        let to_dot_sep =
1151            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1152        assert_eq!("calc_reqs", expr.flow_name);
1153        assert_eq!("greptime", expr.catalog_name);
1154        assert_eq!(
1155            "greptime.public.cnt_reqs",
1156            expr.sink_table_name.map(to_dot_sep).unwrap()
1157        );
1158        assert_eq!(1, expr.source_table_names.len());
1159        assert_eq!(
1160            "greptime.public.http_requests",
1161            to_dot_sep(expr.source_table_names[0].clone())
1162        );
1163        assert_eq!(
1164            r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#,
1165            expr.sql
1166        );
1167
1168        let sql = r#"
1169CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1170TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;
1171        let stmt =
1172            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1173                .unwrap()
1174                .pop()
1175                .unwrap();
1176        let Statement::CreateFlow(create_flow) = stmt else {
1177            unreachable!()
1178        };
1179        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1180        assert_eq!(1, expr.source_table_names.len());
1181        assert_eq!(
1182            "greptime.greptime_private.http_requests",
1183            to_dot_sep(expr.source_table_names[0].clone())
1184        );
1185
1186        let sql = r#"
1187CREATE FLOW calc_reqs SINK TO cnt_reqs AS
1188TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#;
1189        let stmt =
1190            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1191                .unwrap()
1192                .pop()
1193                .unwrap();
1194        let Statement::CreateFlow(create_flow) = stmt else {
1195            unreachable!()
1196        };
1197        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1198        assert_eq!(1, expr.source_table_names.len());
1199        assert_eq!(
1200            "greptime.greptime_private.http_requests",
1201            to_dot_sep(expr.source_table_names[0].clone())
1202        );
1203    }
1204
1205    #[test]
1206    fn test_create_flow_tql_cte_source_tables() {
1207        let sql = r#"
1208CREATE FLOW calc_cte
1209SINK TO metric_cte_sink
1210EVAL INTERVAL '1m'
1211AS
1212WITH tql(ts, the_value) AS (
1213  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1214)
1215SELECT * FROM tql;
1216"#;
1217
1218        let stmt =
1219            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1220                .unwrap()
1221                .pop()
1222                .unwrap();
1223
1224        let Statement::CreateFlow(create_flow) = stmt else {
1225            unreachable!()
1226        };
1227        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1228
1229        let to_dot_sep =
1230            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1231        assert_eq!(1, expr.source_table_names.len());
1232        assert_eq!(
1233            "greptime.public.metric_cte",
1234            to_dot_sep(expr.source_table_names[0].clone())
1235        );
1236    }
1237
1238    #[test]
1239    fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
1240        let sql = r#"
1241CREATE FLOW calc_cte
1242SINK TO metric_cte_sink
1243EVAL INTERVAL '1m'
1244AS
1245WITH "TQL"(ts, the_value) AS (
1246  TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
1247)
1248SELECT * FROM "TQL";
1249"#;
1250
1251        let stmt =
1252            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1253                .unwrap()
1254                .pop()
1255                .unwrap();
1256
1257        let Statement::CreateFlow(create_flow) = stmt else {
1258            unreachable!()
1259        };
1260        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1261
1262        let to_dot_sep =
1263            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1264        assert_eq!(1, expr.source_table_names.len());
1265        assert_eq!(
1266            "greptime.public.metric_cte",
1267            to_dot_sep(expr.source_table_names[0].clone())
1268        );
1269    }
1270
1271    #[test]
1272    fn test_create_flow_tql_cte_source_tables_same_name() {
1273        let sql = r#"
1274CREATE FLOW calc_cte
1275SINK TO metric_cte_sink
1276EVAL INTERVAL '1m'
1277AS
1278WITH tql(ts, the_value) AS (
1279  TQL EVAL (now() - '1m'::interval, now(), '5s') tql
1280)
1281SELECT * FROM tql;
1282"#;
1283
1284        let stmt =
1285            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1286                .unwrap()
1287                .pop()
1288                .unwrap();
1289
1290        let Statement::CreateFlow(create_flow) = stmt else {
1291            unreachable!()
1292        };
1293        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1294
1295        let to_dot_sep =
1296            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1297        assert_eq!(1, expr.source_table_names.len());
1298        assert_eq!(
1299            "greptime.public.tql",
1300            to_dot_sep(expr.source_table_names[0].clone())
1301        );
1302    }
1303
1304    #[test]
1305    fn test_create_flow_expr() {
1306        let sql = r"
1307CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
1308SELECT
1309    DISTINCT number as dis
1310FROM
1311    distinct_basic;";
1312        let stmt =
1313            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1314                .unwrap()
1315                .pop()
1316                .unwrap();
1317
1318        let Statement::CreateFlow(create_flow) = stmt else {
1319            unreachable!()
1320        };
1321        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1322
1323        let to_dot_sep =
1324            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1325        assert_eq!("test_distinct_basic", expr.flow_name);
1326        assert_eq!("greptime", expr.catalog_name);
1327        assert_eq!(
1328            "greptime.public.out_distinct_basic",
1329            expr.sink_table_name.map(to_dot_sep).unwrap()
1330        );
1331        assert_eq!(1, expr.source_table_names.len());
1332        assert_eq!(
1333            "greptime.public.distinct_basic",
1334            to_dot_sep(expr.source_table_names[0].clone())
1335        );
1336        assert_eq!(
1337            r"SELECT
1338    DISTINCT number as dis
1339FROM
1340    distinct_basic",
1341            expr.sql
1342        );
1343
1344        let sql = r"
1345CREATE FLOW `task_2`
1346SINK TO schema_1.table_1
1347AS
1348SELECT max(c1), min(c2) FROM schema_2.table_2;";
1349        let stmt =
1350            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1351                .unwrap()
1352                .pop()
1353                .unwrap();
1354
1355        let Statement::CreateFlow(create_flow) = stmt else {
1356            unreachable!()
1357        };
1358        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1359
1360        let to_dot_sep =
1361            |c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
1362        assert_eq!("task_2", expr.flow_name);
1363        assert_eq!("greptime", expr.catalog_name);
1364        assert_eq!(
1365            "greptime.schema_1.table_1",
1366            expr.sink_table_name.map(to_dot_sep).unwrap()
1367        );
1368        assert_eq!(1, expr.source_table_names.len());
1369        assert_eq!(
1370            "greptime.schema_2.table_2",
1371            to_dot_sep(expr.source_table_names[0].clone())
1372        );
1373        assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
1374        assert!(expr.flow_options.is_empty());
1375
1376        let sql = r"
1377CREATE FLOW task_3
1378SINK TO schema_1.table_1
1379WITH (defer_on_missing_source = 'true', foo = 'bar')
1380AS
1381SELECT max(c1), min(c2) FROM schema_2.table_2;";
1382        let stmt =
1383            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1384                .unwrap()
1385                .pop()
1386                .unwrap();
1387
1388        let Statement::CreateFlow(create_flow) = stmt else {
1389            unreachable!()
1390        };
1391        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1392        assert_eq!(
1393            expr.flow_options,
1394            HashMap::from([
1395                ("defer_on_missing_source".to_string(), "true".to_string()),
1396                ("foo".to_string(), "bar".to_string()),
1397            ])
1398        );
1399
1400        let sql = r"
1401CREATE FLOW task_4
1402SINK TO schema_1.table_1
1403WITH (defer_on_missing_source = true)
1404AS
1405SELECT max(c1), min(c2) FROM schema_2.table_2;";
1406        let stmt =
1407            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1408                .unwrap()
1409                .pop()
1410                .unwrap();
1411
1412        let Statement::CreateFlow(create_flow) = stmt else {
1413            unreachable!()
1414        };
1415        let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
1416        assert_eq!(
1417            expr.flow_options,
1418            HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
1419        );
1420
1421        let sql = r"
1422CREATE FLOW task_5
1423SINK TO schema_1.table_1
1424WITH (defer_on_missing_source = [true])
1425AS
1426SELECT max(c1), min(c2) FROM schema_2.table_2;";
1427        let stmt =
1428            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1429                .unwrap()
1430                .pop()
1431                .unwrap();
1432
1433        let Statement::CreateFlow(create_flow) = stmt else {
1434            unreachable!()
1435        };
1436        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1437        assert!(res.is_err());
1438        assert!(
1439            res.unwrap_err()
1440                .to_string()
1441                .contains("flow options only support scalar string-compatible values")
1442        );
1443
1444        let sql = r"
1445CREATE FLOW abc.`task_2`
1446SINK TO schema_1.table_1
1447AS
1448SELECT max(c1), min(c2) FROM schema_2.table_2;";
1449        let stmt =
1450            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1451                .unwrap()
1452                .pop()
1453                .unwrap();
1454
1455        let Statement::CreateFlow(create_flow) = stmt else {
1456            unreachable!()
1457        };
1458        let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
1459
1460        assert!(res.is_err());
1461        assert!(
1462            res.unwrap_err()
1463                .to_string()
1464                .contains("Invalid flow name: abc.`task_2`")
1465        );
1466    }
1467
1468    #[test]
1469    fn test_create_to_expr() {
1470        let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";
1471        let stmt =
1472            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1473                .unwrap()
1474                .pop()
1475                .unwrap();
1476
1477        let Statement::CreateTable(create_table) = stmt else {
1478            unreachable!()
1479        };
1480        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1481        assert_eq!("3days", expr.table_options.get("ttl").unwrap());
1482        assert_eq!(
1483            "1.0MiB",
1484            expr.table_options.get("write_buffer_size").unwrap()
1485        );
1486    }
1487
1488    #[test]
1489    fn test_invalid_create_to_expr() {
1490        let cases = [
1491            // duplicate column declaration
1492            "CREATE TABLE monitor (host STRING primary key, ts TIMESTAMP TIME INDEX, some_column text, some_column string);",
1493            // duplicate primary key
1494            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, some_column STRING, PRIMARY KEY (some_column, host, some_column));",
1495            // time index is primary key
1496            "CREATE TABLE monitor (host STRING, ts TIMESTAMP TIME INDEX, PRIMARY KEY (host, ts));",
1497        ];
1498
1499        for sql in cases {
1500            let stmt = ParserContext::create_with_dialect(
1501                sql,
1502                &GreptimeDbDialect {},
1503                ParseOptions::default(),
1504            )
1505            .unwrap()
1506            .pop()
1507            .unwrap();
1508            let Statement::CreateTable(create_table) = stmt else {
1509                unreachable!()
1510            };
1511            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
1512        }
1513    }
1514
1515    #[test]
1516    fn test_create_to_expr_with_default_timestamp_value() {
1517        let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
1518        let stmt =
1519            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1520                .unwrap()
1521                .pop()
1522                .unwrap();
1523
1524        let Statement::CreateTable(create_table) = stmt else {
1525            unreachable!()
1526        };
1527
1528        // query context with system timezone UTC.
1529        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
1530        let ts_column = &expr.column_defs[1];
1531        let constraint = assert_ts_column(ts_column);
1532        assert!(
1533            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1534                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1535        );
1536
1537        // query context with timezone `+08:00`
1538        let ctx = QueryContextBuilder::default()
1539            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1540            .build()
1541            .into();
1542        let expr = create_to_expr(&create_table, &ctx).unwrap();
1543        let ts_column = &expr.column_defs[1];
1544        let constraint = assert_ts_column(ts_column);
1545        assert!(
1546            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1547                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1548        );
1549    }
1550
1551    fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
1552        assert_eq!("ts", ts_column.name);
1553        assert_eq!(
1554            ColumnDataType::TimestampMillisecond as i32,
1555            ts_column.data_type
1556        );
1557        assert!(!ts_column.default_constraint.is_empty());
1558
1559        ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
1560    }
1561
1562    #[test]
1563    fn test_to_alter_expr() {
1564        let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
1565        let stmt =
1566            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1567                .unwrap()
1568                .pop()
1569                .unwrap();
1570
1571        let Statement::AlterDatabase(alter_database) = stmt else {
1572            unreachable!()
1573        };
1574
1575        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1576        let kind = expr.kind.unwrap();
1577
1578        let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
1579            set_database_options,
1580        }) = kind
1581        else {
1582            unreachable!()
1583        };
1584
1585        assert_eq!(2, set_database_options.len());
1586        assert_eq!("key1", set_database_options[0].key);
1587        assert_eq!("value1", set_database_options[0].value);
1588        assert_eq!("key2", set_database_options[1].key);
1589        assert_eq!("value2", set_database_options[1].value);
1590
1591        let sql = "ALTER DATABASE greptime UNSET key1, key2;";
1592        let stmt =
1593            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1594                .unwrap()
1595                .pop()
1596                .unwrap();
1597
1598        let Statement::AlterDatabase(alter_database) = stmt else {
1599            unreachable!()
1600        };
1601
1602        let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
1603        let kind = expr.kind.unwrap();
1604
1605        let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
1606            unreachable!()
1607        };
1608
1609        assert_eq!(2, keys.len());
1610        assert!(keys.contains(&"key1".to_string()));
1611        assert!(keys.contains(&"key2".to_string()));
1612
1613        let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
1614        let stmt =
1615            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1616                .unwrap()
1617                .pop()
1618                .unwrap();
1619
1620        let Statement::AlterTable(alter_table) = stmt else {
1621            unreachable!()
1622        };
1623
1624        // query context with system timezone UTC.
1625        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1626        let kind = expr.kind.unwrap();
1627
1628        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1629            unreachable!()
1630        };
1631
1632        assert_eq!(1, add_columns.len());
1633        let ts_column = add_columns[0].column_def.clone().unwrap();
1634        let constraint = assert_ts_column(&ts_column);
1635        assert!(
1636            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1637                         if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
1638        );
1639
1640        //
1641        // query context with timezone `+08:00`
1642        let ctx = QueryContextBuilder::default()
1643            .timezone(Timezone::from_tz_string("+08:00").unwrap())
1644            .build()
1645            .into();
1646        let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
1647        let kind = expr.kind.unwrap();
1648
1649        let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
1650            unreachable!()
1651        };
1652
1653        assert_eq!(1, add_columns.len());
1654        let ts_column = add_columns[0].column_def.clone().unwrap();
1655        let constraint = assert_ts_column(&ts_column);
1656        assert!(
1657            matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
1658                         if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
1659        );
1660    }
1661
1662    #[test]
1663    fn test_to_alter_modify_column_type_expr() {
1664        let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
1665        let stmt =
1666            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1667                .unwrap()
1668                .pop()
1669                .unwrap();
1670
1671        let Statement::AlterTable(alter_table) = stmt else {
1672            unreachable!()
1673        };
1674
1675        // query context with system timezone UTC.
1676        let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
1677        let kind = expr.kind.unwrap();
1678
1679        let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
1680            modify_column_types,
1681        }) = kind
1682        else {
1683            unreachable!()
1684        };
1685
1686        assert_eq!(1, modify_column_types.len());
1687        let modify_column_type = &modify_column_types[0];
1688
1689        assert_eq!("mem_usage", modify_column_type.column_name);
1690        assert_eq!(
1691            ColumnDataType::String as i32,
1692            modify_column_type.target_type
1693        );
1694        assert!(modify_column_type.target_type_extension.is_none());
1695    }
1696
1697    #[test]
1698    fn test_to_repartition_request() {
1699        let sql = r#"
1700ALTER TABLE metrics REPARTITION (
1701  device_id < 100
1702) INTO (
1703  device_id < 100 AND area < 'South',
1704  device_id < 100 AND area >= 'South'
1705);"#;
1706        let stmt =
1707            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1708                .unwrap()
1709                .pop()
1710                .unwrap();
1711
1712        let Statement::AlterTable(alter_table) = stmt else {
1713            unreachable!()
1714        };
1715
1716        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1717        assert_eq!("greptime", request.catalog_name);
1718        assert_eq!("public", request.schema_name);
1719        assert_eq!("metrics", request.table_name);
1720        let RepartitionSource::Partitions { from_exprs } = request.source else {
1721            unreachable!()
1722        };
1723        assert_eq!(
1724            from_exprs
1725                .into_iter()
1726                .map(|x| x.to_string())
1727                .collect::<Vec<_>>(),
1728            vec!["device_id < 100".to_string()]
1729        );
1730        assert_eq!(
1731            request
1732                .into_exprs
1733                .into_iter()
1734                .map(|x| x.to_string())
1735                .collect::<Vec<_>>(),
1736            vec![
1737                "device_id < 100 AND area < 'South'".to_string(),
1738                "device_id < 100 AND area >= 'South'".to_string()
1739            ]
1740        );
1741    }
1742
1743    #[test]
1744    fn test_to_repartition_request_with_unpartitioned_source() {
1745        let sql = r#"
1746ALTER TABLE metrics PARTITION ON COLUMNS (device_id, area) (
1747  device_id < 100 AND area < 'South',
1748  device_id < 100 AND area >= 'South'
1749);"#;
1750        let stmt =
1751            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1752                .unwrap()
1753                .pop()
1754                .unwrap();
1755
1756        let Statement::AlterTable(alter_table) = stmt else {
1757            unreachable!()
1758        };
1759
1760        let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
1761        assert_eq!("greptime", request.catalog_name);
1762        assert_eq!("public", request.schema_name);
1763        assert_eq!("metrics", request.table_name);
1764        let RepartitionSource::Unpartitioned { partition_columns } = request.source else {
1765            unreachable!()
1766        };
1767        assert_eq!(partition_columns, vec!["device_id", "area"]);
1768        assert_eq!(
1769            request
1770                .into_exprs
1771                .into_iter()
1772                .map(|x| x.to_string())
1773                .collect::<Vec<_>>(),
1774            vec![
1775                "device_id < 100 AND area < 'South'".to_string(),
1776                "device_id < 100 AND area >= 'South'".to_string()
1777            ]
1778        );
1779    }
1780
1781    fn new_test_table_names() -> Vec<TableName> {
1782        vec![
1783            TableName {
1784                catalog_name: "greptime".to_string(),
1785                schema_name: "public".to_string(),
1786                table_name: "a_table".to_string(),
1787            },
1788            TableName {
1789                catalog_name: "greptime".to_string(),
1790                schema_name: "public".to_string(),
1791                table_name: "b_table".to_string(),
1792            },
1793        ]
1794    }
1795
1796    #[test]
1797    fn test_to_create_view_expr() {
1798        let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
1799        let stmt =
1800            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1801                .unwrap()
1802                .pop()
1803                .unwrap();
1804
1805        let Statement::CreateView(stmt) = stmt else {
1806            unreachable!()
1807        };
1808
1809        let logical_plan = vec![1, 2, 3];
1810        let table_names = new_test_table_names();
1811        let columns = vec!["a".to_string()];
1812        let plan_columns = vec!["number".to_string()];
1813
1814        let expr = to_create_view_expr(
1815            stmt,
1816            logical_plan.clone(),
1817            table_names.clone(),
1818            columns.clone(),
1819            plan_columns.clone(),
1820            sql.to_string(),
1821            QueryContext::arc(),
1822        )
1823        .unwrap();
1824
1825        assert_eq!("greptime", expr.catalog_name);
1826        assert_eq!("public", expr.schema_name);
1827        assert_eq!("test", expr.view_name);
1828        assert!(!expr.create_if_not_exists);
1829        assert!(!expr.or_replace);
1830        assert_eq!(logical_plan, expr.logical_plan);
1831        assert_eq!(table_names, expr.table_names);
1832        assert_eq!(sql, expr.definition);
1833        assert_eq!(columns, expr.columns);
1834        assert_eq!(plan_columns, expr.plan_columns);
1835    }
1836
1837    #[test]
1838    fn test_to_create_view_expr_complex() {
1839        let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
1840        let stmt =
1841            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1842                .unwrap()
1843                .pop()
1844                .unwrap();
1845
1846        let Statement::CreateView(stmt) = stmt else {
1847            unreachable!()
1848        };
1849
1850        let logical_plan = vec![1, 2, 3];
1851        let table_names = new_test_table_names();
1852        let columns = vec!["a".to_string()];
1853        let plan_columns = vec!["number".to_string()];
1854
1855        let expr = to_create_view_expr(
1856            stmt,
1857            logical_plan.clone(),
1858            table_names.clone(),
1859            columns.clone(),
1860            plan_columns.clone(),
1861            sql.to_string(),
1862            QueryContext::arc(),
1863        )
1864        .unwrap();
1865
1866        assert_eq!("greptime", expr.catalog_name);
1867        assert_eq!("test", expr.schema_name);
1868        assert_eq!("test_view", expr.view_name);
1869        assert!(expr.create_if_not_exists);
1870        assert!(expr.or_replace);
1871        assert_eq!(logical_plan, expr.logical_plan);
1872        assert_eq!(table_names, expr.table_names);
1873        assert_eq!(sql, expr.definition);
1874        assert_eq!(columns, expr.columns);
1875        assert_eq!(plan_columns, expr.plan_columns);
1876    }
1877
1878    #[test]
1879    fn test_expr_to_create() {
1880        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
1881  `timestamp` TIMESTAMP(9) NOT NULL,
1882  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
1883  `username` STRING NULL,
1884  `http_method` STRING NULL INVERTED INDEX,
1885  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
1886  `protocol` STRING NULL,
1887  `status_code` INT NULL INVERTED INDEX,
1888  `response_size` BIGINT NULL,
1889  `message` STRING NULL,
1890  TIME INDEX (`timestamp`),
1891  PRIMARY KEY (`username`, `status_code`)
1892)
1893ENGINE=mito
1894WITH(
1895  append_mode = 'true'
1896)"#;
1897        let stmt =
1898            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
1899                .unwrap()
1900                .pop()
1901                .unwrap();
1902
1903        let Statement::CreateTable(original_create) = stmt else {
1904            unreachable!()
1905        };
1906
1907        // Convert CreateTable -> CreateTableExpr -> CreateTable
1908        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
1909
1910        let create_table = expr_to_create(&expr, Some('`')).unwrap();
1911        let new_sql = format!("{:#}", create_table);
1912        assert_eq!(sql, new_sql);
1913    }
1914}